import re from commons.conn_mysql import MySQLPoolOnline import time from pathlib import Path import pandas as pd def export_data(db_name, platname, conn_db=None, output_path=None, sheet_name="Sheet1"): """ 导出指定库/关键词/日期的数据到 xlsx。 - 表头使用 biaotou - 若查询结果为 None/空,则写入空行(仅保留表头) - 默认输出为带日期文件名(同一天同名),保证每次运行覆盖写入 - 按 link_url 去重 """ if conn_db is None: conn_db = MySQLPoolOnline() scrape_date = time.strftime("%Y-%m-%d") sql_account = f""" select `product_name`, `min_price`, `manufacture_date`,`number`, `shipment_province_name`,`shipment_city_name`,`company_name`,`expiry_date`,`store_name`, `scrape_date`, `approval_number`,`is_sold_out`, `link_url`,`product_specs` from `{db_name}` where platform_id = %s """ biaotou = [ "药品", "规格", "批准文号", "省份", "城市", "商铺", "厂家", "生产日期", "有效日期", "最低价格", "盒数", "抓取时间", "状态", "链接", ] account_list = conn_db.select_data(sql_account, (platname,)) # 修正:使用 platname 而不是 platform rows = [] seen_urls = set() # 用于记录已经处理过的链接 for account_dict in (account_list or []): link_url = account_dict.get("link_url", "") or "" # 如果链接已存在,则跳过这条数据 if link_url in seen_urls: continue # 将新链接加入集合 seen_urls.add(link_url) data_row = [ account_dict.get("product_name", ""), account_dict.get("product_specs", ""), account_dict.get("approval_number", "") or "", account_dict.get("shipment_province_name", "") or "", account_dict.get("shipment_city_name", "") or "", account_dict.get("store_name", "") or "", account_dict.get("company_name", "") or "", account_dict.get("manufacture_date", "") or "", account_dict.get("expiry_date", "") or "", account_dict.get("min_price", "") or "", account_dict.get("number", 1), account_dict.get("scrape_date", "") or "", account_dict.get("is_sold_out", "") or "", link_url, ] rows.append(data_row) # 若数据为 None/空,则填入空(仅保留表头,数据行为空字符串) if not rows: rows = [["" for _ in biaotou]] # 写入前按“规格”排序(空规格放最后) spec_col_idx = biaotou.index("规格") def spec_sort_key(row): spec = "" if isinstance(row, (list, tuple)) and len(row) > spec_col_idx: spec = row[spec_col_idx] or "" spec = str(spec).strip() if not spec: # 空规格永远放到最后 return (1, "") # 直接按字符串升序 return (0, spec) rows = sorted(rows, key=spec_sort_key) # 优化平台名称映射 plat_name_map = { 1: "淘宝", 2: "京东", 3: "拼多多", 4: "美团", 5: "药师帮", 6: "一药城", 7: "药九九", 11: "药房网" } plat_name = plat_name_map.get(platname, f"平台{platname}") if output_path is None: # 带日期文件名:同一天同名,保证每次运行覆盖写入 output_path = Path.cwd() / f"{plat_name}_{scrape_date}.xlsx" else: output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) df = pd.DataFrame(rows, columns=biaotou) df.to_excel(output_path, index=False, sheet_name=sheet_name) # 打印去重统计信息 print(f"原始数据: {len(account_list) if account_list else 0} 条, 去重后: {len(rows)} 条") return output_path if __name__ == '__main__': platforms = [1, 2, 5, 6, 11, 3, 4, 7] conn_db = MySQLPoolOnline() db_name = "retrieve_scrape_data" for platform in platforms: out = export_data(db_name, platform, conn_db=conn_db) print(f"{db_name} 导出完成: {out}")