# drug_pipelines.py
import json
import math
import time
from itertools import product
from pathlib import Path

from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
from commons.Logger import get_spider_logger
from commons.sql_data import RETRIEVE_SCRAPE_INSERT_COLUMNS, sql_map
from pipelines.shop_pipelines import ShopPipeline
  9. class DrugPipeline:
  10. def __init__(self, spider_name: str):
  11. self.db_online = MySQLPool39()
  12. # self.db_online = MySQLPoolOnline()
  13. self.sql_map = sql_map
  14. self.crawl_count = 0
  15. self.spider_name = spider_name
  16. self.logger = get_spider_logger(spider_name)
  17. self.shop_pipeline = ShopPipeline(spider_name)
  18. @staticmethod
  19. def _db_int(val):
  20. if val is None or val == "":
  21. return 0
  22. try:
  23. return int(val)
  24. except (TypeError, ValueError):
  25. return 0
  26. @staticmethod
  27. def _db_decimal(val):
  28. if val is None or val is False:
  29. return 0.0
  30. if isinstance(val, (int, float)) and not isinstance(val, bool):
  31. return float(val)
  32. s = str(val).strip()
  33. if not s:
  34. return 0.0
  35. try:
  36. return float(s)
  37. except ValueError:
  38. return 0.0
  39. def get_shop_city(self, product):
  40. # 补齐店铺等信息
  41. sql_data = """ SELECT *
  42. FROM `retrieve_scrape_shop_info`
  43. WHERE `platform` = %s
  44. AND `city` IS NOT NULL
  45. AND `city` != '' and `city` !='未知' AND `shop` = %s LIMIT 1"""
  46. data = self.db_online.select_data(sql_data, (str(product["platform"]), product.get("shop_name", "")))
  47. if data:
  48. shop_row = data[0]
  49. if not product.get("city_id") and not product.get("shipment_city_id"):
  50. product["city_name"] = shop_row.get("city", "") or product.get("city_name", "")
  51. if not product.get("province_id") and not product.get("shipment_province_id"):
  52. product["province_name"] = shop_row.get("province", "") or product.get("province_name", "")
  53. product["area_info"] = shop_row.get("contact_address", product.get("area_info", ""))
  54. product["company_name"] = shop_row.get("business_license_company", product.get("company_name", ""))
  55. return product
  56. def storge_data(self, product):
  57. product = self.get_shop_city(product)
  58. row = {
  59. "platform_id": product["platform"],
  60. "platform_item_id": str(product["item_id"]),
  61. "enterprise_id": self._db_int(product["enterprise_id"]),
  62. "product_name": product["product_name"] or "",
  63. "product_specs": product["spec"] or "",
  64. "product_brand": product.get("product_brand") or "",
  65. "one_box_price": self._db_decimal(product.get("one_price")),
  66. "link_url": product["detail_url"] or "",
  67. "store_name": product["shop_name"] or "",
  68. "store_url": product["shop_url"] or "",
  69. "shipment_province_id": self._db_int(product.get("shipment_province_id")),
  70. "shipment_province_name": product.get("shipment_province_name") or "",
  71. "shipment_city_id": self._db_int(product.get("shipment_city_id")),
  72. "shipment_city_name": product.get("shipment_city_name") or "",
  73. "manufacturer": product.get("factory_name") or "",
  74. "company_name": product.get("company_name") or "",
  75. "scrape_date": product["scrape_date"],
  76. "is_sold_out": self._db_int(product.get("is_sold_out")),
  77. "min_price": self._db_decimal(product.get("price")),
  78. "sales": product["sales"] or "",
  79. "inventory": str(product["stock_count"]) if product["stock_count"] not in (None, "") else "",
  80. "snapshot_url": product["snapshot_url"] or "",
  81. "approval_number": product["approval_num"] or "",
  82. "expiry_date": product["deadline"] or "",
  83. "update_time": product["update_time"],
  84. "insert_time": product.get("insert_time") or product["update_time"],
  85. "number": self._db_int(product.get("number", 1)),
  86. "task_id": self._db_int(product.get("collect_task_id")),
  87. "anonymous_store_name": product.get("anonymous_store_name") or "",
  88. "search_name": product.get("search_name") or "",
  89. "collect_config_info": product.get("collect_config_info") or "",
  90. "area_info": product.get("area_info") or "",
  91. "city_name": product.get("city_name") or "",
  92. "city_id": self._db_int(product.get("city_id")),
  93. "province_name": product.get("province_name") or "",
  94. "province_id": self._db_int(product.get("province_id")),
  95. "collect_equipment_account_id": self._db_int(product.get("account_id")),
  96. "collect_region_id": self._db_int(product.get("collect_region_id")),
  97. "collect_round": self._db_int(product.get("collect_round")),
  98. }
  99. tpl = tuple(row[k] for k in RETRIEVE_SCRAPE_INSERT_COLUMNS)
  100. params = tpl + (
  101. row["platform_id"],
  102. row["scrape_date"],
  103. row["platform_item_id"],
  104. row["collect_equipment_account_id"],
  105. row["collect_round"],
  106. )
  107. sql = self.sql_map["retrieve_scrape_insert_if_absent_sql"]
  108. affected_rows = self.db_online.execute(sql, params)
  109. # #endregion
  110. if affected_rows > 0:
  111. self.crawl_count += 1
  112. self.logger.info(
  113. "pipeline入库成功 spider=%s item_id=%s total=%s",
  114. self.spider_name,
  115. product.get("item_id"),
  116. self.crawl_count,
  117. )
  118. else:
  119. self.logger.info(
  120. "pipeline跳过入库(已存在或失败) item_id=%s",
  121. product.get("item_id"),
  122. )
  123. # # 存入店铺表
  124. # shop_data = {
  125. # "shop": product["shop_name"] or "",
  126. # "shop_url": product["shop_url"] or "",
  127. # "city": product.get("city_name") or "",
  128. # "qualification_number": "",
  129. # "business_license_company": product.get("company_name") or "",
  130. # "province": product.get("province_name") or "",
  131. # "scrape_date": time.strftime("%Y-%m-%d %H:%M:%S"),
  132. # "business_license_address": "",
  133. # "create_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  134. # "update_time": time.strftime("%Y-%m-%d %H:%M:%S"),
  135. # "platform": product["platform"]
  136. # }
  137. # try:
  138. # self.shop_pipeline.storge_data(shop_data)
  139. # except Exception as e:
  140. # pass
  141. return affected_rows
  142. if __name__ == '__main__':
  143. pass