Ver Fonte

修改pipeline逻辑

zhuoyuncheng há 1 semana atrás
pai
commit
a3bf2a06b9
2 ficheiros alterados com 16 adições e 9 exclusões
  1. 10 6
      pipelines/drug_pipelines.py
  2. 6 3
      pipelines/shop_pipelines.py

+ 10 - 6
pipelines/drug_pipelines.py

@@ -2,7 +2,7 @@ import json
 import time
 from itertools import product
 from pathlib import Path
-
+from area_info.city_name_to_id_2 import get_city
 from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
 from commons.sql_data import RETRIEVE_SCRAPE_INSERT_COLUMNS, sql_map
 from commons.Logger import get_spider_logger
@@ -19,6 +19,7 @@ class DrugPipeline:
         self.logger = get_spider_logger(spider_name)
         self.shop_pipeline = ShopPipeline(spider_name)
 
+
     @staticmethod
     def _db_int(val):
         if val is None or val == "":
@@ -52,10 +53,13 @@ class DrugPipeline:
         data = self.db_online.select_data(sql_data, (str(product["platform"]), product.get("shop_name", "")))
         if data:
             shop_row = data[0]
-            if not product.get("city_id") and not product.get("shipment_city_id"):
+            if not product.get("city_id"):
                 product["city_name"] = shop_row.get("city", "") or product.get("city_name", "")
-            if not product.get("province_id") and not product.get("shipment_province_id"):
-                product["province_name"] = shop_row.get("province", "") or product.get("province_name", "")
+                city_id, province_id, city, province = get_city(shop_row["city"])
+                product["city_id"] = city_id
+                product["province_id"] = province_id
+                product["city_name"] = city
+                product["province_name"] = province
             product["area_info"] = shop_row.get("contact_address", product.get("area_info", ""))
             product["company_name"] = shop_row.get("business_license_company", product.get("company_name", ""))
         return product
@@ -87,8 +91,8 @@ class DrugPipeline:
             "snapshot_url": product["snapshot_url"] or "",
             "approval_number": product["approval_num"] or "",
             "expiry_date": product["deadline"] or "",
-            "update_time": product["update_time"],
-            "insert_time": product.get("insert_time") or product["update_time"],
+            "update_time": time.strftime("%Y-%m-%d %H:%M:%S"),
+            "insert_time": time.strftime("%Y-%m-%d %H:%M:%S"),
             "number": self._db_int(product.get("number", 1)),
             "task_id": self._db_int(product.get("collect_task_id")),
             "anonymous_store_name": product.get("anonymous_store_name") or "",

+ 6 - 3
pipelines/shop_pipelines.py

@@ -46,7 +46,10 @@ class ShopPipeline:
 
         column_sql = ", ".join(f"`{col}`" for col in columns)
         placeholder_sql = ", ".join(["%s"] * len(columns))
-        sql = f"INSERT INTO `{table_name}` ({column_sql}) VALUES ({placeholder_sql})"
+        # idx_platform_shop 为 (platform, shop) 唯一键;重复则跳过,不更新
+        sql = (
+            f"INSERT IGNORE INTO `{table_name}` ({column_sql}) VALUES ({placeholder_sql})"
+        )
 
         affected_rows = self.db_online.execute(sql, values)
         if affected_rows > 0:
@@ -58,8 +61,8 @@ class ShopPipeline:
                 product.get("shop"),
             )
         else:
-            self.logger.warning(
-                "shop pipeline入库失败 spider=%s shop=%s",
+            self.logger.info(
+                "shop pipeline跳过入库(店铺已存在) spider=%s shop=%s",
                 self.spider_name,
                 product.get("shop"),
             )