| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- from commons.conn_mysql import MySQLPool39,MySQLPoolOnline
- from commons.sql_data import sql_map
- from commons.Logger import get_spider_logger
- class ShopPipeline:
- def __init__(self, spider_name):
- self.db_online = MySQLPoolOnline()
- self.sql_map = sql_map
- self.crawl_count = 0
- self.spider_name = spider_name
- self.logger = get_spider_logger(spider_name)
- @staticmethod
- def _db_int(val):
- if val is None or val == "":
- return 0
- try:
- return int(val)
- except (TypeError, ValueError):
- return 0
- @staticmethod
- def _db_decimal(val):
- if val is None or val is False:
- return 0.0
- if isinstance(val, (int, float)) and not isinstance(val, bool):
- return float(val)
- s = str(val).strip()
- if not s:
- return 0.0
- try:
- return float(s)
- except ValueError:
- return 0.0
- def storge_data(self, product):
- # 数据入库:表字段与 product key 一致,按 key 动态构造 SQL
- if not isinstance(product, dict) or not product:
- self.logger.warning("pipeline入库失败: product 为空或格式错误")
- return 0
- table_name = "retrieve_scrape_shop_info"
- columns = list(product.keys())
- values = tuple(product[col] for col in columns)
- column_sql = ", ".join(f"`{col}`" for col in columns)
- placeholder_sql = ", ".join(["%s"] * len(columns))
- sql = f"INSERT INTO `{table_name}` ({column_sql}) VALUES ({placeholder_sql})"
- affected_rows = self.db_online.execute(sql, values)
- if affected_rows > 0:
- self.crawl_count += 1
- self.logger.info(
- "shop pipeline入库成功 spider=%s total=%s shop=%s",
- self.spider_name,
- self.crawl_count,
- product.get("shop"),
- )
- else:
- self.logger.warning(
- "shop pipeline入库失败 spider=%s shop=%s",
- self.spider_name,
- product.get("shop"),
- )
- return affected_rows
- if __name__ == '__main__':
- pass
|