from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
from commons.sql_data import sql_map
from commons.Logger import get_spider_logger


class ShopPipeline:
    """Write scraped shop records into the online MySQL database."""

    def __init__(self, spider_name):
        """Create a pipeline bound to a single spider.

        Args:
            spider_name: Name of the spider; included in log lines and used
                to obtain the spider-specific logger.
        """
        self.db_online = MySQLPoolOnline()
        self.sql_map = sql_map
        # Number of rows actually inserted by this pipeline instance.
        self.crawl_count = 0
        self.spider_name = spider_name
        self.logger = get_spider_logger(spider_name)

    @staticmethod
    def _db_int(val):
        """Coerce *val* to ``int``; return 0 for None, "" or unparsable input."""
        if val is None or val == "":
            return 0
        try:
            return int(val)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _db_decimal(val):
        """Coerce *val* to ``float``; return 0.0 for empty or unparsable input.

        Booleans are not treated as numbers. ``False`` returns 0.0 directly.
        ``True`` falls through to string parsing of "True", which fails and
        also yields 0.0.
        """
        if val is None or val is False:
            return 0.0
        if isinstance(val, (int, float)) and not isinstance(val, bool):
            return float(val)
        s = str(val).strip()
        if not s:
            return 0.0
        try:
            return float(s)
        except ValueError:
            return 0.0

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled, which is MySQL's escape inside quoted
        identifiers. A dict key containing "`" therefore cannot terminate the
        quoting and inject SQL.
        """
        return "`" + str(name).replace("`", "``") + "`"

    def storge_data(self, product):
        """Insert one shop record into ``retrieve_scrape_shop_info``.

        Table columns are assumed to match the keys of *product*, so the
        INSERT statement is built dynamically from those keys. Values are
        always passed as bound parameters.

        Args:
            product: Mapping of column name -> value for a single shop.

        Returns:
            The affected-row count reported by ``execute()``. This is 0 when
            *product* is empty or not a dict, or when MySQL ignored the row.
        """
        if not isinstance(product, dict) or not product:
            self.logger.warning("pipeline入库失败: product 为空或格式错误")
            return 0

        table_name = "retrieve_scrape_shop_info"
        columns = list(product.keys())
        values = tuple(product[col] for col in columns)
        column_sql = ", ".join(self._quote_identifier(col) for col in columns)
        placeholder_sql = ", ".join(["%s"] * len(columns))

        # Per the original author, idx_platform_shop is a unique key on
        # (platform, shop). INSERT IGNORE skips duplicates instead of
        # updating them. Note that IGNORE also downgrades some other errors
        # to warnings, so 0 affected rows does not strictly prove the shop
        # already existed.
        sql = (
            f"INSERT IGNORE INTO {self._quote_identifier(table_name)} "
            f"({column_sql}) VALUES ({placeholder_sql})"
        )
        # Presumably execute() returns a row count. Treat None as 0 rows
        # rather than crashing on the comparison below; TODO confirm the
        # return contract in commons.conn_mysql.
        affected_rows = self.db_online.execute(sql, values) or 0

        if affected_rows > 0:
            self.crawl_count += 1
            self.logger.info(
                "shop pipeline入库成功 spider=%s total=%s shop=%s",
                self.spider_name,
                self.crawl_count,
                product.get("shop"),
            )
        else:
            self.logger.info(
                "shop pipeline跳过入库(店铺已存在) spider=%s shop=%s",
                self.spider_name,
                product.get("shop"),
            )
        return affected_rows


if __name__ == '__main__':
    pass