| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import re
- from commons.conn_mysql import MySQLPoolOnline
- import time
- from pathlib import Path
- import pandas as pd
- def export_data(db_name, platname, conn_db=None, output_path=None, sheet_name="Sheet1"):
- """
- 导出指定库/关键词/日期的数据到 xlsx。
- - 表头使用 biaotou
- - 若查询结果为 None/空,则写入空行(仅保留表头)
- - 默认输出为带日期文件名(同一天同名),保证每次运行覆盖写入
- - 按 link_url 去重
- """
- if conn_db is None:
- conn_db = MySQLPoolOnline()
- scrape_date = time.strftime("%Y-%m-%d")
- sql_account = f""" select `product_name`, `min_price`, `manufacture_date`,`number`, `shipment_province_name`,`shipment_city_name`,`company_name`,`expiry_date`,`store_name`, `scrape_date`, `approval_number`,`is_sold_out`, `link_url`,`product_specs`
- from `{db_name}`
- where platform_id = %s """
- biaotou = [
- "药品",
- "规格",
- "批准文号",
- "省份",
- "城市",
- "商铺",
- "厂家",
- "生产日期",
- "有效日期",
- "最低价格",
- "盒数",
- "抓取时间",
- "状态",
- "链接",
- ]
- account_list = conn_db.select_data(sql_account, (platname,)) # 修正:使用 platname 而不是 platform
- rows = []
- seen_urls = set() # 用于记录已经处理过的链接
- for account_dict in (account_list or []):
- link_url = account_dict.get("link_url", "") or ""
- # 如果链接已存在,则跳过这条数据
- if link_url in seen_urls:
- continue
- # 将新链接加入集合
- seen_urls.add(link_url)
- data_row = [
- account_dict.get("product_name", ""),
- account_dict.get("product_specs", ""),
- account_dict.get("approval_number", "") or "",
- account_dict.get("shipment_province_name", "") or "",
- account_dict.get("shipment_city_name", "") or "",
- account_dict.get("store_name", "") or "",
- account_dict.get("company_name", "") or "",
- account_dict.get("manufacture_date", "") or "",
- account_dict.get("expiry_date", "") or "",
- account_dict.get("min_price", "") or "",
- account_dict.get("number", 1),
- account_dict.get("scrape_date", "") or "",
- account_dict.get("is_sold_out", "") or "",
- link_url,
- ]
- rows.append(data_row)
- # 若数据为 None/空,则填入空(仅保留表头,数据行为空字符串)
- if not rows:
- rows = [["" for _ in biaotou]]
- # 写入前按“规格”排序(空规格放最后)
- spec_col_idx = biaotou.index("规格")
- def spec_sort_key(row):
- spec = ""
- if isinstance(row, (list, tuple)) and len(row) > spec_col_idx:
- spec = row[spec_col_idx] or ""
- spec = str(spec).strip()
- if not spec:
- # 空规格永远放到最后
- return (1, "")
- # 直接按字符串升序
- return (0, spec)
- rows = sorted(rows, key=spec_sort_key)
- # 优化平台名称映射
- plat_name_map = {
- 1: "淘宝",
- 2: "京东",
- 3: "拼多多",
- 4: "美团",
- 5: "药师帮",
- 6: "一药城",
- 7: "药九九",
- 11: "药房网"
- }
- plat_name = plat_name_map.get(platname, f"平台{platname}")
- if output_path is None:
- # 带日期文件名:同一天同名,保证每次运行覆盖写入
- output_path = Path.cwd() / f"{plat_name}_{scrape_date}.xlsx"
- else:
- output_path = Path(output_path)
- output_path.parent.mkdir(parents=True, exist_ok=True)
- df = pd.DataFrame(rows, columns=biaotou)
- df.to_excel(output_path, index=False, sheet_name=sheet_name)
- # 打印去重统计信息
- print(f"原始数据: {len(account_list) if account_list else 0} 条, 去重后: {len(rows)} 条")
- return output_path
- if __name__ == '__main__':
- platforms = [1, 2, 5, 6, 11, 3, 4, 7]
- conn_db = MySQLPoolOnline()
- db_name = "retrieve_scrape_data"
- for platform in platforms:
- out = export_data(db_name, platform, conn_db=conn_db)
- print(f"{db_name} 导出完成: {out}")
|