| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
"""Backfill shipping province/city in retrieve_scrape_data (platform_id=3).

Usage:
    # Dry run first (no writes)
    python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py --dry-run
    # Real run (writes to the database)
    python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py
"""
import argparse
import os
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, Optional, Tuple

import pymysql
from dotenv import load_dotenv

# The env file is expected to live next to this script.
BASE_DIR = Path(__file__).resolve().parent
ENV_PATH = BASE_DIR / ".env.process"

# Platform identifier for PDD; stored as a string in the database.
PLATFORM_ID = "3"
# Source of known shop -> (province, city) mappings.
SOURCE_TABLE = "pdd_shop_info_middle"
# Table whose empty province/city columns get backfilled.
TARGET_TABLE = "retrieve_scrape_data"
def normalize_text(value) -> str:
    """Coerce *value* to a stripped string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()
def load_db_config() -> Dict[str, object]:
    """Load DB connection settings from the ``.env.process`` file.

    Returns a dict with host/port/user/password/db_name, falling back to
    localhost:3306 with user ``root`` when the variables are unset.

    Raises:
        FileNotFoundError: the env file next to this script is missing.
        ValueError: ``DB_DATABASE`` is empty or unset.
    """
    if not ENV_PATH.is_file():
        raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
    load_dotenv(ENV_PATH)

    database = normalize_text(os.getenv("DB_DATABASE"))
    if not database:
        raise ValueError("DB_DATABASE 不能为空")

    host = normalize_text(os.getenv("DB_HOST"))
    port_text = normalize_text(os.getenv("DB_PORT"))
    user = normalize_text(os.getenv("DB_USERNAME"))
    return {
        "host": host or "localhost",
        "port": int(port_text or "3306"),
        "user": user or "root",
        # Deliberately NOT normalized: passwords may contain spaces.
        "password": os.getenv("DB_PASSWORD", ""),
        "db_name": database,
    }
def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
    """Open a MySQL connection from a ``load_db_config()`` dict.

    Uses DictCursor rows, utf8mb4, and manual transactions (autocommit off)
    so the caller controls commit/rollback.
    """
    connection_kwargs = dict(
        host=config["host"],
        port=int(config["port"]),
        user=config["user"],
        password=config["password"],
        database=config["db_name"],
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=False,
        connect_timeout=10,
    )
    return pymysql.connect(**connection_kwargs)
def build_source_shop_map(
    conn: pymysql.connections.Connection,
) -> Tuple[Dict[str, Tuple[str, str]], Counter]:
    """Build a shop-name -> (province, city) map from the source table.

    Current policy (per request): match by shop name only; store_url is NOT
    used. Notes:
    - ``shop`` has a unique index in pdd_shop_info_middle, so a plain dict
      mapping is appropriate.
    - To switch back to exact "shop+store_url" matching later, re-enable the
      reserved logic kept in the comments below.

    Returns:
        (shop_map, stats) where stats carries row/mapping counters.
    """
    sql = f"""
    SELECT
        shop,
        store_url,
        province,
        city
    FROM `{SOURCE_TABLE}`
    WHERE COALESCE(TRIM(shop),'') <> ''
      AND COALESCE(TRIM(province),'') <> ''
      AND COALESCE(TRIM(city),'') <> ''
    """
    with conn.cursor() as cursor:
        cursor.execute(sql)
        source_rows = cursor.fetchall()

    shop_map: Dict[str, Tuple[str, str]] = {}
    # Reserved for a future "shop+store_url" exact match; kept, not used now.
    exact_candidates = defaultdict(set)
    stats = Counter()

    for record in source_rows:
        shop = normalize_text(record.get("shop"))
        province = normalize_text(record.get("province"))
        city = normalize_text(record.get("city"))
        if not (shop and province and city):
            continue
        # Active policy: one (province, city) per shop name.
        shop_map[shop] = (province, city)
        store_url = normalize_text(record.get("store_url"))
        if store_url:
            exact_candidates[(shop, store_url)].add((province, city))

    # Bookkeeping only — these counters do not influence matching.
    for candidate_set in exact_candidates.values():
        bucket = (
            "exact_unique_reserved"
            if len(candidate_set) == 1
            else "exact_ambiguous_reserved"
        )
        stats[bucket] += 1

    stats["source_rows"] = len(source_rows)
    stats["shop_map_rows"] = len(shop_map)
    return shop_map, stats
def backfill(
    conn: pymysql.connections.Connection,
    dry_run: bool,
    limit: Optional[int] = None,
) -> Counter:
    """Fill empty province_name/city_name in the target table by shop name.

    Targets rows with platform_id == PLATFORM_ID where either column is
    blank, matches them against the source map by store_name, fills only the
    blank column(s), and commits once at the end — or rolls everything back
    when *dry_run* is True.

    Args:
        conn: Open pymysql connection (autocommit disabled by connect_db).
        dry_run: When True no UPDATE is issued and the transaction is rolled
            back, so stats show what WOULD change.
        limit: Optional cap on the number of target rows (debug aid).

    Returns:
        Counter with matching/update statistics, merged with the source-map
        statistics from build_source_shop_map.
    """
    shop_map, source_stats = build_source_shop_map(conn)
    stats = Counter()
    stats.update(source_stats)
    # limit passes through int() before interpolation, so the f-string is
    # safe from SQL injection.
    where_limit = f" LIMIT {int(limit)}" if limit and limit > 0 else ""
    select_sql = f"""
    SELECT
        id,
        store_name,
        link_url,
        province_name,
        city_name
    FROM `{TARGET_TABLE}`
    WHERE platform_id = %s
      AND (
        COALESCE(TRIM(province_name),'') = ''
        OR COALESCE(TRIM(city_name),'') = ''
      )
    ORDER BY id ASC
    {where_limit}
    """
    with conn.cursor() as cur:
        cur.execute(select_sql, (PLATFORM_ID,))
        targets = cur.fetchall()
    stats["target_rows"] = len(targets)
    if not targets:
        return stats
    update_sql = f"""
    UPDATE `{TARGET_TABLE}`
    SET province_name = %s,
        city_name = %s,
        update_time = NOW()
    WHERE id = %s
    """
    with conn.cursor() as cur:
        for row in targets:
            row_id = row["id"]
            shop = normalize_text(row.get("store_name"))
            # Keep reading the link field so the reserved "shop+link" exact
            # matching below can be restored easily; unused by current policy.
            link = normalize_text(row.get("link_url"))
            old_province = normalize_text(row.get("province_name"))
            old_city = normalize_text(row.get("city_name"))
            # Active policy: match by shop name only.
            if not shop or shop not in shop_map:
                stats["skipped_no_match"] += 1
                continue
            match_pc: Tuple[str, str] = shop_map[shop]
            # ------- Reserved: future "shop+link" exact matching -------
            # if shop and link and (shop, link) in exact_map:
            #     match_pc = exact_map[(shop, link)]
            #     stats["matched_exact"] += 1
            # elif shop in shop_map:
            #     match_pc = shop_map[shop]
            #     stats["matched_by_shop"] += 1
            # else:
            #     stats["skipped_no_match"] += 1
            #     continue
            # -----------------------------------------------------------
            stats["matched_by_shop"] += 1
            province, city = match_pc
            # Fill blanks only: an existing value always wins.
            new_province = old_province or province
            new_city = old_city or city
            # Skip when neither column actually needs a value.
            if new_province == old_province and new_city == old_city:
                stats["skipped_already_filled"] += 1
                continue
            stats["to_update"] += 1
            if not dry_run:
                cur.execute(update_sql, (new_province, new_city, row_id))
                stats["updated"] += 1
    if dry_run:
        conn.rollback()
    else:
        conn.commit()
    return stats
def print_summary(stats: Counter, dry_run: bool) -> None:
    """Print a human-readable run summary to stdout."""
    mode = "DRY-RUN(不写库)" if dry_run else "WRITE(写库)"
    print(f"运行模式: {mode}")
    print("匹配策略: 仅按店铺名(store_name == shop)匹配")
    # (label, stats key) pairs, printed in a fixed order.
    summary_fields = (
        ("来源行数(source_rows)", "source_rows"),
        ("来源店铺映射(shop_map_rows)", "shop_map_rows"),
        ("预留统计-链接唯一(exact_unique_reserved)", "exact_unique_reserved"),
        ("预留统计-链接冲突(exact_ambiguous_reserved)", "exact_ambiguous_reserved"),
        ("目标行数(target_rows)", "target_rows"),
        ("店铺命中(matched_by_shop)", "matched_by_shop"),
        ("待更新(to_update)", "to_update"),
        ("已更新(updated)", "updated"),
        ("跳过-未匹配(skipped_no_match)", "skipped_no_match"),
        ("跳过-已完整(skipped_already_filled)", "skipped_already_filled"),
    )
    for label, key in summary_fields:
        print(f"{label}: {stats.get(key, 0)}")
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser (``--dry-run`` flag and ``--limit`` int)."""
    parser = argparse.ArgumentParser(
        description="回填 retrieve_scrape_data 的发货省市(platform_id=3)"
    )
    parser.add_argument("--dry-run", action="store_true", help="仅预演,不写库")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="仅处理前 N 条目标数据(调试用)",
    )
    return parser
def main() -> int:
    """Entry point: parse CLI args, run the backfill, print a summary.

    Returns 0 on success; the connection is closed even when backfill raises.
    """
    parsed = build_parser().parse_args()
    connection = connect_db(load_db_config())
    try:
        run_stats = backfill(
            conn=connection,
            dry_run=bool(parsed.dry_run),
            limit=parsed.limit,
        )
    finally:
        connection.close()
    print_summary(run_stats, dry_run=bool(parsed.dry_run))
    return 0
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        # Conventional exit code for SIGINT: 128 + 2.
        print("用户中断执行")
        raise SystemExit(130)
    except Exception as exc:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"执行失败: {exc}")
        raise SystemExit(1)
|