| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715 |
- import argparse
- import os
- import re
- import sys
- from collections import Counter
- from datetime import datetime
- from pathlib import Path
- from typing import Dict, List, Optional, Tuple
- import pymysql
- from dotenv import load_dotenv
- from openpyxl import Workbook, load_workbook
- from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
- from openpyxl.utils import get_column_letter
"""Usage

# 1) Export the template that is to be filled in by hand
python pdd1/process_shop/manual_non_flagship_backfill.py export
# 2) Rehearse the backfill first (nothing is written to the database; recommended before the real run)
python pdd1/process_shop/manual_non_flagship_backfill.py backfill --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx --dry-run
# 3) Real backfill (writes to the database); change the date in the file name to match your template
python pdd1/process_shop/manual_non_flagship_backfill.py backfill --input pdd1/process_shop/non_flagship_manual_template_20260421.xlsx
"""
# Directory that holds this script; the env file and default output files
# are resolved relative to it.
BASE_DIR = Path(__file__).resolve().parent
ENV_PATH = BASE_DIR / ".env.process"

# Table used when DB_TABLENAME is not configured.
TABLE_DEFAULT = "pdd_shop_info_middle"

# The four columns that are filled in by hand and written back to the database.
TARGET_FIELDS = [
    "contact_address",
    "qualification_number",
    "business_license_company",
    "business_license_address",
]

# Header row of the exported template: identifying columns, then the target
# fields, then a free-text note column.
EXPORT_COLUMNS = ["id", "shop", "store_url", "scrape_date", *TARGET_FIELDS, "manual_note"]

# Header row of the backfill report.
REPORT_COLUMNS = [
    "excel_row",
    "id",
    "shop",
    "status",
    "reason",
    "updated_fields",
    "already_filled_fields",
    *TARGET_FIELDS,
    "manual_note",
]

# Order in which per-status counters are printed after a backfill run.
STATUS_ORDER = [
    "updated",
    "skipped_no_input",
    "skipped_not_found",
    "skipped_non_flagship",
    "skipped_already_filled",
    "error",
]
# One light-grey hairline edge, reused for all four sides of the cell border.
_GRID_SIDE = Side(style="thin", color="D9D9D9")
THIN_BORDER = Border(
    left=_GRID_SIDE,
    right=_GRID_SIDE,
    top=_GRID_SIDE,
    bottom=_GRID_SIDE,
)


def _solid_fill(rgb_hex: str) -> PatternFill:
    """Return a solid ``PatternFill`` whose foreground colour is ``rgb_hex``."""
    return PatternFill("solid", fgColor=rgb_hex)


# Header row appearance.
HEADER_FILL = _solid_fill("1F4E78")
HEADER_FONT = Font(bold=True, color="FFFFFF")

# Alternating background for data rows.
ROW_FILL_A = _solid_fill("FFFFFF")
ROW_FILL_B = _solid_fill("F7FBFF")

# Template sheet: columns meant to be edited vs. the remaining columns.
EDITABLE_FILL = _solid_fill("FFF2CC")
READONLY_FILL = _solid_fill("F2F2F2")

# Report sheet: background of the status cell, keyed by status value.
STATUS_FILL_MAP = {
    status: _solid_fill(rgb_hex)
    for status, rgb_hex in (
        ("updated", "E2F0D9"),
        ("skipped_no_input", "FFF2CC"),
        ("skipped_not_found", "FCE4D6"),
        ("skipped_non_flagship", "EDEDED"),
        ("skipped_already_filled", "D9E1F2"),
        ("error", "F8CBAD"),
    )
}

# Preferred column widths for the template sheet, keyed by header name.
TEMPLATE_WIDTH_MAP = dict(
    id=10,
    shop=30,
    store_url=48,
    scrape_date=14,
    contact_address=28,
    qualification_number=28,
    business_license_company=30,
    business_license_address=30,
    manual_note=32,
)

# Preferred column widths for the report sheet, keyed by header name.
REPORT_WIDTH_MAP = dict(
    excel_row=10,
    id=10,
    shop=28,
    status=24,
    reason=50,
    updated_fields=26,
    already_filled_fields=30,
    contact_address=24,
    qualification_number=24,
    business_license_company=28,
    business_license_address=28,
    manual_note=30,
)
def normalize_text(value) -> str:
    """Return ``value`` as a string with surrounding whitespace removed.

    ``None`` becomes the empty string; every other value is converted with
    ``str()`` and then stripped.
    """
    return "" if value is None else str(value).strip()
def format_date(value) -> str:
    """Render ``value`` as ``YYYY-MM-DD`` text.

    ``None`` yields ``""``. Objects that expose ``strftime`` are formatted
    with ``"%Y-%m-%d"``; anything else is converted with ``str()`` and
    stripped of surrounding whitespace.
    """
    if value is None:
        return ""
    if not hasattr(value, "strftime"):
        # Not date-like: fall back to its trimmed text form.
        return str(value).strip()
    return value.strftime("%Y-%m-%d")
def validate_table_name(table_name: str) -> str:
    """Return a table name that is safe to interpolate into SQL.

    Args:
        table_name: Candidate name; an empty value selects ``TABLE_DEFAULT``.

    Returns:
        ``table_name`` unchanged when it consists solely of ASCII letters,
        digits and underscores, or ``TABLE_DEFAULT`` when it is empty.

    Raises:
        ValueError: If the name contains any other character.
    """
    if not table_name:
        return TABLE_DEFAULT
    # fullmatch, not match with "^...$": "$" also matches just before a
    # trailing newline, which would let "name\n" through.
    if re.fullmatch(r"[A-Za-z0-9_]+", table_name) is None:
        raise ValueError(f"非法表名: {table_name}")
    return table_name
def load_db_config() -> Tuple[Dict[str, object], str]:
    """Read database settings from the ``.env.process`` file next to this script.

    Returns:
        A ``(config, table_name)`` pair. ``config`` has the keys ``host``,
        ``port``, ``user``, ``password`` and ``db_name``; ``table_name`` has
        passed ``validate_table_name``.

    Raises:
        FileNotFoundError: If the env file does not exist.
        ValueError: If ``DB_DATABASE`` is blank, ``DB_PORT`` is not an
            integer, or the table name is rejected.
    """
    if not ENV_PATH.is_file():
        raise FileNotFoundError(f"找不到配置文件: {ENV_PATH}")
    load_dotenv(ENV_PATH)

    def env_text(key: str) -> str:
        # Trimmed value of an environment variable ("" when unset).
        return normalize_text(os.getenv(key))

    database = env_text("DB_DATABASE")
    if not database:
        raise ValueError("DB_DATABASE 不能为空")

    port_text = env_text("DB_PORT") or "3306"
    table_text = env_text("DB_TABLENAME") or TABLE_DEFAULT
    settings = {
        "host": env_text("DB_HOST") or "localhost",
        "port": int(port_text),
        "user": env_text("DB_USERNAME") or "root",
        # The password is taken verbatim; it is not trimmed.
        "password": os.getenv("DB_PASSWORD", ""),
        "db_name": database,
    }
    return settings, validate_table_name(table_text)
def connect_db(config: Dict[str, object]) -> pymysql.connections.Connection:
    """Open a MySQL connection from the mapping produced by ``load_db_config``.

    The connection uses ``utf8mb4``, returns rows as dictionaries
    (``DictCursor``), has a 10 second connect timeout and autocommit
    switched off.
    """
    connect_kwargs = {
        "host": config["host"],
        "port": int(config["port"]),
        "user": config["user"],
        "password": config["password"],
        "database": config["db_name"],
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "connect_timeout": 10,
        "autocommit": False,
    }
    return pymysql.connect(**connect_kwargs)
def build_missing_condition() -> str:
    """Build the SQL predicate that matches rows with a blank target field.

    One ``COALESCE(TRIM(col), '') = ''`` test is produced per entry of
    ``TARGET_FIELDS`` and the tests are joined with ``OR``.
    """
    return " OR ".join(
        f"COALESCE(TRIM(`{column}`), '') = ''" for column in TARGET_FIELDS
    )
def resolve_output_path(path_str: Optional[str], default_name: str) -> Path:
    """Work out where an output file goes and make sure its folder exists.

    Args:
        path_str: Path supplied by the user, or a falsy value to use the default.
        default_name: File name placed under ``BASE_DIR`` when ``path_str``
            is not given.

    Returns:
        The target path. A relative ``path_str`` is resolved against the
        current working directory; an absolute one is returned as given
        (after ``~`` expansion).
    """
    if not path_str:
        target = (BASE_DIR / default_name).resolve()
    else:
        target = Path(path_str).expanduser()
        if not target.is_absolute():
            target = (Path.cwd() / target).resolve()
    # Create the parent directory chain so the caller can write immediately.
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def style_header(ws, headers: List[str]) -> None:
    """Format row 1 of ``ws`` as the header row.

    Each of the first ``len(headers)`` cells gets the header fill, font,
    border and a centred, wrapping alignment. Row 1 is set to height 24
    and the sheet is frozen below it.
    """
    centered = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for column in range(1, len(headers) + 1):
        header_cell = ws.cell(row=1, column=column)
        header_cell.fill = HEADER_FILL
        header_cell.font = HEADER_FONT
        header_cell.alignment = centered
        header_cell.border = THIN_BORDER
    ws.row_dimensions[1].height = 24
    ws.freeze_panes = "A2"
def auto_fit_columns(ws, rows: List[List[object]], min_width: int = 12, max_width: int = 60) -> None:
    """Size each column of ``ws`` to fit the widest value in ``rows``.

    Args:
        ws: Worksheet-like object exposing ``cell(row=, column=)`` and
            ``column_dimensions``.
        rows: Header row followed by data rows; the first row decides how
            many columns are sized. Rows shorter than that are allowed.
        min_width: Lower bound for any column width.
        max_width: Upper bound for any column width.

    The width is the display width of the longest trimmed value plus two
    columns of padding, clamped to ``[min_width, max_width]``. Nothing is
    done when ``rows`` is empty.
    """
    # Local import keeps the file's import block untouched.
    import unicodedata

    if not rows:
        return

    def display_width(value) -> int:
        # East Asian wide/fullwidth characters take two columns in a
        # spreadsheet, so len() alone under-sizes columns of CJK text.
        text = "" if value is None else str(value).strip()
        return sum(
            2 if unicodedata.east_asian_width(char) in ("W", "F") else 1
            for char in text
        )

    for col_idx in range(len(rows[0])):
        widest = max(
            (display_width(row[col_idx]) for row in rows if col_idx < len(row)),
            default=0,
        )
        width = min(max(min_width, widest + 2), max_width)
        col_letter = ws.cell(row=1, column=col_idx + 1).column_letter
        ws.column_dimensions[col_letter].width = width
def apply_data_style(ws, headers: List[str], row_count: int, sheet_name: str) -> None:
    """Style the data rows (row 2 onward) of ``ws``.

    Every data cell gets the thin border and a wrapping alignment, centred
    for the ``id``/``excel_row``/``scrape_date``/``status`` columns and
    left-aligned otherwise. Fill depends on ``sheet_name``:

    * ``"manual_fill"``: target-field and ``manual_note`` columns use the
      editable fill, all other columns the read-only fill.
    * anything else: rows alternate between the two stripe fills.
    * ``"backfill_report"``: additionally, the ``status`` cell is
      recoloured according to ``STATUS_FILL_MAP`` when its value is known.
    """
    column_of = {name: position for position, name in enumerate(headers, start=1)}
    editable_columns = {
        column_of[name]
        for name in TARGET_FIELDS + ["manual_note"]
        if name in column_of
    }
    status_column = column_of.get("status")
    centered_names = {"id", "excel_row", "scrape_date", "status"}
    is_template = sheet_name == "manual_fill"
    is_report = sheet_name == "backfill_report"
    centered = Alignment(horizontal="center", vertical="center", wrap_text=True)
    left_aligned = Alignment(horizontal="left", vertical="center", wrap_text=True)

    for row_number in range(2, row_count + 2):
        stripe = ROW_FILL_B if row_number % 2 else ROW_FILL_A
        ws.row_dimensions[row_number].height = 20
        for column, name in enumerate(headers, start=1):
            cell = ws.cell(row=row_number, column=column)
            cell.border = THIN_BORDER
            cell.alignment = centered if name in centered_names else left_aligned
            if is_template:
                cell.fill = EDITABLE_FILL if column in editable_columns else READONLY_FILL
            else:
                cell.fill = stripe

        # Applied after the per-cell pass so the status colour wins over the stripe.
        if is_report and status_column:
            status_cell = ws.cell(row=row_number, column=status_column)
            status_fill = STATUS_FILL_MAP.get(normalize_text(status_cell.value))
            if status_fill:
                status_cell.fill = status_fill
def apply_filter(ws, headers: List[str], row_count: int) -> None:
    """Enable an auto-filter spanning the header and all data rows.

    The range starts at ``A1`` and ends at the last header column; its
    last row is ``row_count + 1`` but never less than row 2.
    """
    final_column = get_column_letter(len(headers))
    final_row = row_count + 1
    if final_row < 2:
        final_row = 2
    ws.auto_filter.ref = f"A1:{final_column}{final_row}"
def apply_column_width_hints(ws, headers: List[str], sheet_name: str) -> None:
    """Override column widths with the preferred width for each header.

    ``TEMPLATE_WIDTH_MAP`` is used for the ``"manual_fill"`` sheet and
    ``REPORT_WIDTH_MAP`` for every other sheet name. Headers without an
    entry keep whatever width they already have.
    """
    hints = REPORT_WIDTH_MAP
    if sheet_name == "manual_fill":
        hints = TEMPLATE_WIDTH_MAP
    for position, name in enumerate(headers, start=1):
        preferred = hints.get(name)
        if not preferred:
            continue
        ws.column_dimensions[get_column_letter(position)].width = preferred
def fetch_export_rows(
    conn: pymysql.connections.Connection, table_name: str, today_only: bool
) -> List[Dict]:
    """Select the rows that still need manual completion.

    A row qualifies when its ``shop`` does not contain ``旗舰店`` and at
    least one target field is blank. With ``today_only`` the result is
    further limited to rows whose ``scrape_date`` is the current date.
    Rows are ordered by ``id`` ascending.

    ``table_name`` is interpolated into the statement, so it must already
    be validated by the caller; the ``LIKE`` pattern is bound as a
    parameter.
    """
    conditions = [
        "`shop` NOT LIKE %s",
        f"({build_missing_condition()})",
    ]
    if today_only:
        conditions.append("`scrape_date` = CURDATE()")
    where_clause = " AND ".join(conditions)

    query = f"""
        SELECT
            `id`,
            `shop`,
            `store_url`,
            `scrape_date`,
            `contact_address`,
            `qualification_number`,
            `business_license_company`,
            `business_license_address`
        FROM `{table_name}`
        WHERE {where_clause}
        ORDER BY `id` ASC
    """
    bound_values: List[object] = ["%旗舰店%"]
    with conn.cursor() as cursor:
        cursor.execute(query, bound_values)
        return cursor.fetchall()
def write_xlsx(path: Path, headers: List[str], data_rows: List[List[object]], sheet_name: str) -> None:
    """Write ``headers`` and ``data_rows`` to a styled single-sheet workbook.

    The sheet is titled ``sheet_name``; that name also selects the
    sheet-specific width hints and cell styling. The workbook is saved to
    ``path``.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = sheet_name

    for line in [headers, *data_rows]:
        sheet.append(line)

    data_row_total = len(data_rows)
    style_header(sheet, headers)
    # Content-based widths first; explicit per-column hints then take precedence.
    auto_fit_columns(sheet, [headers] + data_rows, min_width=10, max_width=58)
    apply_column_width_hints(sheet, headers, sheet_name)
    apply_data_style(sheet, headers, data_row_total, sheet_name)
    apply_filter(sheet, headers, data_row_total)
    workbook.save(path)
def export_template(output_path: Optional[str]) -> int:
    """Export the manual-entry template workbook.

    Rows scraped today are exported first; when there are none, all rows
    that still have a blank target field are exported instead. The file
    goes to ``output_path`` or, by default, to a dated file next to this
    script.

    Returns:
        ``0`` on completion.
    """
    config, table_name = load_db_config()
    conn = connect_db(config)
    try:
        export_scope = "today"
        rows = fetch_export_rows(conn, table_name, today_only=True)
        if not rows:
            # Nothing scraped today: fall back to every incomplete row.
            export_scope = "fallback_all_missing"
            rows = fetch_export_rows(conn, table_name, today_only=False)
    finally:
        conn.close()

    default_name = f"non_flagship_manual_template_{datetime.now().strftime('%Y%m%d')}.xlsx"
    out_path = resolve_output_path(output_path, default_name)

    # Column order follows EXPORT_COLUMNS; the trailing "" is the empty note cell.
    write_rows: List[List[object]] = [
        [
            item.get("id"),
            normalize_text(item.get("shop")),
            normalize_text(item.get("store_url")),
            format_date(item.get("scrape_date")),
            *(normalize_text(item.get(field)) for field in TARGET_FIELDS),
            "",
        ]
        for item in rows
    ]

    write_xlsx(
        path=out_path,
        headers=EXPORT_COLUMNS,
        data_rows=write_rows,
        sheet_name="manual_fill",
    )
    print(f"导出完成: {out_path}")
    print(f"导出范围: {export_scope}")
    print(f"导出条数: {len(write_rows)}")
    return 0
def read_excel_rows(input_path: Path) -> List[Dict]:
    """Read the active sheet of a filled-in workbook into row dictionaries.

    Row 1 is the header. Each later row that has at least one non-blank
    cell becomes a dict keyed by header name, plus ``"excel_row"`` holding
    its 1-based sheet row number. Cell values are returned as read (not
    trimmed).

    Raises:
        ValueError: If there is no header row, or if ``id`` or any of the
            target-field columns is missing.
    """
    workbook = load_workbook(filename=input_path, data_only=True)
    sheet = workbook.active

    first_row = next(sheet.iter_rows(min_row=1, max_row=1, values_only=True), None)
    if not first_row:
        raise ValueError("Excel 为空,缺少表头")

    # Header name -> zero-based column position; blank headers are ignored.
    column_index = {}
    for position, raw_title in enumerate(first_row):
        title = normalize_text(raw_title)
        if title:
            column_index[title] = position

    absent = [name for name in ["id", *TARGET_FIELDS] if name not in column_index]
    if absent:
        raise ValueError(f"Excel 缺少必需列: {', '.join(absent)}")

    records: List[Dict] = []
    for excel_row, values in enumerate(sheet.iter_rows(min_row=2, values_only=True), start=2):
        cells = list(values) if values else []
        # Skip rows that are empty or contain only blank cells.
        if not any(normalize_text(cell) for cell in cells):
            continue
        record = {"excel_row": excel_row}
        for name, position in column_index.items():
            record[name] = cells[position] if position < len(cells) else None
        records.append(record)
    return records
def fetch_db_row(
    cur: pymysql.cursors.DictCursor, table_name: str, row_id: int
) -> Optional[Dict]:
    """Fetch the ``id``, ``shop`` and target-field columns of one row.

    ``table_name`` is interpolated into the statement and must already be
    validated; ``row_id`` is bound as a parameter.

    Returns:
        Whatever ``cur.fetchone()`` yields for the matching ``id``.
    """
    query = f"""
        SELECT
            `id`,
            `shop`,
            `contact_address`,
            `qualification_number`,
            `business_license_company`,
            `business_license_address`
        FROM `{table_name}`
        WHERE `id` = %s
        LIMIT 1
    """
    bound_values = (row_id,)
    cur.execute(query, bound_values)
    return cur.fetchone()
def update_db_row(
    conn: pymysql.connections.Connection,
    cur: pymysql.cursors.DictCursor,
    table_name: str,
    row_id: int,
    updates: Dict[str, str],
) -> None:
    """Write ``updates`` to one row and commit immediately.

    Args:
        conn: Connection that owns ``cur``; committed after the statement.
        cur: Cursor used to execute the ``UPDATE``.
        table_name: Target table; interpolated into the statement, so it
            must already be validated.
        row_id: Value of ``id`` identifying the row.
        updates: Column name -> new value. Column names are interpolated
            (back-quoted) into the statement; values are bound as parameters.

    ``update_time`` is set to ``NOW()`` alongside the given columns.

    Raises:
        ValueError: If ``updates`` is empty. Without this check the
            statement would read ``SET , update_time = ...`` and fail at
            the server with a syntax error.
    """
    if not updates:
        raise ValueError("updates 不能为空")
    assignments = ", ".join(f"`{column}` = %s" for column in updates)
    sql = f"UPDATE `{table_name}` SET {assignments}, `update_time` = NOW() WHERE `id` = %s"
    cur.execute(sql, [*updates.values(), row_id])
    conn.commit()
def make_report_row(
    excel_row: int,
    row_id,
    shop: str,
    status: str,
    reason: str,
    updated_fields: List[str],
    already_filled_fields: List[str],
    input_fields: Dict[str, str],
    manual_note: str,
) -> Dict:
    """Assemble one report record keyed by the report column names.

    The two field-name lists are flattened to comma-separated strings, and
    each of the four target-field values is taken from ``input_fields``
    with ``""`` as the fallback.
    """
    report = {
        "excel_row": excel_row,
        "id": row_id,
        "shop": shop,
        "status": status,
        "reason": reason,
        "updated_fields": ",".join(updated_fields),
        "already_filled_fields": ",".join(already_filled_fields),
    }
    for field in (
        "contact_address",
        "qualification_number",
        "business_license_company",
        "business_license_address",
    ):
        report[field] = input_fields.get(field, "")
    report["manual_note"] = manual_note
    return report
def _parse_row_id(raw_id) -> Optional[int]:
    """Convert an Excel ``id`` cell to ``int``; return ``None`` if it is not a whole number."""
    if isinstance(raw_id, bool):
        # bool is an int subclass, but True/False is never a valid id.
        return None
    if isinstance(raw_id, int):
        return raw_id
    if isinstance(raw_id, float):
        # Numeric cells can come back as floats; accept 123.0, reject 123.5/NaN/inf.
        return int(raw_id) if raw_id.is_integer() else None
    try:
        return int(normalize_text(raw_id))
    except (TypeError, ValueError):
        return None


def backfill_from_excel(
    input_path: str,
    report_path: Optional[str],
    dry_run: bool,
) -> int:
    """Backfill blank target fields in the database from a filled-in workbook.

    For every non-empty Excel row the outcome is one of the statuses in
    ``STATUS_ORDER``:

    * ``error``: the ``id`` cell is not a whole number, or the update raised.
    * ``skipped_no_input``: all four target fields are blank in Excel.
    * ``skipped_not_found``: no database row has that ``id``.
    * ``skipped_non_flagship``: the database ``shop`` contains ``旗舰店``.
    * ``skipped_already_filled``: every supplied field already has a
      non-blank database value (only blanks are ever filled).
    * ``updated``: at least one blank field was written, or would have
      been written when ``dry_run`` is set.

    Args:
        input_path: Workbook to read; a relative path is resolved against
            the current working directory.
        report_path: Where to write the report workbook; a dated file next
            to this script is used when omitted.
        dry_run: When true, no ``UPDATE`` is issued.

    Returns:
        ``0`` once the report has been written.

    Raises:
        FileNotFoundError: If ``input_path`` does not point to a file.
    """
    xlsx_path = Path(input_path).expanduser()
    if not xlsx_path.is_absolute():
        xlsx_path = (Path.cwd() / xlsx_path).resolve()
    if not xlsx_path.is_file():
        raise FileNotFoundError(f"输入文件不存在: {xlsx_path}")

    excel_rows = read_excel_rows(xlsx_path)
    if not excel_rows:
        print(f"输入文件无可处理数据: {xlsx_path}")
        default_report = (
            f"non_flagship_backfill_report_{datetime.now().strftime('%Y%m%d')}.xlsx"
        )
        out_report = resolve_output_path(report_path, default_report)
        write_xlsx(out_report, REPORT_COLUMNS, [], "backfill_report")
        print(f"已生成空报告: {out_report}")
        return 0

    config, table_name = load_db_config()
    conn = connect_db(config)
    report_rows: List[Dict] = []
    stats = Counter()

    def record(status, reason, *, updated_fields=(), already_filled_fields=(), **row_info):
        # Count the outcome and append the matching report line.
        stats[status] += 1
        report_rows.append(
            make_report_row(
                status=status,
                reason=reason,
                updated_fields=list(updated_fields),
                already_filled_fields=list(already_filled_fields),
                **row_info,
            )
        )

    try:
        with conn.cursor() as cur:
            for row in excel_rows:
                row_id_raw = row.get("id")
                excel_shop = normalize_text(row.get("shop"))
                input_fields = {
                    field: normalize_text(row.get(field))
                    for field in TARGET_FIELDS
                }
                # Report values that are the same whatever the outcome.
                common = {
                    "excel_row": row.get("excel_row"),
                    "input_fields": input_fields,
                    "manual_note": normalize_text(row.get("manual_note")),
                }

                row_id = _parse_row_id(row_id_raw)
                if row_id is None:
                    record(
                        "error",
                        "id 非法,无法转换为整数",
                        row_id=row_id_raw,
                        shop=excel_shop,
                        **common,
                    )
                    continue

                if all(value == "" for value in input_fields.values()):
                    record(
                        "skipped_no_input",
                        "四个目标字段均为空",
                        row_id=row_id,
                        shop=excel_shop,
                        **common,
                    )
                    continue

                db_row = fetch_db_row(cur, table_name, row_id)
                if not db_row:
                    record(
                        "skipped_not_found",
                        "数据库不存在该 id",
                        row_id=row_id,
                        shop=excel_shop,
                        **common,
                    )
                    continue

                # From here on the report shows the shop name stored in the database.
                db_shop = normalize_text(db_row.get("shop"))
                if "旗舰店" in db_shop:
                    record(
                        "skipped_non_flagship",
                        "数据库记录属于旗舰店",
                        row_id=row_id,
                        shop=db_shop,
                        **common,
                    )
                    continue

                # Fill blanks only: a supplied value is written solely when
                # the database value is currently blank.
                updates: Dict[str, str] = {}
                already_filled_fields: List[str] = []
                for field in TARGET_FIELDS:
                    excel_val = input_fields[field]
                    if excel_val == "":
                        continue
                    if normalize_text(db_row.get(field)) == "":
                        updates[field] = excel_val
                    else:
                        already_filled_fields.append(field)

                if not updates:
                    record(
                        "skipped_already_filled",
                        "数据库对应字段已存在非空值,按仅补空策略跳过",
                        row_id=row_id,
                        shop=db_shop,
                        already_filled_fields=already_filled_fields,
                        **common,
                    )
                    continue

                try:
                    if not dry_run:
                        update_db_row(conn, cur, table_name, row_id, updates)
                except Exception as row_exc:
                    # Per-row boundary: one failed write is rolled back and
                    # reported, and the remaining rows are still processed.
                    conn.rollback()
                    record(
                        "error",
                        f"更新异常: {row_exc}",
                        row_id=row_id,
                        shop=db_shop,
                        updated_fields=updates.keys(),
                        already_filled_fields=already_filled_fields,
                        **common,
                    )
                else:
                    record(
                        "updated",
                        "dry-run 未写库,仅预览更新" if dry_run else "更新成功",
                        row_id=row_id,
                        shop=db_shop,
                        updated_fields=updates.keys(),
                        already_filled_fields=already_filled_fields,
                        **common,
                    )
    finally:
        conn.close()

    default_report = f"non_flagship_backfill_report_{datetime.now().strftime('%Y%m%d')}.xlsx"
    out_report = resolve_output_path(report_path, default_report)
    report_data_rows = [
        [entry.get(col, "") for col in REPORT_COLUMNS]
        for entry in report_rows
    ]
    write_xlsx(
        path=out_report,
        headers=REPORT_COLUMNS,
        data_rows=report_data_rows,
        sheet_name="backfill_report",
    )
    print(f"回填报告: {out_report}")
    print(f"处理行数: {len(report_rows)}")
    for status in STATUS_ORDER:
        if stats.get(status, 0):
            print(f"{status}: {stats[status]}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Two sub-commands are defined, one of which is required:

    * ``export [--output PATH]``
    * ``backfill --input PATH [--report PATH] [--dry-run]``

    The chosen sub-command name is stored in ``args.command``.
    """
    parser = argparse.ArgumentParser(
        description="非旗舰店人工补录导出与回填工具(Excel)"
    )
    commands = parser.add_subparsers(dest="command", required=True)

    export_cmd = commands.add_parser("export", help="导出人工补录模板")
    export_cmd.add_argument("--output", help="导出文件路径(xlsx)", default=None)

    backfill_cmd = commands.add_parser("backfill", help="读取 Excel 回填数据库")
    backfill_options = (
        ("--input", {"required": True, "help": "人工填写后的 Excel 文件路径"}),
        ("--report", {"default": None, "help": "回填报告文件路径(xlsx)"}),
        ("--dry-run", {"action": "store_true", "help": "只预览不写库"}),
    )
    for flag, options in backfill_options:
        backfill_cmd.add_argument(flag, **options)

    return parser
def main() -> int:
    """Parse the command line and run the selected sub-command.

    Returns:
        The sub-command's return value, or ``1`` (after printing help) if
        the parsed command is neither ``export`` nor ``backfill``.
    """
    parser = build_parser()
    args = parser.parse_args()
    command = args.command

    if command == "backfill":
        return backfill_from_excel(
            input_path=args.input,
            report_path=args.report,
            dry_run=bool(args.dry_run),
        )
    if command == "export":
        return export_template(args.output)

    # Fallback for a command name that matches neither branch above.
    parser.print_help()
    return 1
if __name__ == "__main__":
    # Script entry point: run main() and map the outcome to a process exit
    # status. Diagnostics go to stderr so they do not mix with the normal
    # progress output on stdout.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("用户中断执行", file=sys.stderr)
        exit_code = 130
    except Exception as exc:
        # Top-level boundary: report any failure as one line and exit with 1.
        print(f"执行失败: {exc}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)
|