| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import json
- import time
- from itertools import product
- from pathlib import Path
- from area_info.city_name_to_id_2 import get_city
- from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
- from commons.sql_data import RETRIEVE_SCRAPE_INSERT_COLUMNS, sql_map
- from commons.Logger import get_spider_logger
- from pipelines.shop_pipelines import ShopPipeline
class DrugPipeline:
    """Pipeline that normalises scraped product items and inserts them via SQL.

    Each item is first enriched with city/address/company data looked up in
    ``retrieve_scrape_shop_info`` and then written with the
    ``retrieve_scrape_insert_if_absent_sql`` statement from ``sql_map``.
    """

    def __init__(self, spider_name: str):
        """Set up the DB pool, logger and counters for ``spider_name``."""
        self.db_online = MySQLPool39()
        # Alternative pool, kept for reference:
        # self.db_online = MySQLPoolOnline()
        self.sql_map = sql_map
        # Number of rows actually inserted by this pipeline instance.
        self.crawl_count = 0
        self.spider_name = spider_name
        self.logger = get_spider_logger(spider_name)
        self.shop_pipeline = ShopPipeline(spider_name)

    @staticmethod
    def _db_int(val) -> int:
        """Coerce ``val`` to an int for an integer column; 0 when impossible.

        ``None``, ``""``, unparseable text and non-finite numbers all map to 0.
        Decimal-looking text such as ``"12.0"`` is truncated the same way the
        float ``12.0`` would be.
        """
        if val is None or val == "":
            return 0
        try:
            return int(val)
        except (TypeError, OverflowError):
            # OverflowError: int(float("inf")).
            return 0
        except ValueError:
            # Could still be numeric text with a fractional part ("12.0").
            pass
        try:
            return int(float(str(val).strip()))
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _db_decimal(val) -> float:
        """Coerce ``val`` to a finite float for a numeric column; 0.0 otherwise.

        ``None``, booleans, blank or unparseable text, and NaN/infinity all
        map to 0.0.
        """
        from math import isfinite

        # Booleans are ints in Python but are not meaningful prices.
        if val is None or isinstance(val, bool):
            return 0.0
        if isinstance(val, (int, float)):
            candidate = val
        else:
            candidate = str(val).strip()
            if not candidate:
                return 0.0
        try:
            number = float(candidate)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        # Non-finite values cannot be stored as a SQL numeric, so treat them
        # like any other unusable input.
        return number if isfinite(number) else 0.0

    def get_shop_city(self, product: dict) -> dict:
        """Fill in city, address and company fields from the stored shop record.

        Looks up one ``retrieve_scrape_shop_info`` row matching the product's
        platform and shop name that has a usable city.  ``product`` is mutated
        in place and also returned; it is returned unchanged when no row is
        found.

        Raises:
            KeyError: if ``product`` has no ``"platform"`` key.
        """
        sql_data = """
            SELECT *
            FROM `retrieve_scrape_shop_info`
            WHERE `platform` = %s
              AND `city` IS NOT NULL
              AND `city` != '' AND `city` != '未知'
              AND `shop` = %s
            LIMIT 1
        """
        rows = self.db_online.select_data(
            sql_data,
            (str(product["platform"]), product.get("shop_name", "")),
        )
        if not rows:
            return product

        shop_row = rows[0]
        if not product.get("city_id"):
            # Only resolve the city when the item did not already carry one.
            city_id, province_id, city, province = get_city(shop_row["city"])
            product["city_id"] = city_id
            product["province_id"] = province_id
            # Keep the raw shop city (or the item's own value) if the resolver
            # returned no name, instead of blanking the field.
            product["city_name"] = (
                city or shop_row.get("city") or product.get("city_name", "")
            )
            product["province_name"] = province

        # `SELECT *` rows contain these keys even when the column is NULL, so
        # the fallback must be applied with `or`, not as a dict.get default.
        product["area_info"] = (
            shop_row.get("contact_address") or product.get("area_info", "")
        )
        product["company_name"] = (
            shop_row.get("business_license_company")
            or product.get("company_name", "")
        )
        return product

    def storge_data(self, product: dict):
        """Normalise ``product`` and insert it if no matching row exists.

        Returns whatever ``self.db_online.execute`` returned (the affected-row
        count on success).  ``self.crawl_count`` is incremented only when at
        least one row was inserted.

        Raises:
            KeyError: if a required item key (e.g. ``"item_id"``,
                ``"scrape_date"``) is missing.
        """
        product = self.get_shop_city(product)

        # One timestamp so insert_time and update_time can never disagree.
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        stock_count = product["stock_count"]

        row = {
            "platform_id": product["platform"],
            "platform_item_id": str(product["item_id"]),
            "enterprise_id": self._db_int(product["enterprise_id"]),
            "product_name": product["product_name"] or "",
            "product_specs": product["spec"] or "",
            "product_brand": product.get("product_brand") or "",
            "one_box_price": self._db_decimal(product.get("one_price")),
            "link_url": product["detail_url"] or "",
            "store_name": product["shop_name"] or "",
            "store_url": product["shop_url"] or "",
            "shipment_province_id": self._db_int(product.get("shipment_province_id")),
            "shipment_province_name": product.get("shipment_province_name") or "",
            "shipment_city_id": self._db_int(product.get("shipment_city_id")),
            "shipment_city_name": product.get("shipment_city_name") or "",
            "manufacturer": product.get("factory_name") or "",
            "company_name": product.get("company_name") or "",
            "scrape_date": product["scrape_date"],
            "is_sold_out": self._db_int(product.get("is_sold_out")),
            "min_price": self._db_decimal(product.get("price")),
            "sales": product["sales"] or "",
            # A stock count of 0 is meaningful and must be kept as "0".
            "inventory": "" if stock_count in (None, "") else str(stock_count),
            "snapshot_url": product["snapshot_url"] or "",
            "approval_number": product["approval_num"] or "",
            "expiry_date": product["deadline"] or "",
            "update_time": now,
            "insert_time": now,
            "number": self._db_int(product.get("number", 1)),
            "task_id": self._db_int(product.get("collect_task_id")),
            "anonymous_store_name": product.get("anonymous_store_name") or "",
            "search_name": product.get("search_name") or "",
            "collect_config_info": product.get("collect_config_info") or "",
            "area_info": product.get("area_info") or "",
            "city_name": product.get("city_name") or "",
            "city_id": self._db_int(product.get("city_id")),
            "province_name": product.get("province_name") or "",
            "province_id": self._db_int(product.get("province_id")),
            "collect_equipment_account_id": self._db_int(product.get("account_id")),
            "collect_region_id": self._db_int(product.get("collect_region_id")),
            "collect_round": self._db_int(product.get("collect_round")),
        }

        # Column values in statement order, followed by the five values the
        # statement takes after them (presumably its "if absent" check).
        insert_values = tuple(row[col] for col in RETRIEVE_SCRAPE_INSERT_COLUMNS)
        params = insert_values + (
            row["platform_id"],
            row["scrape_date"],
            row["platform_item_id"],
            row["collect_equipment_account_id"],
            row["collect_round"],
        )
        sql = self.sql_map["retrieve_scrape_insert_if_absent_sql"]
        affected_rows = self.db_online.execute(sql, params)

        # Tolerate a None/False result from a failed execute instead of
        # raising TypeError on the comparison.
        if affected_rows and affected_rows > 0:
            self.crawl_count += 1
            self.logger.info(
                "pipeline入库成功 spider=%s item_id=%s total=%s",
                self.spider_name,
                product.get("item_id"),
                self.crawl_count,
            )
        else:
            self.logger.info(
                "pipeline跳过入库(已存在或失败) item_id=%s",
                product.get("item_id"),
            )

        # NOTE: writing the shop record through self.shop_pipeline is
        # currently disabled; only the product row is stored here.
        return affected_rows
# Script entry guard: running this module directly currently does nothing.
if __name__ == '__main__':
    pass
|