"""Backfill shipping province/city for platform 3 rows in ``retrieve_scrape_data``.

Province/city values are looked up in ``pdd_shop_info_middle`` by shop name
(``retrieve_scrape_data.store_name == pdd_shop_info_middle.shop``) and written
only into target columns that are currently blank.

Usage::

    # Preview first (no database writes)
    python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py --dry-run

    # Write to the database
    python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py
"""

import argparse
import contextlib
import os
import sys
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, Optional, Tuple

import pymysql
from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
# Database settings are read from this dotenv file next to the script.
ENV_PATH = BASE_DIR / ".env.process"

# Only target rows with this platform_id are backfilled.
PLATFORM_ID = "3"
SOURCE_TABLE = "pdd_shop_info_middle"
TARGET_TABLE = "retrieve_scrape_data"


def normalize_text(value) -> str:
    """Return ``value`` as a stripped string; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def load_db_config() -> Dict[str, object]:
    """Load MySQL connection settings from ``ENV_PATH``.

    Returns:
        A dict with ``host``, ``port``, ``user``, ``password`` and ``db_name``.
        Unset host/port/user fall back to ``localhost``/``3306``/``root``.

    Raises:
        FileNotFoundError: if the dotenv file does not exist.
        ValueError: if ``DB_DATABASE`` is empty or ``DB_PORT`` is not an integer.
    """
    if not ENV_PATH.is_file():
        raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
    # By default load_dotenv does not override variables already set in the
    # process environment.
    load_dotenv(ENV_PATH)

    db_name = normalize_text(os.getenv("DB_DATABASE"))
    if not db_name:
        raise ValueError("DB_DATABASE 不能为空")

    raw_port = normalize_text(os.getenv("DB_PORT")) or "3306"
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"DB_PORT 必须是整数: {raw_port}") from None

    return {
        "host": normalize_text(os.getenv("DB_HOST")) or "localhost",
        "port": port,
        "user": normalize_text(os.getenv("DB_USERNAME")) or "root",
        # Used verbatim (not stripped): whitespace may be part of the password.
        "password": os.getenv("DB_PASSWORD", ""),
        "db_name": db_name,
    }


def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
    """Open a utf8mb4 MySQL connection that returns rows as dicts.

    Autocommit is disabled so ``backfill`` controls the transaction and can
    roll everything back in dry-run mode.
    """
    return pymysql.connect(
        host=config["host"],
        port=int(config["port"]),
        user=config["user"],
        password=config["password"],
        database=config["db_name"],
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=False,
        connect_timeout=10,
    )


def build_source_shop_map(
    conn: pymysql.connections.Connection,
) -> Tuple[Dict[str, Tuple[str, str]], Counter]:
    """Build a ``shop -> (province, city)`` lookup from ``SOURCE_TABLE``.

    Current strategy: match on ``shop`` only; ``store_url`` is not used.
    ``pdd_shop_info_middle.shop`` has a unique index, so it maps directly.
    ``(shop, store_url)`` candidates are still collected, but only for
    statistics, so an exact ``shop + store_url`` match can be re-enabled
    later (see the reserved block in ``backfill``).

    Returns:
        ``(shop_map, stats)``; ``stats`` holds ``source_rows``,
        ``shop_map_rows``, ``exact_unique_reserved`` and
        ``exact_ambiguous_reserved``.
    """
    sql = f"""
        SELECT shop, store_url, province, city
        FROM `{SOURCE_TABLE}`
        WHERE COALESCE(TRIM(shop),'') <> ''
          AND COALESCE(TRIM(province),'') <> ''
          AND COALESCE(TRIM(city),'') <> ''
    """
    with conn.cursor() as cur:
        cur.execute(sql)
        rows = cur.fetchall()

    # Reserved for a future exact "shop + store_url" match; kept on purpose.
    exact_candidates = defaultdict(set)
    shop_map: Dict[str, Tuple[str, str]] = {}
    stats = Counter()

    for row in rows:
        shop = normalize_text(row.get("shop"))
        store_url = normalize_text(row.get("store_url"))
        province = normalize_text(row.get("province"))
        city = normalize_text(row.get("city"))
        # Python's strip() removes more kinds of whitespace than MySQL's
        # TRIM(), so emptiness is re-checked after normalizing.
        if not shop or not province or not city:
            continue

        # Active strategy: one mapping per shop (a later row with the same
        # normalized shop name overwrites an earlier one).
        shop_map[shop] = (province, city)

        # Reserved for the future exact match (not used for matching yet).
        if store_url:
            exact_candidates[(shop, store_url)].add((province, city))

    # Statistics only; they do not influence the current matching.
    for province_city_pairs in exact_candidates.values():
        if len(province_city_pairs) == 1:
            stats["exact_unique_reserved"] += 1
        else:
            stats["exact_ambiguous_reserved"] += 1

    stats["source_rows"] = len(rows)
    stats["shop_map_rows"] = len(shop_map)
    return shop_map, stats


def backfill(
    conn: pymysql.connections.Connection,
    dry_run: bool,
    limit: Optional[int] = None,
) -> Counter:
    """Fill blank ``province_name``/``city_name`` columns in ``TARGET_TABLE``.

    Target rows (``platform_id == PLATFORM_ID`` with at least one blank
    column) are matched to ``SOURCE_TABLE`` by store name. Only blank
    columns receive new values; a column that already holds a value is
    written back exactly as stored.

    Args:
        conn: Open connection with autocommit disabled.
        dry_run: If true, no UPDATE is executed and the transaction is rolled
            back.
        limit: When positive, process only the first ``limit`` target rows
            (ordered by id); ``None`` or ``<= 0`` means no limit.

    Returns:
        Counter of source and target statistics (printed by ``print_summary``).

    Raises:
        Any database error. The open transaction is rolled back first so no
        partial update is left pending.
    """
    shop_map, source_stats = build_source_shop_map(conn)
    stats = Counter(source_stats)

    where_limit = f" LIMIT {int(limit)}" if limit and limit > 0 else ""
    select_sql = f"""
        SELECT id, store_name, link_url, province_name, city_name
        FROM `{TARGET_TABLE}`
        WHERE platform_id = %s
          AND (
            COALESCE(TRIM(province_name),'') = ''
            OR COALESCE(TRIM(city_name),'') = ''
          )
        ORDER BY id ASC
        {where_limit}
    """
    update_sql = f"""
        UPDATE `{TARGET_TABLE}`
        SET province_name = %s,
            city_name = %s,
            update_time = NOW()
        WHERE id = %s
    """

    try:
        with conn.cursor() as cur:
            cur.execute(select_sql, (PLATFORM_ID,))
            targets = cur.fetchall()

        stats["target_rows"] = len(targets)
        if not targets:
            return stats

        with conn.cursor() as cur:
            for row in targets:
                row_id = row["id"]
                shop = normalize_text(row.get("store_name"))
                old_province = normalize_text(row.get("province_name"))
                old_city = normalize_text(row.get("city_name"))

                # Active strategy: match on store name only. shop_map never
                # contains an empty key, so a blank store name cannot match.
                match_pc = shop_map.get(shop)
                if match_pc is None:
                    stats["skipped_no_match"] += 1
                    continue

                # ------- Reserved: future exact "shop + link" match -------
                # link = normalize_text(row.get("link_url"))
                # if shop and link and (shop, link) in exact_map:
                #     match_pc = exact_map[(shop, link)]
                #     stats["matched_exact"] += 1
                # elif shop in shop_map:
                #     match_pc = shop_map[shop]
                #     stats["matched_by_shop"] += 1
                # else:
                #     stats["skipped_no_match"] += 1
                #     continue
                # ----------------------------------------------------------
                stats["matched_by_shop"] += 1

                # Only fill blanks: nothing to do if both columns have values.
                if old_province and old_city:
                    stats["skipped_already_filled"] += 1
                    continue

                province, city = match_pc
                # Write an already-filled column back as stored (not its
                # stripped form) so existing data is never altered.
                new_province = row.get("province_name") if old_province else province
                new_city = row.get("city_name") if old_city else city

                stats["to_update"] += 1
                if not dry_run:
                    cur.execute(update_sql, (new_province, new_city, row_id))
                    stats["updated"] += 1

        if dry_run:
            conn.rollback()
        else:
            conn.commit()
    except BaseException:
        # Discard partial writes explicitly. If the connection is already
        # broken, the rollback error is ignored so the original error surfaces.
        with contextlib.suppress(pymysql.Error):
            conn.rollback()
        raise

    return stats


def print_summary(stats: Counter, dry_run: bool) -> None:
    """Print the run mode, matching strategy and every counter in ``stats``."""
    mode = "DRY-RUN(不写库)" if dry_run else "WRITE(写库)"
    print(f"运行模式: {mode}")
    print("匹配策略: 仅按店铺名(store_name == shop)匹配")
    print(f"来源行数(source_rows): {stats.get('source_rows', 0)}")
    print(f"来源店铺映射(shop_map_rows): {stats.get('shop_map_rows', 0)}")
    print(f"预留统计-链接唯一(exact_unique_reserved): {stats.get('exact_unique_reserved', 0)}")
    print(f"预留统计-链接冲突(exact_ambiguous_reserved): {stats.get('exact_ambiguous_reserved', 0)}")
    print(f"目标行数(target_rows): {stats.get('target_rows', 0)}")
    print(f"店铺命中(matched_by_shop): {stats.get('matched_by_shop', 0)}")
    print(f"待更新(to_update): {stats.get('to_update', 0)}")
    print(f"已更新(updated): {stats.get('updated', 0)}")
    print(f"跳过-未匹配(skipped_no_match): {stats.get('skipped_no_match', 0)}")
    print(f"跳过-已完整(skipped_already_filled): {stats.get('skipped_already_filled', 0)}")


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser (``--dry-run`` and ``--limit``)."""
    parser = argparse.ArgumentParser(
        description="回填 retrieve_scrape_data 的发货省市(platform_id=3)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="仅预演,不写库",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="仅处理前 N 条目标数据(调试用)",
    )
    return parser


def main() -> int:
    """Parse CLI arguments, run the backfill and print a summary.

    Returns:
        Process exit code (0 on success).
    """
    args = build_parser().parse_args()
    dry_run = bool(args.dry_run)
    config = load_db_config()
    conn = connect_db(config)
    try:
        stats = backfill(conn=conn, dry_run=dry_run, limit=args.limit)
    finally:
        conn.close()

    print_summary(stats, dry_run=dry_run)
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("用户中断执行", file=sys.stderr)
        raise SystemExit(130)
    except Exception as exc:
        # Top-level boundary: report briefly on stderr and exit non-zero.
        print(f"执行失败: {exc}", file=sys.stderr)
        raise SystemExit(1)