import_data.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import re
  2. from commons.conn_mysql import MySQLPoolOnline
  3. import time
  4. from pathlib import Path
  5. import pandas as pd
  6. def export_data(db_name, platname, conn_db=None, output_path=None, sheet_name="Sheet1"):
  7. """
  8. 导出指定库/关键词/日期的数据到 xlsx。
  9. - 表头使用 biaotou
  10. - 若查询结果为 None/空,则写入空行(仅保留表头)
  11. - 默认输出为带日期文件名(同一天同名),保证每次运行覆盖写入
  12. - 按 link_url 去重
  13. """
  14. if conn_db is None:
  15. conn_db = MySQLPoolOnline()
  16. scrape_date = time.strftime("%Y-%m-%d")
  17. sql_account = f""" select `product_name`, `min_price`, `manufacture_date`,`number`, `shipment_province_name`,`shipment_city_name`,`company_name`,`expiry_date`,`store_name`, `scrape_date`, `approval_number`,`is_sold_out`, `link_url`,`product_specs`
  18. from `{db_name}`
  19. where platform_id = %s """
  20. biaotou = [
  21. "药品",
  22. "规格",
  23. "批准文号",
  24. "省份",
  25. "城市",
  26. "商铺",
  27. "厂家",
  28. "生产日期",
  29. "有效日期",
  30. "最低价格",
  31. "盒数",
  32. "抓取时间",
  33. "状态",
  34. "链接",
  35. ]
  36. account_list = conn_db.select_data(sql_account, (platname,)) # 修正:使用 platname 而不是 platform
  37. rows = []
  38. seen_urls = set() # 用于记录已经处理过的链接
  39. for account_dict in (account_list or []):
  40. link_url = account_dict.get("link_url", "") or ""
  41. # 如果链接已存在,则跳过这条数据
  42. if link_url in seen_urls:
  43. continue
  44. # 将新链接加入集合
  45. seen_urls.add(link_url)
  46. data_row = [
  47. account_dict.get("product_name", ""),
  48. account_dict.get("product_specs", ""),
  49. account_dict.get("approval_number", "") or "",
  50. account_dict.get("shipment_province_name", "") or "",
  51. account_dict.get("shipment_city_name", "") or "",
  52. account_dict.get("store_name", "") or "",
  53. account_dict.get("company_name", "") or "",
  54. account_dict.get("manufacture_date", "") or "",
  55. account_dict.get("expiry_date", "") or "",
  56. account_dict.get("min_price", "") or "",
  57. account_dict.get("number", 1),
  58. account_dict.get("scrape_date", "") or "",
  59. account_dict.get("is_sold_out", "") or "",
  60. link_url,
  61. ]
  62. rows.append(data_row)
  63. # 若数据为 None/空,则填入空(仅保留表头,数据行为空字符串)
  64. if not rows:
  65. rows = [["" for _ in biaotou]]
  66. # 写入前按“规格”排序(空规格放最后)
  67. spec_col_idx = biaotou.index("规格")
  68. def spec_sort_key(row):
  69. spec = ""
  70. if isinstance(row, (list, tuple)) and len(row) > spec_col_idx:
  71. spec = row[spec_col_idx] or ""
  72. spec = str(spec).strip()
  73. if not spec:
  74. # 空规格永远放到最后
  75. return (1, "")
  76. # 直接按字符串升序
  77. return (0, spec)
  78. rows = sorted(rows, key=spec_sort_key)
  79. # 优化平台名称映射
  80. plat_name_map = {
  81. 1: "淘宝",
  82. 2: "京东",
  83. 3: "拼多多",
  84. 4: "美团",
  85. 5: "药师帮",
  86. 6: "一药城",
  87. 7: "药九九",
  88. 11: "药房网"
  89. }
  90. plat_name = plat_name_map.get(platname, f"平台{platname}")
  91. if output_path is None:
  92. # 带日期文件名:同一天同名,保证每次运行覆盖写入
  93. output_path = Path.cwd() / f"{plat_name}_{scrape_date}.xlsx"
  94. else:
  95. output_path = Path(output_path)
  96. output_path.parent.mkdir(parents=True, exist_ok=True)
  97. df = pd.DataFrame(rows, columns=biaotou)
  98. df.to_excel(output_path, index=False, sheet_name=sheet_name)
  99. # 打印去重统计信息
  100. print(f"原始数据: {len(account_list) if account_list else 0} 条, 去重后: {len(rows)} 条")
  101. return output_path
  102. if __name__ == '__main__':
  103. platforms = [1, 2, 5, 6, 11, 3, 4, 7]
  104. conn_db = MySQLPoolOnline()
  105. db_name = "retrieve_scrape_data"
  106. for platform in platforms:
  107. out = export_data(db_name, platform, conn_db=conn_db)
  108. print(f"{db_name} 导出完成: {out}")