yaoyigou_crawl.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import time
  2. import requests
  3. import json
  4. from commons.Logger import get_spider_logger
  5. from pipelines.drug_pipelines import DrugPipeline
  6. logger = get_spider_logger("yaoyigou")
  7. user_name = '青羊新洋诊所'
  8. password = '123321'
  9. headers = {
  10. 'accept': 'application/json, text/plain, */*',
  11. 'accept-language': 'zh-CN,zh;q=0.9',
  12. 'content-type': 'application/json;charset=UTF-8',
  13. 'origin': 'https://www.hezongyy.com',
  14. 'priority': 'u=1, i',
  15. 'referer': 'https://www.hezongyy.com/',
  16. 'sec-ch-ua': '"Google Chrome";v="147", "Not.A/Brand";v="8", "Chromium";v="147"',
  17. 'sec-ch-ua-mobile': '?0',
  18. 'sec-ch-ua-platform': '"Windows"',
  19. 'sec-fetch-dest': 'empty',
  20. 'sec-fetch-mode': 'cors',
  21. 'sec-fetch-site': 'same-site',
  22. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36',
  23. }
  24. class Yaoyigou:
  25. def __init__(self, task_dict=None):
  26. self.hesytoken = None
  27. self.headers = headers.copy()
  28. self.platform = 8
  29. self.collect_task_id = None
  30. self.pipeline = DrugPipeline("yaoyigou")
  31. self.company_id = 0
  32. self.brand = ""
  33. self.is_success = True
  34. self.task_dict = task_dict
  35. if self.task_dict:
  36. self.get_product_data()
  37. def get_product_data(self):
  38. self.task_id = self.task_dict["id"]
  39. self.company_id = self.task_dict["company_id"]
  40. self.product = self.task_dict["product_name"]
  41. self.product_desc = self.task_dict.get("product_specs", "")
  42. self.brand = self.task_dict.get("product_brand", "")
  43. self.product_keyword = self.task_dict.get("product_keyword", "")
  44. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  45. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  46. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  47. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  48. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  49. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  50. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  51. self.collect_round = self.task_dict.get("collect_round",1)
  52. def _post_json_with_retry(self, url, payload, max_retry=1, timeout=20):
  53. """
  54. 请求失败后重试一次(默认总尝试 2 次)。
  55. """
  56. for attempt in range(max_retry + 1):
  57. try:
  58. response = requests.post(
  59. url,
  60. headers=self.headers,
  61. json=payload,
  62. timeout=timeout,
  63. )
  64. response.raise_for_status()
  65. return response.json()
  66. except Exception as e:
  67. if attempt < max_retry:
  68. logger.warning("请求失败,准备重试(%s/%s): %s", attempt + 1, max_retry + 1, e)
  69. time.sleep(2)
  70. continue
  71. logger.error("请求失败,已达最大重试次数: %s", e)
  72. raise
  73. def login_yaoyigou(self):
  74. json_data = {
  75. 'username': user_name,
  76. 'password': password,
  77. 'channel': 1,
  78. 'timeout': 604800,
  79. 'fingerprint': 'b86bae775df98bc20c5199dab0ac4edb',
  80. 'companyLimited': 0, }
  81. try:
  82. data_json = self._post_json_with_retry(
  83. 'https://newapi.hezongyy.com/users/UserLogin/login',
  84. json_data,
  85. max_retry=1,
  86. timeout=20,
  87. )
  88. self.hesytoken = data_json.get('content', "")
  89. except Exception as e:
  90. logger.error("登录失败: %s", e)
  91. self.hesytoken = ""
  92. return self.hesytoken
  93. def parse_product(self, item):
  94. item_id = item.get("id")
  95. now = time.strftime("%Y-%m-%d %H:%M:%S")
  96. safe_item_id = str(item_id).strip() if item_id not in (None, "") else ""
  97. detail_url = ""
  98. if safe_item_id:
  99. detail_url = f"https://www.hezongyy.com/#/goodsDetails?id={safe_item_id}"
  100. # 字段与 yaofangwang_crawl 中 product 对齐(供 DrugPipeline)
  101. product = {
  102. "platform": self.platform,
  103. "item_id": item_id,
  104. "enterprise_id": self.company_id,
  105. "product_name": item.get("name", ""),
  106. "spec": item.get("specification", ""),
  107. "one_price": "",
  108. "detail_url": detail_url,
  109. "shop_name": "",
  110. "anonymous_store_name": "",
  111. "shop_url": "",
  112. "city_name": "",
  113. "city_id": "",
  114. "province_name": "",
  115. "province_id": "",
  116. "shipment_city_name": "",
  117. "shipment_city_id": "",
  118. "shipment_province_name": "",
  119. "shipment_province_id": "",
  120. "area_info": "",
  121. "factory_name": item.get("manufacturerName", ""),
  122. "scrape_date": time.strftime("%Y-%m-%d"),
  123. "price": item.get("sellingPrice"),
  124. "sales": "",
  125. "stock_count": item.get("quantity", ""),
  126. "snapshot_url": "",
  127. "approval_num": item.get("licenseNumber"),
  128. "produced_time": "",
  129. "deadline": item.get("expirationDate", ""),
  130. "update_time": now,
  131. "insert_time": now,
  132. "number": 1,
  133. "product_brand": self.brand or "",
  134. "collect_task_id": self.collect_task_id,
  135. "search_name": self.product,
  136. "company_name": "",
  137. "collect_config_info": json.dumps(
  138. {
  139. "sampling_cycle": self.sampling_cycle,
  140. "sampling_start_time": self.sampling_start_time,
  141. "sampling_end_time": self.sampling_end_time,
  142. }
  143. ),
  144. "account_id": self.account_id,
  145. "collect_region_id": self.collect_region_id,
  146. "collect_round": self.collect_round,
  147. "is_sold_out": 1
  148. }
  149. return product
  150. def get_good_ids(self, page_num):
  151. keyword = self.product_keyword or self.product
  152. json_data = {
  153. 'channelType': 0,
  154. 'columnType': 0,
  155. 'searchType': 0,
  156. 'pageNumber': page_num,
  157. 'pageSize': 20,
  158. 'keyWords': keyword,
  159. 'newCategoryLabelId': '-1',
  160. }
  161. good_ids = []
  162. data_json = self._post_json_with_retry(
  163. 'https://newapi.hezongyy.com/goods/search/onCondition',
  164. json_data,
  165. max_retry=1,
  166. timeout=20,
  167. )
  168. list_data = data_json.get("content", {}).get("list", [])
  169. for list_item in list_data:
  170. good_ids.append(str(list_item.get("goodsId")) + '0')
  171. return good_ids
  172. def search_data(self):
  173. for page_num in range(1, 20):
  174. good_ids = self.get_good_ids(page_num)
  175. if not good_ids:
  176. break
  177. json_data = {
  178. 'goodsIdList': good_ids,
  179. 'limitSize': 20,
  180. 'limitStart': (page_num - 1) * 20,
  181. 'keyword': self.product_keyword or self.product,
  182. 'clientType': 1,
  183. }
  184. json_data = self._post_json_with_retry(
  185. 'https://newapi.hezongyy.com/goods/goods/listNormal',
  186. json_data,
  187. max_retry=1,
  188. timeout=20,
  189. )
  190. contents = json_data.get("content", [])
  191. if not contents:
  192. break
  193. for content in contents:
  194. company = content.get("manufacturerName", "")
  195. if self.brand not in company:
  196. continue
  197. try:
  198. product = self.parse_product(content)
  199. if not product.get("item_id"):
  200. continue
  201. self.pipeline.storge_data(product)
  202. logger.info(f"入库成功: {json.dumps(product, ensure_ascii=False)}")
  203. except Exception as e:
  204. logger.exception("入库失败: %s", e)
  205. def run(self):
  206. self.login_yaoyigou()
  207. if not self.hesytoken:
  208. logger.error(f"请检查账号是否出现问题")
  209. self.is_success = False
  210. return self.pipeline.crawl_count, self.is_success
  211. self.headers["hesytoken"] = self.hesytoken
  212. try:
  213. self.search_data()
  214. except Exception as e:
  215. logger.error(f"爬取搜索失败{str(e)}")
  216. self.is_success = False
  217. return self.pipeline.crawl_count, self.is_success