manual_non_flagship_backfill.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. import argparse
  2. import os
  3. import re
  4. import sys
  5. from collections import Counter
  6. from datetime import datetime
  7. from pathlib import Path
  8. from typing import Dict, List, Optional, Tuple
  9. import pymysql
  10. from dotenv import load_dotenv
  11. from openpyxl import Workbook, load_workbook
  12. from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
  13. from openpyxl.utils import get_column_letter
  14. """运行命令
  15. # 1) 导出待人工补录模板
  16. python pdd1/process_shop/manual_non_flagship_backfill.py export
  17. # 2) 先做回填预演(不写库,推荐先跑)
  18. python pdd1/process_shop/manual_non_flagship_backfill.py backfill --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx --dry-run
  19. # 3) 正式回填(写库)文件日期得修改
  20. python pdd1/process_shop/manual_non_flagship_backfill.py backfill --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx
  21. """
  22. BASE_DIR = Path(__file__).resolve().parent
  23. ENV_PATH = BASE_DIR / ".env.process"
  24. TABLE_DEFAULT = "pdd_shop_info_middle"
  25. TARGET_FIELDS = [
  26. "contact_address",
  27. "qualification_number",
  28. "business_license_company",
  29. "business_license_address",
  30. ]
  31. EXPORT_COLUMNS = [
  32. "id",
  33. "shop",
  34. "store_url",
  35. "scrape_date",
  36. "contact_address",
  37. "qualification_number",
  38. "business_license_company",
  39. "business_license_address",
  40. "manual_note",
  41. ]
  42. REPORT_COLUMNS = [
  43. "excel_row",
  44. "id",
  45. "shop",
  46. "status",
  47. "reason",
  48. "updated_fields",
  49. "already_filled_fields",
  50. "contact_address",
  51. "qualification_number",
  52. "business_license_company",
  53. "business_license_address",
  54. "manual_note",
  55. ]
  56. STATUS_ORDER = [
  57. "updated",
  58. "skipped_no_input",
  59. "skipped_not_found",
  60. "skipped_non_flagship",
  61. "skipped_already_filled",
  62. "error",
  63. ]
  64. THIN_BORDER = Border(
  65. left=Side(style="thin", color="D9D9D9"),
  66. right=Side(style="thin", color="D9D9D9"),
  67. top=Side(style="thin", color="D9D9D9"),
  68. bottom=Side(style="thin", color="D9D9D9"),
  69. )
  70. HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
  71. HEADER_FONT = Font(bold=True, color="FFFFFF")
  72. ROW_FILL_A = PatternFill("solid", fgColor="FFFFFF")
  73. ROW_FILL_B = PatternFill("solid", fgColor="F7FBFF")
  74. EDITABLE_FILL = PatternFill("solid", fgColor="FFF2CC")
  75. READONLY_FILL = PatternFill("solid", fgColor="F2F2F2")
  76. STATUS_FILL_MAP = {
  77. "updated": PatternFill("solid", fgColor="E2F0D9"),
  78. "skipped_no_input": PatternFill("solid", fgColor="FFF2CC"),
  79. "skipped_not_found": PatternFill("solid", fgColor="FCE4D6"),
  80. "skipped_non_flagship": PatternFill("solid", fgColor="EDEDED"),
  81. "skipped_already_filled": PatternFill("solid", fgColor="D9E1F2"),
  82. "error": PatternFill("solid", fgColor="F8CBAD"),
  83. }
  84. TEMPLATE_WIDTH_MAP = {
  85. "id": 10,
  86. "shop": 30,
  87. "store_url": 48,
  88. "scrape_date": 14,
  89. "contact_address": 28,
  90. "qualification_number": 28,
  91. "business_license_company": 30,
  92. "business_license_address": 30,
  93. "manual_note": 32,
  94. }
  95. REPORT_WIDTH_MAP = {
  96. "excel_row": 10,
  97. "id": 10,
  98. "shop": 28,
  99. "status": 24,
  100. "reason": 50,
  101. "updated_fields": 26,
  102. "already_filled_fields": 30,
  103. "contact_address": 24,
  104. "qualification_number": 24,
  105. "business_license_company": 28,
  106. "business_license_address": 28,
  107. "manual_note": 30,
  108. }
  109. def normalize_text(value) -> str:
  110. if value is None:
  111. return ""
  112. return str(value).strip()
  113. def format_date(value) -> str:
  114. if value is None:
  115. return ""
  116. if hasattr(value, "strftime"):
  117. return value.strftime("%Y-%m-%d")
  118. return normalize_text(value)
  119. def validate_table_name(table_name: str) -> str:
  120. if not table_name:
  121. return TABLE_DEFAULT
  122. if not re.match(r"^[A-Za-z0-9_]+$", table_name):
  123. raise ValueError(f"非法表名: {table_name}")
  124. return table_name
  125. def load_db_config() -> Tuple[Dict[str, object], str]:
  126. if not ENV_PATH.is_file():
  127. raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
  128. load_dotenv(ENV_PATH)
  129. db_name = normalize_text(os.getenv("DB_DATABASE"))
  130. if not db_name:
  131. raise ValueError("DB_DATABASE 不能为空")
  132. config = {
  133. "host": normalize_text(os.getenv("DB_HOST")) or "localhost",
  134. "port": int(normalize_text(os.getenv("DB_PORT")) or "3306"),
  135. "user": normalize_text(os.getenv("DB_USERNAME")) or "root",
  136. "password": os.getenv("DB_PASSWORD", ""),
  137. "db_name": db_name,
  138. }
  139. table_name = validate_table_name(
  140. normalize_text(os.getenv("DB_TABLENAME")) or TABLE_DEFAULT
  141. )
  142. return config, table_name
  143. def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
  144. return pymysql.connect(
  145. host=config["host"],
  146. port=int(config["port"]),
  147. user=config["user"],
  148. password=config["password"],
  149. database=config["db_name"],
  150. charset="utf8mb4",
  151. cursorclass=pymysql.cursors.DictCursor,
  152. connect_timeout=10,
  153. autocommit=False,
  154. )
  155. def build_missing_condition() -> str:
  156. checks = [f"COALESCE(TRIM(`{field}`), '') = ''" for field in TARGET_FIELDS]
  157. return " OR ".join(checks)
  158. def resolve_output_path(path_str: Optional[str], default_name: str) -> Path:
  159. if path_str:
  160. out = Path(path_str).expanduser()
  161. if not out.is_absolute():
  162. out = (Path.cwd() / out).resolve()
  163. else:
  164. out = (BASE_DIR / default_name).resolve()
  165. out.parent.mkdir(parents=True, exist_ok=True)
  166. return out
  167. def style_header(ws, headers: List[str]) -> None:
  168. alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
  169. for idx, _ in enumerate(headers, start=1):
  170. cell = ws.cell(row=1, column=idx)
  171. cell.fill = HEADER_FILL
  172. cell.font = HEADER_FONT
  173. cell.alignment = alignment
  174. cell.border = THIN_BORDER
  175. ws.freeze_panes = "A2"
  176. ws.row_dimensions[1].height = 24
  177. def auto_fit_columns(ws, rows: List[List[object]], min_width: int = 12, max_width: int = 60) -> None:
  178. if not rows:
  179. return
  180. col_count = len(rows[0])
  181. for col_idx in range(col_count):
  182. max_len = 0
  183. for row in rows:
  184. value = row[col_idx] if col_idx < len(row) else ""
  185. text = normalize_text(value)
  186. max_len = max(max_len, len(text))
  187. width = min(max(min_width, max_len + 2), max_width)
  188. col_letter = ws.cell(row=1, column=col_idx + 1).column_letter
  189. ws.column_dimensions[col_letter].width = width
  190. def apply_data_style(ws, headers: List[str], row_count: int, sheet_name: str) -> None:
  191. header_to_col = {name: idx + 1 for idx, name in enumerate(headers)}
  192. editable_cols = {
  193. header_to_col[col]
  194. for col in TARGET_FIELDS + ["manual_note"]
  195. if col in header_to_col
  196. }
  197. status_col = header_to_col.get("status")
  198. center_cols = {"id", "excel_row", "scrape_date", "status"}
  199. for row_idx in range(2, row_count + 2):
  200. base_fill = ROW_FILL_A if row_idx % 2 == 0 else ROW_FILL_B
  201. ws.row_dimensions[row_idx].height = 20
  202. for col_idx, col_name in enumerate(headers, start=1):
  203. cell = ws.cell(row=row_idx, column=col_idx)
  204. cell.border = THIN_BORDER
  205. if col_name in center_cols:
  206. cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
  207. else:
  208. cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)
  209. cell.fill = base_fill
  210. if sheet_name == "manual_fill":
  211. if col_idx in editable_cols:
  212. cell.fill = EDITABLE_FILL
  213. else:
  214. cell.fill = READONLY_FILL
  215. if sheet_name == "backfill_report" and status_col:
  216. status_cell = ws.cell(row=row_idx, column=status_col)
  217. status_text = normalize_text(status_cell.value)
  218. fill = STATUS_FILL_MAP.get(status_text)
  219. if fill:
  220. status_cell.fill = fill
  221. def apply_filter(ws, headers: List[str], row_count: int) -> None:
  222. last_col = get_column_letter(len(headers))
  223. end_row = max(2, row_count + 1)
  224. ws.auto_filter.ref = f"A1:{last_col}{end_row}"
  225. def apply_column_width_hints(ws, headers: List[str], sheet_name: str) -> None:
  226. width_map = TEMPLATE_WIDTH_MAP if sheet_name == "manual_fill" else REPORT_WIDTH_MAP
  227. for idx, col_name in enumerate(headers, start=1):
  228. hint = width_map.get(col_name)
  229. if hint:
  230. ws.column_dimensions[get_column_letter(idx)].width = hint
  231. def fetch_export_rows(
  232. conn: pymysql.connections.Connection, table_name: str, today_only: bool
  233. ) -> List[Dict]:
  234. where_parts = [
  235. "`shop` NOT LIKE %s",
  236. f"({build_missing_condition()})",
  237. ]
  238. params: List[object] = ["%旗舰店%"]
  239. if today_only:
  240. where_parts.append("`scrape_date` = CURDATE()")
  241. sql = f"""
  242. SELECT
  243. `id`,
  244. `shop`,
  245. `store_url`,
  246. `scrape_date`,
  247. `contact_address`,
  248. `qualification_number`,
  249. `business_license_company`,
  250. `business_license_address`
  251. FROM `{table_name}`
  252. WHERE {" AND ".join(where_parts)}
  253. ORDER BY `id` ASC
  254. """
  255. with conn.cursor() as cur:
  256. cur.execute(sql, params)
  257. return cur.fetchall()
  258. def write_xlsx(path: Path, headers: List[str], data_rows: List[List[object]], sheet_name: str) -> None:
  259. wb = Workbook()
  260. ws = wb.active
  261. ws.title = sheet_name
  262. ws.append(headers)
  263. for row in data_rows:
  264. ws.append(row)
  265. style_header(ws, headers)
  266. auto_fit_columns(ws, [headers] + data_rows, min_width=10, max_width=58)
  267. apply_column_width_hints(ws, headers, sheet_name)
  268. apply_data_style(ws, headers, len(data_rows), sheet_name)
  269. apply_filter(ws, headers, len(data_rows))
  270. wb.save(path)
  271. def export_template(output_path: Optional[str]) -> int:
  272. config, table_name = load_db_config()
  273. conn = connect_db(config)
  274. try:
  275. rows = fetch_export_rows(conn, table_name, today_only=True)
  276. export_scope = "today"
  277. if not rows:
  278. rows = fetch_export_rows(conn, table_name, today_only=False)
  279. export_scope = "fallback_all_missing"
  280. finally:
  281. conn.close()
  282. default_name = f"non_flagship_manual_template_{datetime.now().strftime('%Y%m%d')}.xlsx"
  283. out_path = resolve_output_path(output_path, default_name)
  284. write_rows: List[List[object]] = []
  285. for item in rows:
  286. write_rows.append(
  287. [
  288. item.get("id"),
  289. normalize_text(item.get("shop")),
  290. normalize_text(item.get("store_url")),
  291. format_date(item.get("scrape_date")),
  292. normalize_text(item.get("contact_address")),
  293. normalize_text(item.get("qualification_number")),
  294. normalize_text(item.get("business_license_company")),
  295. normalize_text(item.get("business_license_address")),
  296. "",
  297. ]
  298. )
  299. write_xlsx(
  300. path=out_path,
  301. headers=EXPORT_COLUMNS,
  302. data_rows=write_rows,
  303. sheet_name="manual_fill",
  304. )
  305. print(f"导出完成: {out_path}")
  306. print(f"导出范围: {export_scope}")
  307. print(f"导出条数: {len(write_rows)}")
  308. return 0
  309. def read_excel_rows(input_path: Path) -> List[Dict]:
  310. wb = load_workbook(filename=input_path, data_only=True)
  311. ws = wb.active
  312. header_cells = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), None)
  313. if not header_cells:
  314. raise ValueError("Excel 为空,缺少表头")
  315. headers = [normalize_text(v) for v in header_cells]
  316. header_to_index = {name: idx for idx, name in enumerate(headers) if name}
  317. required = ["id"] + TARGET_FIELDS
  318. missing = [col for col in required if col not in header_to_index]
  319. if missing:
  320. raise ValueError(f"Excel 缺少必需列: {', '.join(missing)}")
  321. rows: List[Dict] = []
  322. for excel_row, values in enumerate(ws.iter_rows(min_row=2, values_only=True), start=2):
  323. normalized_values = list(values) if values else []
  324. if not normalized_values:
  325. continue
  326. if all(normalize_text(v) == "" for v in normalized_values):
  327. continue
  328. record = {"excel_row": excel_row}
  329. for name, idx in header_to_index.items():
  330. record[name] = normalized_values[idx] if idx < len(normalized_values) else None
  331. rows.append(record)
  332. return rows
  333. def fetch_db_row(
  334. cur: pymysql.cursors.DictCursor, table_name: str, row_id: int
  335. ) -> Optional[Dict]:
  336. sql = f"""
  337. SELECT
  338. `id`,
  339. `shop`,
  340. `contact_address`,
  341. `qualification_number`,
  342. `business_license_company`,
  343. `business_license_address`
  344. FROM `{table_name}`
  345. WHERE `id` = %s
  346. LIMIT 1
  347. """
  348. cur.execute(sql, (row_id,))
  349. return cur.fetchone()
  350. def update_db_row(
  351. conn: pymysql.connections.Connection,
  352. cur: pymysql.cursors.DictCursor,
  353. table_name: str,
  354. row_id: int,
  355. updates: Dict[str, str],
  356. ) -> None:
  357. set_sql = ", ".join([f"`{key}` = %s" for key in updates.keys()])
  358. sql = f"UPDATE `{table_name}` SET {set_sql}, `update_time` = NOW() WHERE `id` = %s"
  359. params = list(updates.values()) + [row_id]
  360. cur.execute(sql, params)
  361. conn.commit()
  362. def make_report_row(
  363. excel_row: int,
  364. row_id,
  365. shop: str,
  366. status: str,
  367. reason: str,
  368. updated_fields: List[str],
  369. already_filled_fields: List[str],
  370. input_fields: Dict[str, str],
  371. manual_note: str,
  372. ) -> Dict:
  373. return {
  374. "excel_row": excel_row,
  375. "id": row_id,
  376. "shop": shop,
  377. "status": status,
  378. "reason": reason,
  379. "updated_fields": ",".join(updated_fields),
  380. "already_filled_fields": ",".join(already_filled_fields),
  381. "contact_address": input_fields.get("contact_address", ""),
  382. "qualification_number": input_fields.get("qualification_number", ""),
  383. "business_license_company": input_fields.get("business_license_company", ""),
  384. "business_license_address": input_fields.get("business_license_address", ""),
  385. "manual_note": manual_note,
  386. }
  387. def backfill_from_excel(
  388. input_path: str,
  389. report_path: Optional[str],
  390. dry_run: bool,
  391. ) -> int:
  392. xlsx_path = Path(input_path).expanduser()
  393. if not xlsx_path.is_absolute():
  394. xlsx_path = (Path.cwd() / xlsx_path).resolve()
  395. if not xlsx_path.is_file():
  396. raise FileNotFoundError(f"输入文件不存在: {xlsx_path}")
  397. excel_rows = read_excel_rows(xlsx_path)
  398. if not excel_rows:
  399. print(f"输入文件无可处理数据: {xlsx_path}")
  400. default_report = (
  401. f"non_flagship_backfill_report_{datetime.now().strftime('%Y%m%d')}.xlsx"
  402. )
  403. out_report = resolve_output_path(report_path, default_report)
  404. write_xlsx(out_report, REPORT_COLUMNS, [], "backfill_report")
  405. print(f"已生成空报告: {out_report}")
  406. return 0
  407. config, table_name = load_db_config()
  408. conn = connect_db(config)
  409. report_rows: List[Dict] = []
  410. stats = Counter()
  411. try:
  412. with conn.cursor() as cur:
  413. for row in excel_rows:
  414. excel_row = row.get("excel_row")
  415. row_id_raw = row.get("id")
  416. manual_note = normalize_text(row.get("manual_note"))
  417. input_fields = {
  418. field: normalize_text(row.get(field))
  419. for field in TARGET_FIELDS
  420. }
  421. try:
  422. row_id = int(normalize_text(row_id_raw))
  423. except Exception:
  424. status = "error"
  425. reason = "id 非法,无法转换为整数"
  426. stats[status] += 1
  427. report_rows.append(
  428. make_report_row(
  429. excel_row=excel_row,
  430. row_id=row_id_raw,
  431. shop=normalize_text(row.get("shop")),
  432. status=status,
  433. reason=reason,
  434. updated_fields=[],
  435. already_filled_fields=[],
  436. input_fields=input_fields,
  437. manual_note=manual_note,
  438. )
  439. )
  440. continue
  441. if all(v == "" for v in input_fields.values()):
  442. status = "skipped_no_input"
  443. reason = "四个目标字段均为空"
  444. stats[status] += 1
  445. report_rows.append(
  446. make_report_row(
  447. excel_row=excel_row,
  448. row_id=row_id,
  449. shop=normalize_text(row.get("shop")),
  450. status=status,
  451. reason=reason,
  452. updated_fields=[],
  453. already_filled_fields=[],
  454. input_fields=input_fields,
  455. manual_note=manual_note,
  456. )
  457. )
  458. continue
  459. db_row = fetch_db_row(cur, table_name, row_id)
  460. if not db_row:
  461. status = "skipped_not_found"
  462. reason = "数据库不存在该 id"
  463. stats[status] += 1
  464. report_rows.append(
  465. make_report_row(
  466. excel_row=excel_row,
  467. row_id=row_id,
  468. shop=normalize_text(row.get("shop")),
  469. status=status,
  470. reason=reason,
  471. updated_fields=[],
  472. already_filled_fields=[],
  473. input_fields=input_fields,
  474. manual_note=manual_note,
  475. )
  476. )
  477. continue
  478. db_shop = normalize_text(db_row.get("shop"))
  479. if "旗舰店" in db_shop:
  480. status = "skipped_non_flagship"
  481. reason = "数据库记录属于旗舰店"
  482. stats[status] += 1
  483. report_rows.append(
  484. make_report_row(
  485. excel_row=excel_row,
  486. row_id=row_id,
  487. shop=db_shop,
  488. status=status,
  489. reason=reason,
  490. updated_fields=[],
  491. already_filled_fields=[],
  492. input_fields=input_fields,
  493. manual_note=manual_note,
  494. )
  495. )
  496. continue
  497. updates: Dict[str, str] = {}
  498. already_filled_fields: List[str] = []
  499. for field in TARGET_FIELDS:
  500. excel_val = input_fields[field]
  501. if excel_val == "":
  502. continue
  503. db_val = normalize_text(db_row.get(field))
  504. if db_val == "":
  505. updates[field] = excel_val
  506. else:
  507. already_filled_fields.append(field)
  508. if not updates:
  509. status = "skipped_already_filled"
  510. reason = "数据库对应字段已存在非空值,按仅补空策略跳过"
  511. stats[status] += 1
  512. report_rows.append(
  513. make_report_row(
  514. excel_row=excel_row,
  515. row_id=row_id,
  516. shop=db_shop,
  517. status=status,
  518. reason=reason,
  519. updated_fields=[],
  520. already_filled_fields=already_filled_fields,
  521. input_fields=input_fields,
  522. manual_note=manual_note,
  523. )
  524. )
  525. continue
  526. try:
  527. if not dry_run:
  528. update_db_row(conn, cur, table_name, row_id, updates)
  529. status = "updated"
  530. reason = "dry-run 未写库,仅预览更新" if dry_run else "更新成功"
  531. stats[status] += 1
  532. report_rows.append(
  533. make_report_row(
  534. excel_row=excel_row,
  535. row_id=row_id,
  536. shop=db_shop,
  537. status=status,
  538. reason=reason,
  539. updated_fields=list(updates.keys()),
  540. already_filled_fields=already_filled_fields,
  541. input_fields=input_fields,
  542. manual_note=manual_note,
  543. )
  544. )
  545. except Exception as row_exc:
  546. conn.rollback()
  547. status = "error"
  548. reason = f"更新异常: {row_exc}"
  549. stats[status] += 1
  550. report_rows.append(
  551. make_report_row(
  552. excel_row=excel_row,
  553. row_id=row_id,
  554. shop=db_shop,
  555. status=status,
  556. reason=reason,
  557. updated_fields=list(updates.keys()),
  558. already_filled_fields=already_filled_fields,
  559. input_fields=input_fields,
  560. manual_note=manual_note,
  561. )
  562. )
  563. finally:
  564. conn.close()
  565. default_report = f"non_flagship_backfill_report_{datetime.now().strftime('%Y%m%d')}.xlsx"
  566. out_report = resolve_output_path(report_path, default_report)
  567. report_data_rows = [
  568. [row.get(col, "") for col in REPORT_COLUMNS]
  569. for row in report_rows
  570. ]
  571. write_xlsx(
  572. path=out_report,
  573. headers=REPORT_COLUMNS,
  574. data_rows=report_data_rows,
  575. sheet_name="backfill_report",
  576. )
  577. print(f"回填报告: {out_report}")
  578. print(f"处理行数: {len(report_rows)}")
  579. for status in STATUS_ORDER:
  580. if stats.get(status, 0):
  581. print(f"{status}: {stats[status]}")
  582. return 0
  583. def build_parser() -> argparse.ArgumentParser:
  584. parser = argparse.ArgumentParser(
  585. description="非旗舰店人工补录导出与回填工具(Excel)"
  586. )
  587. sub = parser.add_subparsers(dest="command", required=True)
  588. export_parser = sub.add_parser("export", help="导出人工补录模板")
  589. export_parser.add_argument(
  590. "--output",
  591. help="导出文件路径(xlsx)",
  592. default=None,
  593. )
  594. backfill_parser = sub.add_parser("backfill", help="读取 Excel 回填数据库")
  595. backfill_parser.add_argument(
  596. "--input",
  597. required=True,
  598. help="人工填写后的 Excel 文件路径",
  599. )
  600. backfill_parser.add_argument(
  601. "--report",
  602. default=None,
  603. help="回填报告文件路径(xlsx)",
  604. )
  605. backfill_parser.add_argument(
  606. "--dry-run",
  607. action="store_true",
  608. help="只预览不写库",
  609. )
  610. return parser
  611. def main() -> int:
  612. parser = build_parser()
  613. args = parser.parse_args()
  614. if args.command == "export":
  615. return export_template(args.output)
  616. if args.command == "backfill":
  617. return backfill_from_excel(
  618. input_path=args.input,
  619. report_path=args.report,
  620. dry_run=bool(args.dry_run),
  621. )
  622. parser.print_help()
  623. return 1
  624. if __name__ == "__main__":
  625. try:
  626. raise SystemExit(main())
  627. except KeyboardInterrupt:
  628. print("用户中断执行")
  629. raise SystemExit(130)
  630. except Exception as exc:
  631. print(f"执行失败: {exc}")
  632. raise SystemExit(1)