import json
import math
import time
from itertools import product  # NOTE: unused here and shadowed by the `product` parameters below
from pathlib import Path

from area_info.city_name_to_id_2 import get_city
from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
from commons.sql_data import RETRIEVE_SCRAPE_INSERT_COLUMNS, sql_map
from commons.Logger import get_spider_logger
from pipelines.shop_pipelines import ShopPipeline


class DrugPipeline:
    """Normalize scraped drug-listing dicts and insert them into MySQL.

    Each item is first enriched from the ``retrieve_scrape_shop_info`` table
    (see ``get_shop_city``), then mapped onto the columns listed in
    ``RETRIEVE_SCRAPE_INSERT_COLUMNS`` and written with the
    ``retrieve_scrape_insert_if_absent_sql`` statement from ``sql_map``.
    """

    def __init__(self, spider_name: str):
        self.db_online = MySQLPool39()
        # self.db_online = MySQLPoolOnline()
        self.sql_map = sql_map
        # Number of rows this instance actually inserted (affected_rows > 0).
        self.crawl_count = 0
        self.spider_name = spider_name
        self.logger = get_spider_logger(spider_name)
        self.shop_pipeline = ShopPipeline(spider_name)

    @staticmethod
    def _db_int(val):
        """Coerce *val* to an ``int`` for an integer column, defaulting to 0.

        ``None`` and ``""`` map to 0. When ``int()`` rejects the value it is
        retried through ``float()``, so numeric strings such as ``"3.0"``
        become 3 (truncated toward zero). Anything still unconvertible,
        including NaN and infinities, maps to 0.
        """
        if val is None or val == "":
            return 0
        try:
            return int(val)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            # Handles "3.0"-style strings; OverflowError covers inf, ValueError covers NaN.
            return int(float(val))
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _db_decimal(val):
        """Coerce *val* to a finite ``float`` for a decimal column, defaulting to 0.0.

        ``None`` and ``False`` map to 0.0. Numbers (excluding ``bool``) are
        converted directly. Other values are stringified, stripped and parsed
        with ``float()``. Blank, unparseable, overflowing or non-finite
        (NaN/inf) values map to 0.0, because a database decimal column cannot
        store them.
        """
        if val is None or val is False:
            return 0.0
        try:
            if isinstance(val, (int, float)) and not isinstance(val, bool):
                result = float(val)
            else:
                text = str(val).strip()
                if not text:
                    return 0.0
                result = float(text)
        except (ValueError, OverflowError):
            return 0.0
        return result if math.isfinite(result) else 0.0

    def get_shop_city(self, product):
        """Fill in shop-derived location and company fields on *product*.

        Looks up one ``retrieve_scrape_shop_info`` row with the same platform
        and shop name and a usable city (not NULL, empty or '未知').

        When *product* has no ``city_id``, the city/province ids and names are
        resolved from that row's city via ``get_city``. ``area_info`` and
        ``company_name`` are taken from the shop row when it has a non-empty
        value; otherwise the product's existing values are kept.

        Args:
            product: scraped item dict; reads ``platform`` and ``shop_name``.

        Returns:
            The same dict, mutated in place.
        """
        shop_name = product.get("shop_name") or ""
        if not shop_name:
            # Querying `shop = ''` could match an unrelated nameless shop and
            # copy its city/company onto this product.
            return product

        sql_data = """
            SELECT * FROM `retrieve_scrape_shop_info`
            WHERE `platform` = %s
              AND `city` IS NOT NULL AND `city` != '' and `city` !='未知'
              AND `shop` = %s
            LIMIT 1"""
        data = self.db_online.select_data(sql_data, (str(product["platform"]), shop_name))
        if not data:
            return product

        # Rows are accessed by column name (shop_row["city"], shop_row.get(...)).
        shop_row = data[0]
        # NOTE(review): the original indentation was lost; this assumes only the
        # city/province fill is conditional on a missing city_id. Confirm.
        if not product.get("city_id"):
            city_id, province_id, city, province = get_city(shop_row["city"])
            product["city_id"] = city_id
            product["province_id"] = province_id
            product["city_name"] = city
            product["province_name"] = province
        # Use `or` so a NULL/empty shop column does not wipe a value the product already has.
        product["area_info"] = shop_row.get("contact_address") or product.get("area_info", "")
        product["company_name"] = (
            shop_row.get("business_license_company") or product.get("company_name", "")
        )
        return product

    def storge_data(self, product):
        """Enrich *product*, map it to a DB row and insert it if absent.

        The method name's spelling is kept for caller compatibility.

        Args:
            product: scraped item dict. Keys accessed with ``[]`` (e.g.
                ``platform``, ``item_id``, ``enterprise_id``, ``spec``,
                ``detail_url``, ``scrape_date``, ``stock_count``) are required;
                a missing one raises ``KeyError``.

        Returns:
            Whatever ``self.db_online.execute`` returns; presumably the
            affected row count, where 0 means the row already existed or the
            insert failed.
        """
        product = self.get_shop_city(product)
        # One timestamp so update_time and insert_time always match.
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        stock_count = product["stock_count"]
        row = {
            "platform_id": product["platform"],
            "platform_item_id": str(product["item_id"]),
            "enterprise_id": self._db_int(product["enterprise_id"]),
            "product_name": product["product_name"] or "",
            "product_specs": product["spec"] or "",
            "product_brand": product.get("product_brand") or "",
            "one_box_price": self._db_decimal(product.get("one_price")),
            "link_url": product["detail_url"] or "",
            "store_name": product["shop_name"] or "",
            "store_url": product["shop_url"] or "",
            "shipment_province_id": self._db_int(product.get("shipment_province_id")),
            "shipment_province_name": product.get("shipment_province_name") or "",
            "shipment_city_id": self._db_int(product.get("shipment_city_id")),
            "shipment_city_name": product.get("shipment_city_name") or "",
            "manufacturer": product.get("factory_name") or "",
            "company_name": product.get("company_name") or "",
            "scrape_date": product["scrape_date"],
            "is_sold_out": self._db_int(product.get("is_sold_out")),
            "min_price": self._db_decimal(product.get("price")),
            "sales": product["sales"] or "",
            "inventory": str(stock_count) if stock_count not in (None, "") else "",
            "snapshot_url": product["snapshot_url"] or "",
            "approval_number": product["approval_num"] or "",
            "expiry_date": product["deadline"] or "",
            "update_time": now,
            "insert_time": now,
            "number": self._db_int(product.get("number", 1)),
            "task_id": self._db_int(product.get("collect_task_id")),
            "anonymous_store_name": product.get("anonymous_store_name") or "",
            "search_name": product.get("search_name") or "",
            "collect_config_info": product.get("collect_config_info") or "",
            "area_info": product.get("area_info") or "",
            "city_name": product.get("city_name") or "",
            "city_id": self._db_int(product.get("city_id")),
            "province_name": product.get("province_name") or "",
            "province_id": self._db_int(product.get("province_id")),
            "collect_equipment_account_id": self._db_int(product.get("account_id")),
            "collect_region_id": self._db_int(product.get("collect_region_id")),
            "collect_round": self._db_int(product.get("collect_round")),
        }
        # INSERT values in column order, followed by the five key values the
        # "insert if absent" statement uses for its existence check.
        params = tuple(row[k] for k in RETRIEVE_SCRAPE_INSERT_COLUMNS) + (
            row["platform_id"],
            row["scrape_date"],
            row["platform_item_id"],
            row["collect_equipment_account_id"],
            row["collect_round"],
        )
        sql = self.sql_map["retrieve_scrape_insert_if_absent_sql"]
        affected_rows = self.db_online.execute(sql, params)

        # Guard against a None return so the comparison cannot raise TypeError.
        if affected_rows and affected_rows > 0:
            self.crawl_count += 1
            self.logger.info(
                "pipeline入库成功 spider=%s item_id=%s total=%s",
                self.spider_name,
                product.get("item_id"),
                self.crawl_count,
            )
        else:
            self.logger.info(
                "pipeline跳过入库(已存在或失败) item_id=%s",
                product.get("item_id"),
            )

        # NOTE: writing to the shop table is currently disabled. If it is
        # re-enabled, log failures (self.logger.exception) instead of
        # silently swallowing them.
        # # Save into the shop table
        # shop_data = {
        #     "shop": product["shop_name"] or "",
        #     "shop_url": product["shop_url"] or "",
        #     "city": product.get("city_name") or "",
        #     "qualification_number": "",
        #     "business_license_company": product.get("company_name") or "",
        #     "province": product.get("province_name") or "",
        #     "scrape_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        #     "business_license_address": "",
        #     "create_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        #     "update_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        #     "platform": product["platform"]
        # }
        # try:
        #     self.shop_pipeline.storge_data(shop_data)
        # except Exception as e:
        #     pass
        return affected_rows


if __name__ == '__main__':
    pass