"""Export and backfill tool for manually completed non-flagship shop data.

Usage::

    # 1) Export the template of rows that still need manual completion
    python pdd1/process_shop/manual_non_flagship_backfill.py export

    # 2) Rehearse the backfill first (no database writes; recommended)
    python pdd1/process_shop/manual_non_flagship_backfill.py backfill \
        --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx --dry-run

    # 3) Real backfill (writes to the database); adjust the date in the file name
    python pdd1/process_shop/manual_non_flagship_backfill.py backfill \
        --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx
"""

import argparse
import os
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pymysql
from dotenv import load_dotenv
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter

BASE_DIR = Path(__file__).resolve().parent
ENV_PATH = BASE_DIR / ".env.process"
TABLE_DEFAULT = "pdd_shop_info_middle"

# Columns that the manual backfill is allowed to fill in.
TARGET_FIELDS = [
    "contact_address",
    "qualification_number",
    "business_license_company",
    "business_license_address",
]

# Column order of the exported manual-fill template.
EXPORT_COLUMNS = [
    "id",
    "shop",
    "store_url",
    "scrape_date",
    "contact_address",
    "qualification_number",
    "business_license_company",
    "business_license_address",
    "manual_note",
]

# Column order of the backfill report.
REPORT_COLUMNS = [
    "excel_row",
    "id",
    "shop",
    "status",
    "reason",
    "updated_fields",
    "already_filled_fields",
    "contact_address",
    "qualification_number",
    "business_license_company",
    "business_license_address",
    "manual_note",
]

# Order in which per-status counters are printed at the end of a backfill.
STATUS_ORDER = [
    "updated",
    "skipped_no_input",
    "skipped_not_found",
    "skipped_non_flagship",
    "skipped_already_filled",
    "error",
]

THIN_BORDER = Border(
    left=Side(style="thin", color="D9D9D9"),
    right=Side(style="thin", color="D9D9D9"),
    top=Side(style="thin", color="D9D9D9"),
    bottom=Side(style="thin", color="D9D9D9"),
)
HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
HEADER_FONT = Font(bold=True, color="FFFFFF")
ROW_FILL_A = PatternFill("solid", fgColor="FFFFFF")
ROW_FILL_B = PatternFill("solid", fgColor="F7FBFF")
EDITABLE_FILL = PatternFill("solid", fgColor="FFF2CC")
READONLY_FILL = PatternFill("solid", fgColor="F2F2F2")
STATUS_FILL_MAP = {
    "updated": PatternFill("solid", fgColor="E2F0D9"),
    "skipped_no_input": PatternFill("solid", fgColor="FFF2CC"),
    "skipped_not_found": PatternFill("solid", fgColor="FCE4D6"),
    "skipped_non_flagship": PatternFill("solid", fgColor="EDEDED"),
    "skipped_already_filled": PatternFill("solid", fgColor="D9E1F2"),
    "error": PatternFill("solid", fgColor="F8CBAD"),
}
TEMPLATE_WIDTH_MAP = {
    "id": 10,
    "shop": 30,
    "store_url": 48,
    "scrape_date": 14,
    "contact_address": 28,
    "qualification_number": 28,
    "business_license_company": 30,
    "business_license_address": 30,
    "manual_note": 32,
}
REPORT_WIDTH_MAP = {
    "excel_row": 10,
    "id": 10,
    "shop": 28,
    "status": 24,
    "reason": 50,
    "updated_fields": 26,
    "already_filled_fields": 30,
    "contact_address": 24,
    "qualification_number": 24,
    "business_license_company": 28,
    "business_license_address": 28,
    "manual_note": 30,
}

# Matches the text form of an integral float such as "12.0" or "12."; group 1
# is the integer part. Used as a fallback when int() rejects an Excel id.
_INTEGRAL_FLOAT_RE = re.compile(r"([+-]?\d+)\.0*")


def normalize_text(value) -> str:
    """Return ``str(value)`` stripped of surrounding whitespace; ``None`` -> ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def format_date(value) -> str:
    """Format *value* as ``YYYY-MM-DD`` when it has ``strftime``, else as text."""
    if value is None:
        return ""
    if hasattr(value, "strftime"):
        return value.strftime("%Y-%m-%d")
    return normalize_text(value)


def validate_table_name(table_name: str) -> str:
    """Return a table name that is safe to interpolate into SQL.

    An empty name falls back to ``TABLE_DEFAULT``.

    Raises:
        ValueError: if the name contains anything other than ASCII letters,
            digits and underscores.
    """
    if not table_name:
        return TABLE_DEFAULT
    # fullmatch, not match + "$": "$" also matches just before a trailing
    # newline, which would let "name\n" through into the SQL text.
    if not re.fullmatch(r"[A-Za-z0-9_]+", table_name):
        raise ValueError(f"非法表名: {table_name}")
    return table_name


def load_db_config() -> Tuple[Dict[str, object], str]:
    """Load connection settings and the target table name from ``ENV_PATH``.

    Returns:
        ``(config, table_name)`` where *config* holds ``host``, ``port``,
        ``user``, ``password`` and ``db_name``.

    Raises:
        FileNotFoundError: if the env file does not exist.
        ValueError: if ``DB_DATABASE`` is empty, ``DB_PORT`` is not an
            integer, or ``DB_TABLENAME`` is not a valid identifier.
    """
    if not ENV_PATH.is_file():
        raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
    load_dotenv(ENV_PATH)

    db_name = normalize_text(os.getenv("DB_DATABASE"))
    if not db_name:
        raise ValueError("DB_DATABASE 不能为空")

    config = {
        "host": normalize_text(os.getenv("DB_HOST")) or "localhost",
        "port": int(normalize_text(os.getenv("DB_PORT")) or "3306"),
        "user": normalize_text(os.getenv("DB_USERNAME")) or "root",
        # The password is deliberately not stripped.
        "password": os.getenv("DB_PASSWORD", ""),
        "db_name": db_name,
    }
    table_name = validate_table_name(
        normalize_text(os.getenv("DB_TABLENAME")) or TABLE_DEFAULT
    )
    return config, table_name


def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
    """Open a PyMySQL connection (utf8mb4, dict cursors, autocommit off)."""
    return pymysql.connect(
        host=config["host"],
        port=int(config["port"]),
        user=config["user"],
        password=config["password"],
        database=config["db_name"],
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        connect_timeout=10,
        autocommit=False,
    )


def build_missing_condition() -> str:
    """Return a SQL predicate that is true when any target field is NULL or blank."""
    checks = [f"COALESCE(TRIM(`{field}`), '') = ''" for field in TARGET_FIELDS]
    return " OR ".join(checks)


def resolve_output_path(path_str: Optional[str], default_name: str) -> Path:
    """Resolve an output path and make sure its parent directory exists.

    A relative *path_str* is resolved against the current working directory;
    when *path_str* is empty the file is placed next to this script under
    *default_name*.
    """
    if path_str:
        out = Path(path_str).expanduser()
        if not out.is_absolute():
            out = (Path.cwd() / out).resolve()
    else:
        out = (BASE_DIR / default_name).resolve()
    out.parent.mkdir(parents=True, exist_ok=True)
    return out


def style_header(ws, headers: List[str]) -> None:
    """Style row 1 as the header row and freeze the panes below it."""
    alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for col_idx in range(1, len(headers) + 1):
        cell = ws.cell(row=1, column=col_idx)
        cell.fill = HEADER_FILL
        cell.font = HEADER_FONT
        cell.alignment = alignment
        cell.border = THIN_BORDER
    ws.freeze_panes = "A2"
    ws.row_dimensions[1].height = 24


def auto_fit_columns(
    ws, rows: List[List[object]], min_width: int = 12, max_width: int = 60
) -> None:
    """Set each column width from its longest cell text, clamped to the bounds.

    The column count is taken from the first row; shorter rows are treated as
    having empty cells in the missing positions.
    """
    if not rows:
        return
    for col_idx in range(len(rows[0])):
        longest = max(
            len(normalize_text(row[col_idx])) if col_idx < len(row) else 0
            for row in rows
        )
        width = min(max(min_width, longest + 2), max_width)
        # get_column_letter avoids creating a cell just to read its letter.
        ws.column_dimensions[get_column_letter(col_idx + 1)].width = width


def apply_data_style(ws, headers: List[str], row_count: int, sheet_name: str) -> None:
    """Apply borders, alignment and fills to the data rows (row 2 onwards).

    On the ``manual_fill`` sheet, editable columns (target fields and
    ``manual_note``) get ``EDITABLE_FILL`` and all others ``READONLY_FILL``.
    On the ``backfill_report`` sheet rows are zebra-striped and the status
    cell is coloured via ``STATUS_FILL_MAP``.
    """
    header_to_col = {name: idx + 1 for idx, name in enumerate(headers)}
    editable_cols = {
        header_to_col[col]
        for col in TARGET_FIELDS + ["manual_note"]
        if col in header_to_col
    }
    status_col = header_to_col.get("status")
    center_cols = {"id", "excel_row", "scrape_date", "status"}

    # Loop-invariant style objects, built once instead of once per cell.
    center_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    left_alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)
    is_template = sheet_name == "manual_fill"
    is_report = sheet_name == "backfill_report"

    for row_idx in range(2, row_count + 2):
        base_fill = ROW_FILL_A if row_idx % 2 == 0 else ROW_FILL_B
        ws.row_dimensions[row_idx].height = 20
        for col_idx, col_name in enumerate(headers, start=1):
            cell = ws.cell(row=row_idx, column=col_idx)
            cell.border = THIN_BORDER
            cell.alignment = (
                center_alignment if col_name in center_cols else left_alignment
            )
            if is_template:
                cell.fill = EDITABLE_FILL if col_idx in editable_cols else READONLY_FILL
            else:
                cell.fill = base_fill

        if is_report and status_col:
            status_cell = ws.cell(row=row_idx, column=status_col)
            fill = STATUS_FILL_MAP.get(normalize_text(status_cell.value))
            if fill:
                status_cell.fill = fill


def apply_filter(ws, headers: List[str], row_count: int) -> None:
    """Enable an auto-filter over the header and data rows."""
    last_col = get_column_letter(len(headers))
    # The range always spans at least rows 1-2, even when there is no data.
    end_row = max(2, row_count + 1)
    ws.auto_filter.ref = f"A1:{last_col}{end_row}"


def apply_column_width_hints(ws, headers: List[str], sheet_name: str) -> None:
    """Override column widths with the fixed hints defined for *sheet_name*."""
    width_map = TEMPLATE_WIDTH_MAP if sheet_name == "manual_fill" else REPORT_WIDTH_MAP
    for idx, col_name in enumerate(headers, start=1):
        hint = width_map.get(col_name)
        if hint:
            ws.column_dimensions[get_column_letter(idx)].width = hint


def fetch_export_rows(
    conn: pymysql.connections.Connection, table_name: str, today_only: bool
) -> List[Dict]:
    """Fetch rows whose shop name lacks "旗舰店" and that miss a target field.

    Args:
        conn: open database connection.
        table_name: validated table name (interpolated into the SQL text).
        today_only: when true, restrict to rows with ``scrape_date = CURDATE()``.

    Returns:
        Matching rows ordered by ``id`` ascending.
    """
    where_parts = [
        "`shop` NOT LIKE %s",
        f"({build_missing_condition()})",
    ]
    params: List[object] = ["%旗舰店%"]
    if today_only:
        where_parts.append("`scrape_date` = CURDATE()")
    where_clause = " AND ".join(where_parts)

    sql = f"""
        SELECT
            `id`,
            `shop`,
            `store_url`,
            `scrape_date`,
            `contact_address`,
            `qualification_number`,
            `business_license_company`,
            `business_license_address`
        FROM `{table_name}`
        WHERE {where_clause}
        ORDER BY `id` ASC
    """
    with conn.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchall()


def write_xlsx(
    path: Path, headers: List[str], data_rows: List[List[object]], sheet_name: str
) -> None:
    """Write *headers* and *data_rows* to a styled single-sheet workbook at *path*."""
    wb = Workbook()
    ws = wb.active
    ws.title = sheet_name
    ws.append(headers)
    for row in data_rows:
        ws.append(row)

    style_header(ws, headers)
    auto_fit_columns(ws, [headers] + data_rows, min_width=10, max_width=58)
    # Fixed hints run after auto-fit so they win for the columns they cover.
    apply_column_width_hints(ws, headers, sheet_name)
    apply_data_style(ws, headers, len(data_rows), sheet_name)
    apply_filter(ws, headers, len(data_rows))
    wb.save(path)


def export_template(output_path: Optional[str]) -> int:
    """Export the manual-fill template and return the process exit code.

    Rows scraped today are exported first; when there are none, every row
    with a missing target field is exported instead.
    """
    config, table_name = load_db_config()
    conn = connect_db(config)
    try:
        rows = fetch_export_rows(conn, table_name, today_only=True)
        export_scope = "today"
        if not rows:
            rows = fetch_export_rows(conn, table_name, today_only=False)
            export_scope = "fallback_all_missing"
    finally:
        conn.close()

    default_name = (
        f"non_flagship_manual_template_{datetime.now().strftime('%Y%m%d')}.xlsx"
    )
    out_path = resolve_output_path(output_path, default_name)

    write_rows: List[List[object]] = [
        [
            item.get("id"),
            normalize_text(item.get("shop")),
            normalize_text(item.get("store_url")),
            format_date(item.get("scrape_date")),
            normalize_text(item.get("contact_address")),
            normalize_text(item.get("qualification_number")),
            normalize_text(item.get("business_license_company")),
            normalize_text(item.get("business_license_address")),
            "",  # manual_note starts empty for the operator to fill in
        ]
        for item in rows
    ]

    write_xlsx(
        path=out_path,
        headers=EXPORT_COLUMNS,
        data_rows=write_rows,
        sheet_name="manual_fill",
    )
    print(f"导出完成: {out_path}")
    print(f"导出范围: {export_scope}")
    print(f"导出条数: {len(write_rows)}")
    return 0


def read_excel_rows(input_path: Path) -> List[Dict]:
    """Read the active sheet of *input_path* into a list of row dicts.

    Each dict maps every non-empty header name to the raw cell value and
    carries the 1-based sheet row under ``excel_row``. Fully blank rows are
    skipped.

    Raises:
        ValueError: if the header row is missing or lacks ``id`` or any of
            the target fields.
    """
    wb = load_workbook(filename=input_path, data_only=True)
    ws = wb.active

    header_cells = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), None)
    if not header_cells:
        raise ValueError("Excel 为空,缺少表头")

    headers = [normalize_text(v) for v in header_cells]
    header_to_index = {name: idx for idx, name in enumerate(headers) if name}
    required = ["id"] + TARGET_FIELDS
    missing = [col for col in required if col not in header_to_index]
    if missing:
        raise ValueError(f"Excel 缺少必需列: {', '.join(missing)}")

    rows: List[Dict] = []
    for excel_row, values in enumerate(
        ws.iter_rows(min_row=2, values_only=True), start=2
    ):
        cells = list(values) if values else []
        if all(normalize_text(v) == "" for v in cells):
            continue  # also covers the empty-row case
        record: Dict = {"excel_row": excel_row}
        for name, idx in header_to_index.items():
            record[name] = cells[idx] if idx < len(cells) else None
        rows.append(record)
    return rows


def fetch_db_row(
    cur: pymysql.cursors.DictCursor, table_name: str, row_id: int
) -> Optional[Dict]:
    """Return the shop name and target fields for *row_id*, or ``None``."""
    sql = f"""
        SELECT
            `id`,
            `shop`,
            `contact_address`,
            `qualification_number`,
            `business_license_company`,
            `business_license_address`
        FROM `{table_name}`
        WHERE `id` = %s
        LIMIT 1
    """
    cur.execute(sql, (row_id,))
    return cur.fetchone()


def update_db_row(
    conn: pymysql.connections.Connection,
    cur: pymysql.cursors.DictCursor,
    table_name: str,
    row_id: int,
    updates: Dict[str, str],
) -> None:
    """Write *updates* to the row, bump ``update_time`` and commit.

    Column names in *updates* are interpolated into the SQL text, so callers
    must only pass trusted names (this module passes ``TARGET_FIELDS`` only).
    An empty *updates* mapping is a no-op.
    """
    if not updates:
        # Without this guard the statement would read "SET , `update_time`...".
        return
    set_sql = ", ".join(f"`{key}` = %s" for key in updates)
    sql = f"UPDATE `{table_name}` SET {set_sql}, `update_time` = NOW() WHERE `id` = %s"
    params = list(updates.values()) + [row_id]
    cur.execute(sql, params)
    conn.commit()


def make_report_row(
    excel_row: int,
    row_id,
    shop: str,
    status: str,
    reason: str,
    updated_fields: List[str],
    already_filled_fields: List[str],
    input_fields: Dict[str, str],
    manual_note: str,
) -> Dict:
    """Build one report record keyed by ``REPORT_COLUMNS``."""
    return {
        "excel_row": excel_row,
        "id": row_id,
        "shop": shop,
        "status": status,
        "reason": reason,
        "updated_fields": ",".join(updated_fields),
        "already_filled_fields": ",".join(already_filled_fields),
        "contact_address": input_fields.get("contact_address", ""),
        "qualification_number": input_fields.get("qualification_number", ""),
        "business_license_company": input_fields.get("business_license_company", ""),
        "business_license_address": input_fields.get("business_license_address", ""),
        "manual_note": manual_note,
    }


def _default_report_name() -> str:
    """Return the dated default file name for a backfill report."""
    return f"non_flagship_backfill_report_{datetime.now().strftime('%Y%m%d')}.xlsx"


def _parse_row_id(raw) -> int:
    """Convert an Excel ``id`` cell value to ``int``.

    Besides everything ``int(str)`` accepts, integral float forms such as
    ``12.0`` are accepted, because spreadsheet tools may store a numeric id
    as a float.

    Raises:
        ValueError: if the value is blank or not an integral number.
    """
    text = normalize_text(raw)
    try:
        return int(text)
    except ValueError:
        match = _INTEGRAL_FLOAT_RE.fullmatch(text)
        if match is None:
            raise
        return int(match.group(1))


def _process_backfill_row(
    conn: pymysql.connections.Connection,
    cur: pymysql.cursors.DictCursor,
    table_name: str,
    row: Dict,
    dry_run: bool,
) -> Dict:
    """Apply one Excel row to the database and return its report record.

    Only database fields that are currently blank are filled; non-blank
    fields are left untouched and listed under ``already_filled_fields``.
    """
    excel_row = row.get("excel_row")
    excel_shop = normalize_text(row.get("shop"))
    manual_note = normalize_text(row.get("manual_note"))
    input_fields = {field: normalize_text(row.get(field)) for field in TARGET_FIELDS}

    def report(status, reason, row_id, shop, updated=(), already=()) -> Dict:
        return make_report_row(
            excel_row=excel_row,
            row_id=row_id,
            shop=shop,
            status=status,
            reason=reason,
            updated_fields=list(updated),
            already_filled_fields=list(already),
            input_fields=input_fields,
            manual_note=manual_note,
        )

    row_id_raw = row.get("id")
    try:
        row_id = _parse_row_id(row_id_raw)
    except ValueError:
        return report("error", "id 非法,无法转换为整数", row_id_raw, excel_shop)

    if not any(input_fields.values()):
        return report("skipped_no_input", "四个目标字段均为空", row_id, excel_shop)

    db_row = fetch_db_row(cur, table_name, row_id)
    if not db_row:
        return report("skipped_not_found", "数据库不存在该 id", row_id, excel_shop)

    db_shop = normalize_text(db_row.get("shop"))
    if "旗舰店" in db_shop:
        # The status key says "non_flagship" but marks rows skipped because
        # the shop IS a flagship store; the key is kept for report stability.
        return report("skipped_non_flagship", "数据库记录属于旗舰店", row_id, db_shop)

    updates: Dict[str, str] = {}
    already_filled: List[str] = []
    for field in TARGET_FIELDS:
        excel_val = input_fields[field]
        if excel_val == "":
            continue
        if normalize_text(db_row.get(field)) == "":
            updates[field] = excel_val
        else:
            already_filled.append(field)

    if not updates:
        return report(
            "skipped_already_filled",
            "数据库对应字段已存在非空值,按仅补空策略跳过",
            row_id,
            db_shop,
            already=already_filled,
        )

    try:
        if not dry_run:
            update_db_row(conn, cur, table_name, row_id, updates)
    except Exception as row_exc:
        # Per-row boundary: the failure is recorded in the report so the
        # remaining rows can still be processed.
        conn.rollback()
        return report(
            "error",
            f"更新异常: {row_exc}",
            row_id,
            db_shop,
            updated=updates,
            already=already_filled,
        )

    reason = "dry-run 未写库,仅预览更新" if dry_run else "更新成功"
    return report(
        "updated", reason, row_id, db_shop, updated=updates, already=already_filled
    )


def backfill_from_excel(
    input_path: str,
    report_path: Optional[str],
    dry_run: bool,
) -> int:
    """Backfill blank target fields from a filled-in template; write a report.

    Args:
        input_path: path of the filled-in Excel file (relative paths are
            resolved against the current working directory).
        report_path: where to write the report; a dated default next to this
            script is used when empty.
        dry_run: when true, nothing is written to the database but the
            report still lists what would be updated.

    Returns:
        Process exit code (``0``).

    Raises:
        FileNotFoundError: if the input file does not exist.
    """
    xlsx_path = Path(input_path).expanduser()
    if not xlsx_path.is_absolute():
        xlsx_path = (Path.cwd() / xlsx_path).resolve()
    if not xlsx_path.is_file():
        raise FileNotFoundError(f"输入文件不存在: {xlsx_path}")

    excel_rows = read_excel_rows(xlsx_path)
    if not excel_rows:
        print(f"输入文件无可处理数据: {xlsx_path}")
        out_report = resolve_output_path(report_path, _default_report_name())
        write_xlsx(out_report, REPORT_COLUMNS, [], "backfill_report")
        print(f"已生成空报告: {out_report}")
        return 0

    config, table_name = load_db_config()
    conn = connect_db(config)
    report_rows: List[Dict] = []
    stats: Counter = Counter()
    try:
        with conn.cursor() as cur:
            for row in excel_rows:
                record = _process_backfill_row(conn, cur, table_name, row, dry_run)
                stats[record["status"]] += 1
                report_rows.append(record)
    finally:
        conn.close()

    out_report = resolve_output_path(report_path, _default_report_name())
    report_data_rows = [
        [record.get(col, "") for col in REPORT_COLUMNS] for record in report_rows
    ]
    write_xlsx(
        path=out_report,
        headers=REPORT_COLUMNS,
        data_rows=report_data_rows,
        sheet_name="backfill_report",
    )

    print(f"回填报告: {out_report}")
    print(f"处理行数: {len(report_rows)}")
    for status in STATUS_ORDER:
        if stats.get(status, 0):
            print(f"{status}: {stats[status]}")
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``export`` and ``backfill`` subcommands."""
    parser = argparse.ArgumentParser(
        description="非旗舰店人工补录导出与回填工具(Excel)"
    )
    sub = parser.add_subparsers(dest="command", required=True)

    export_parser = sub.add_parser("export", help="导出人工补录模板")
    export_parser.add_argument(
        "--output",
        help="导出文件路径(xlsx)",
        default=None,
    )

    backfill_parser = sub.add_parser("backfill", help="读取 Excel 回填数据库")
    backfill_parser.add_argument(
        "--input",
        required=True,
        help="人工填写后的 Excel 文件路径",
    )
    backfill_parser.add_argument(
        "--report",
        default=None,
        help="回填报告文件路径(xlsx)",
    )
    backfill_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="只预览不写库",
    )
    return parser


def main() -> int:
    """Parse the command line, run the chosen subcommand, return its exit code."""
    parser = build_parser()
    args = parser.parse_args()

    if args.command == "export":
        return export_template(args.output)
    if args.command == "backfill":
        return backfill_from_excel(
            input_path=args.input,
            report_path=args.report,
            dry_run=bool(args.dry_run),
        )

    parser.print_help()
    return 1


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("用户中断执行")
        raise SystemExit(130)
    except Exception as exc:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"执行失败: {exc}")
        raise SystemExit(1)