# shop_pipelines.py
import math

from commons.conn_mysql import MySQLPool39, MySQLPoolOnline
from commons.sql_data import sql_map
from commons.Logger import get_spider_logger
  4. class ShopPipeline:
  5. def __init__(self, spider_name):
  6. self.db_online = MySQLPoolOnline()
  7. self.sql_map = sql_map
  8. self.crawl_count = 0
  9. self.spider_name = spider_name
  10. self.logger = get_spider_logger(spider_name)
  11. @staticmethod
  12. def _db_int(val):
  13. if val is None or val == "":
  14. return 0
  15. try:
  16. return int(val)
  17. except (TypeError, ValueError):
  18. return 0
  19. @staticmethod
  20. def _db_decimal(val):
  21. if val is None or val is False:
  22. return 0.0
  23. if isinstance(val, (int, float)) and not isinstance(val, bool):
  24. return float(val)
  25. s = str(val).strip()
  26. if not s:
  27. return 0.0
  28. try:
  29. return float(s)
  30. except ValueError:
  31. return 0.0
  32. def storge_data(self, product):
  33. # 数据入库:表字段与 product key 一致,按 key 动态构造 SQL
  34. if not isinstance(product, dict) or not product:
  35. self.logger.warning("pipeline入库失败: product 为空或格式错误")
  36. return 0
  37. table_name = "retrieve_scrape_shop_info"
  38. columns = list(product.keys())
  39. values = tuple(product[col] for col in columns)
  40. column_sql = ", ".join(f"`{col}`" for col in columns)
  41. placeholder_sql = ", ".join(["%s"] * len(columns))
  42. # idx_platform_shop 为 (platform, shop) 唯一键;重复则跳过,不更新
  43. sql = (
  44. f"INSERT IGNORE INTO `{table_name}` ({column_sql}) VALUES ({placeholder_sql})"
  45. )
  46. affected_rows = self.db_online.execute(sql, values)
  47. if affected_rows > 0:
  48. self.crawl_count += 1
  49. self.logger.info(
  50. "shop pipeline入库成功 spider=%s total=%s shop=%s",
  51. self.spider_name,
  52. self.crawl_count,
  53. product.get("shop"),
  54. )
  55. else:
  56. self.logger.info(
  57. "shop pipeline跳过入库(店铺已存在) spider=%s shop=%s",
  58. self.spider_name,
  59. product.get("shop"),
  60. )
  61. return affected_rows
  62. if __name__ == '__main__':
  63. pass