drug_pipelines.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import json
  2. import time
  3. from itertools import product
  4. from pathlib import Path
  5. from area_info.city_name_to_id_2 import get_city
  6. from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
  7. from commons.sql_data import RETRIEVE_SCRAPE_INSERT_COLUMNS, sql_map
  8. from commons.Logger import get_spider_logger
  9. from pipelines.shop_pipelines import ShopPipeline
  10. class DrugPipeline:
  11. def __init__(self, spider_name: str):
  12. self.db_online = MySQLPool39()
  13. # self.db_online = MySQLPoolOnline()
  14. self.sql_map = sql_map
  15. self.crawl_count = 0
  16. self.spider_name = spider_name
  17. self.logger = get_spider_logger(spider_name)
  18. self.shop_pipeline = ShopPipeline(spider_name)
  19. @staticmethod
  20. def _db_int(val):
  21. if val is None or val == "":
  22. return 0
  23. try:
  24. return int(val)
  25. except (TypeError, ValueError):
  26. return 0
  27. @staticmethod
  28. def _db_decimal(val):
  29. if val is None or val is False:
  30. return 0.0
  31. if isinstance(val, (int, float)) and not isinstance(val, bool):
  32. return float(val)
  33. s = str(val).strip()
  34. if not s:
  35. return 0.0
  36. try:
  37. return float(s)
  38. except ValueError:
  39. return 0.0
  40. def get_shop_city(self, product):
  41. # 补齐店铺等信息
  42. sql_data = """ SELECT *
  43. FROM `retrieve_scrape_shop_info`
  44. WHERE `platform` = %s
  45. AND `city` IS NOT NULL
  46. AND `city` != '' and `city` !='未知' AND `shop` = %s LIMIT 1"""
  47. data = self.db_online.select_data(sql_data, (str(product["platform"]), product.get("shop_name", "")))
  48. if data:
  49. shop_row = data[0]
  50. if not product.get("city_id"):
  51. product["city_name"] = shop_row.get("city", "") or product.get("city_name", "")
  52. city_id, province_id, city, province = get_city(shop_row["city"])
  53. product["city_id"] = city_id
  54. product["province_id"] = province_id
  55. product["city_name"] = city
  56. product["province_name"] = province
  57. product["area_info"] = shop_row.get("contact_address", product.get("area_info", ""))
  58. product["company_name"] = shop_row.get("business_license_company", product.get("company_name", ""))
  59. return product
  60. def storge_data(self, product):
  61. product = self.get_shop_city(product)
  62. row = {
  63. "platform_id": product["platform"],
  64. "platform_item_id": str(product["item_id"]),
  65. "enterprise_id": self._db_int(product["enterprise_id"]),
  66. "product_name": product["product_name"] or "",
  67. "product_specs": product["spec"] or "",
  68. "product_brand": product.get("product_brand") or "",
  69. "one_box_price": self._db_decimal(product.get("one_price")),
  70. "link_url": product["detail_url"] or "",
  71. "store_name": product["shop_name"] or "",
  72. "store_url": product["shop_url"] or "",
  73. "shipment_province_id": self._db_int(product.get("shipment_province_id")),
  74. "shipment_province_name": product.get("shipment_province_name") or "",
  75. "shipment_city_id": self._db_int(product.get("shipment_city_id")),
  76. "shipment_city_name": product.get("shipment_city_name") or "",
  77. "manufacturer": product.get("factory_name") or "",
  78. "company_name": product.get("company_name") or "",
  79. "scrape_date": product["scrape_date"],
  80. "is_sold_out": self._db_int(product.get("is_sold_out")),
  81. "min_price": self._db_decimal(product.get("price")),
  82. "sales": product["sales"] or "",
  83. "inventory": str(product["stock_count"]) if product["stock_count"] not in (None, "") else "",
  84. "snapshot_url": product["snapshot_url"] or "",
  85. "approval_number": product["approval_num"] or "",
  86. "expiry_date": product["deadline"] or "",
  87. "update_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  88. "insert_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  89. "number": self._db_int(product.get("number", 1)),
  90. "task_id": self._db_int(product.get("collect_task_id")),
  91. "anonymous_store_name": product.get("anonymous_store_name") or "",
  92. "search_name": product.get("search_name") or "",
  93. "collect_config_info": product.get("collect_config_info") or "",
  94. "area_info": product.get("area_info") or "",
  95. "city_name": product.get("city_name") or "",
  96. "city_id": self._db_int(product.get("city_id")),
  97. "province_name": product.get("province_name") or "",
  98. "province_id": self._db_int(product.get("province_id")),
  99. "collect_equipment_account_id": self._db_int(product.get("account_id")),
  100. "collect_region_id": self._db_int(product.get("collect_region_id")),
  101. "collect_round": self._db_int(product.get("collect_round")),
  102. }
  103. tpl = tuple(row[k] for k in RETRIEVE_SCRAPE_INSERT_COLUMNS)
  104. params = tpl + (
  105. row["platform_id"],
  106. row["scrape_date"],
  107. row["platform_item_id"],
  108. row["collect_equipment_account_id"],
  109. row["collect_round"],
  110. )
  111. sql = self.sql_map["retrieve_scrape_insert_if_absent_sql"]
  112. affected_rows = self.db_online.execute(sql, params)
  113. # #endregion
  114. if affected_rows > 0:
  115. self.crawl_count += 1
  116. self.logger.info(
  117. "pipeline入库成功 spider=%s item_id=%s total=%s",
  118. self.spider_name,
  119. product.get("item_id"),
  120. self.crawl_count,
  121. )
  122. else:
  123. self.logger.info(
  124. "pipeline跳过入库(已存在或失败) item_id=%s",
  125. product.get("item_id"),
  126. )
  127. # # 存入店铺表
  128. # shop_data = {
  129. # "shop": product["shop_name"] or "",
  130. # "shop_url": product["shop_url"] or "",
  131. # "city": product.get("city_name") or "",
  132. # "qualification_number": "",
  133. # "business_license_company": product.get("company_name") or "",
  134. # "province": product.get("province_name") or "",
  135. # "scrape_date": time.strftime("%Y-%m-%d %H:%M:%S"),
  136. # "business_license_address": "",
  137. # "create_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  138. # "update_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  139. # "platform": product["platform"]
  140. # }
  141. # try:
  142. # self.shop_pipeline.storge_data(shop_data)
  143. # except Exception as e:
  144. # pass
  145. return affected_rows
  146. if __name__ == '__main__':
  147. pass