# backfill_retrieve_scrape_data_pc.py (8.5 KB)
  1. import argparse
  2. import os
  3. from collections import Counter, defaultdict
  4. from pathlib import Path
  5. from typing import Dict, Optional, Tuple
  6. import pymysql
  7. from dotenv import load_dotenv
  8. """
  9. 运行命令
  10. # 先预演(不写库)
  11. python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py --dry-run
  12. # 正式写库
  13. python pdd1/process_shop/backfill_retrieve_scrape_data_pc.py
  14. """
  15. BASE_DIR = Path(__file__).resolve().parent
  16. ENV_PATH = BASE_DIR / ".env.process"
  17. PLATFORM_ID = "3"
  18. SOURCE_TABLE = "pdd_shop_info_middle"
  19. TARGET_TABLE = "retrieve_scrape_data"
  20. def normalize_text(value) -> str:
  21. if value is None:
  22. return ""
  23. return str(value).strip()
  24. def load_db_config() -> Dict[str, object]:
  25. if not ENV_PATH.is_file():
  26. raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
  27. load_dotenv(ENV_PATH)
  28. db_name = normalize_text(os.getenv("DB_DATABASE"))
  29. if not db_name:
  30. raise ValueError("DB_DATABASE 不能为空")
  31. return {
  32. "host": normalize_text(os.getenv("DB_HOST")) or "localhost",
  33. "port": int(normalize_text(os.getenv("DB_PORT")) or "3306"),
  34. "user": normalize_text(os.getenv("DB_USERNAME")) or "root",
  35. "password": os.getenv("DB_PASSWORD", ""),
  36. "db_name": db_name,
  37. }
  38. def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
  39. return pymysql.connect(
  40. host=config["host"],
  41. port=int(config["port"]),
  42. user=config["user"],
  43. password=config["password"],
  44. database=config["db_name"],
  45. charset="utf8mb4",
  46. cursorclass=pymysql.cursors.DictCursor,
  47. autocommit=False,
  48. connect_timeout=10,
  49. )
def build_source_shop_map(
    conn: pymysql.connections.Connection,
) -> Tuple[Dict[str, Tuple[str, str]], Counter]:
    """
    Current strategy (as requested):
        match province/city by shop name only; store_url is NOT used.
    Notes:
        - pdd_shop_info_middle has a unique index on `shop`, so a direct
          shop -> (province, city) mapping is appropriate.
        - To switch back to exact "shop + store_url" matching later,
          restore the logic kept in the comments below.
    Returns (shop_map, stats) where stats is a Counter of bookkeeping tallies.
    """
    sql = f"""
        SELECT
            shop,
            store_url,
            province,
            city
        FROM `{SOURCE_TABLE}`
        WHERE COALESCE(TRIM(shop),'') <> ''
          AND COALESCE(TRIM(province),'') <> ''
          AND COALESCE(TRIM(city),'') <> ''
    """
    with conn.cursor() as cur:
        cur.execute(sql)
        rows = cur.fetchall()

    # exact_candidates feeds a future "shop + store_url" exact match.
    # Kept on purpose; do not delete.
    exact_candidates = defaultdict(set)
    shop_map: Dict[str, Tuple[str, str]] = {}
    stats = Counter()

    for row in rows:
        shop = normalize_text(row.get("shop"))
        store_url = normalize_text(row.get("store_url"))
        province = normalize_text(row.get("province"))
        city = normalize_text(row.get("city"))
        if not shop or not province or not city:
            continue
        # Currently active: unique mapping keyed by shop name.
        # Later rows with the same shop overwrite earlier ones.
        shop_map[shop] = (province, city)
        # Reserved only for the future "shop + store_url" exact match
        # (not enabled yet).
        if store_url:
            exact_candidates[(shop, store_url)].add((province, city))

    # Statistics only; these do not participate in the current matching.
    for _, pcs in exact_candidates.items():
        if len(pcs) == 1:
            stats["exact_unique_reserved"] += 1
        else:
            stats["exact_ambiguous_reserved"] += 1

    stats["source_rows"] = len(rows)
    stats["shop_map_rows"] = len(shop_map)
    return shop_map, stats
def backfill(
    conn: pymysql.connections.Connection,
    dry_run: bool,
    limit: Optional[int] = None,
) -> Counter:
    """Fill empty province_name/city_name in the target table (platform 3).

    Only blank columns are filled; existing non-empty values are never
    overwritten. When dry_run is True, all changes are rolled back.
    limit (debug aid) caps how many target rows are scanned.
    Returns a Counter of run statistics.
    """
    shop_map, source_stats = build_source_shop_map(conn)
    stats = Counter()
    stats.update(source_stats)

    # limit is forced through int() before interpolation, so the
    # f-string cannot inject arbitrary SQL here.
    where_limit = f" LIMIT {int(limit)}" if limit and limit > 0 else ""
    select_sql = f"""
        SELECT
            id,
            store_name,
            link_url,
            province_name,
            city_name
        FROM `{TARGET_TABLE}`
        WHERE platform_id = %s
          AND (
              COALESCE(TRIM(province_name),'') = ''
              OR COALESCE(TRIM(city_name),'') = ''
          )
        ORDER BY id ASC
        {where_limit}
    """
    with conn.cursor() as cur:
        cur.execute(select_sql, (PLATFORM_ID,))
        targets = cur.fetchall()

    stats["target_rows"] = len(targets)
    if not targets:
        return stats

    update_sql = f"""
        UPDATE `{TARGET_TABLE}`
        SET province_name = %s,
            city_name = %s,
            update_time = NOW()
        WHERE id = %s
    """
    with conn.cursor() as cur:
        for row in targets:
            row_id = row["id"]
            shop = normalize_text(row.get("store_name"))
            # link is still read so the exact "shop + link" matching can be
            # restored later; per current requirements it is NOT used.
            link = normalize_text(row.get("link_url"))
            old_province = normalize_text(row.get("province_name"))
            old_city = normalize_text(row.get("city_name"))

            # Currently active: match by shop name only.
            if not shop or shop not in shop_map:
                stats["skipped_no_match"] += 1
                continue
            match_pc: Tuple[str, str] = shop_map[shop]

            # ------- Reserved: future exact "shop + link" matching -------
            # if shop and link and (shop, link) in exact_map:
            #     match_pc = exact_map[(shop, link)]
            #     stats["matched_exact"] += 1
            # elif shop in shop_map:
            #     match_pc = shop_map[shop]
            #     stats["matched_by_shop"] += 1
            # else:
            #     stats["skipped_no_match"] += 1
            #     continue
            # -------------------------------------------------------------
            stats["matched_by_shop"] += 1

            province, city = match_pc
            new_province = old_province or province
            new_city = old_city or city
            # Fill blanks only: skip when neither column needs a value.
            if new_province == old_province and new_city == old_city:
                stats["skipped_already_filled"] += 1
                continue

            stats["to_update"] += 1
            if not dry_run:
                cur.execute(update_sql, (new_province, new_city, row_id))
                # NOTE(review): "updated" counts rows actually written,
                # so it stays 0 in dry-run mode ("to_update" shows intent).
                stats["updated"] += 1

    # Dry run discards everything; otherwise commit the batch atomically.
    if dry_run:
        conn.rollback()
    else:
        conn.commit()
    return stats
  180. def print_summary(stats: Counter, dry_run: bool) -> None:
  181. mode = "DRY-RUN(不写库)" if dry_run else "WRITE(写库)"
  182. print(f"运行模式: {mode}")
  183. print("匹配策略: 仅按店铺名(store_name == shop)匹配")
  184. print(f"来源行数(source_rows): {stats.get('source_rows', 0)}")
  185. print(f"来源店铺映射(shop_map_rows): {stats.get('shop_map_rows', 0)}")
  186. print(f"预留统计-链接唯一(exact_unique_reserved): {stats.get('exact_unique_reserved', 0)}")
  187. print(f"预留统计-链接冲突(exact_ambiguous_reserved): {stats.get('exact_ambiguous_reserved', 0)}")
  188. print(f"目标行数(target_rows): {stats.get('target_rows', 0)}")
  189. print(f"店铺命中(matched_by_shop): {stats.get('matched_by_shop', 0)}")
  190. print(f"待更新(to_update): {stats.get('to_update', 0)}")
  191. print(f"已更新(updated): {stats.get('updated', 0)}")
  192. print(f"跳过-未匹配(skipped_no_match): {stats.get('skipped_no_match', 0)}")
  193. print(f"跳过-已完整(skipped_already_filled): {stats.get('skipped_already_filled', 0)}")
  194. def build_parser() -> argparse.ArgumentParser:
  195. parser = argparse.ArgumentParser(
  196. description="回填 retrieve_scrape_data 的发货省市(platform_id=3)"
  197. )
  198. parser.add_argument(
  199. "--dry-run",
  200. action="store_true",
  201. help="仅预演,不写库",
  202. )
  203. parser.add_argument(
  204. "--limit",
  205. type=int,
  206. default=None,
  207. help="仅处理前 N 条目标数据(调试用)",
  208. )
  209. return parser
  210. def main() -> int:
  211. args = build_parser().parse_args()
  212. config = load_db_config()
  213. conn = connect_db(config)
  214. try:
  215. stats = backfill(
  216. conn=conn,
  217. dry_run=bool(args.dry_run),
  218. limit=args.limit,
  219. )
  220. finally:
  221. conn.close()
  222. print_summary(stats, dry_run=bool(args.dry_run))
  223. return 0
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit with the conventional 130 (128 + SIGINT).
        print("用户中断执行")
        raise SystemExit(130)
    except Exception as exc:
        # Top-level boundary: report any failure and exit non-zero.
        print(f"执行失败: {exc}")
        raise SystemExit(1)