# jd_auto_crawl.py — JD (search.jd.com) search-result crawler.
import json
import os
import random
import re
import signal
import socket
import sys
import time
from decimal import Decimal, InvalidOperation
from urllib.parse import quote

from DrissionPage import ChromiumOptions, ChromiumPage

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from commons.feishu_webhook import send_text
from pipelines.drug_pipelines import DrugPipeline
  15. logger = get_spider_logger("jd")
  16. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  17. FETCH_TIMEOUT_FIRST = 5
  18. FETCH_TIMEOUT_SCROLL = 6
  19. LISTEN_CLEAR_ROUNDS = 3
  20. LISTEN_CLEAR_TIMEOUT = 0.45
  21. # 「下一页」是否在视口内(条件略宽)
  22. _JS_NEXT_BTN_IN_VIEWPORT = """
  23. var el = arguments[0];
  24. if (!el) return false;
  25. var r = el.getBoundingClientRect();
  26. var h = window.innerHeight || document.documentElement.clientHeight || 800;
  27. var w = window.innerWidth || document.documentElement.clientWidth || 1200;
  28. return r.bottom > 80 && r.top < h - 40 && r.right > 0 && r.left < w;
  29. """
  30. class JdCrawlerV2:
  31. def __init__(self, drug_dict=None):
  32. self.driver = None
  33. self.register_signal_handler()
  34. self.db = MySQLPoolOnline()
  35. self.ip = None
  36. self.account_name = None
  37. self.platform = 2
  38. self.pipeline = DrugPipeline("jd")
  39. self.task_dict = drug_dict or {}
  40. if self.task_dict:
  41. self.get_product_data()
  42. self.success = True
  43. self.is_no_prodcut = 0
  44. def get_product_data(self):
  45. self.task_id = self.task_dict["id"]
  46. self.company_id = self.task_dict["company_id"]
  47. self.product = self.task_dict["product_name"]
  48. self.product_desc = self.task_dict.get("product_specs", "")
  49. self.brand = self.task_dict.get("product_brand", "")
  50. self.product_keyword = self.task_dict.get("product_keyword", "")
  51. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  52. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  53. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  54. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  55. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  56. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  57. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  58. self.collect_round = self.task_dict.get("collect_round", 1)
  59. @staticmethod
  60. def _get_free_port():
  61. """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
  62. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  63. s.bind(("127.0.0.1", 0))
  64. return s.getsockname()[1]
  65. def init_browser(self):
  66. co = ChromiumOptions().set_browser_path(chrome_path)
  67. debug_port = self._get_free_port()
  68. co.set_user_data_path(f"./spiders/jd/{self.account_name}")
  69. co.set_local_port(debug_port)
  70. co.set_argument(f"--remote-debugging-port={debug_port}")
  71. co.set_argument("--remote-debugging-address=127.0.0.1")
  72. # co.set_argument("--disable-blink-features=AutomationControlled")
  73. co.set_argument("--disable-dev-shm-usage")
  74. co.set_argument("--no-first-run") # 避免首次运行弹窗
  75. co.set_argument("--no-default-browser-check") # 避免默认浏览器检查
  76. if self.ip:
  77. proxy = self.ip.strip()
  78. if not proxy.startswith(("http://", "https://")):
  79. proxy = f"http://{proxy}"
  80. co.set_argument(f"--proxy-server={proxy}")
  81. logger.info("启动浏览器: account=%s, debug_port=%s", self.account_name, debug_port)
  82. self.driver = ChromiumPage(co)
  83. self.driver.listen.start("api?appid=search-pc-java")
  84. def register_signal_handler(self):
  85. def handler(signum, frame):
  86. print("\n⚠️ 程序退出")
  87. if self.driver:
  88. self.driver.quit()
  89. sys.exit(0)
  90. signal.signal(signal.SIGINT, handler)
  91. if hasattr(signal, "SIGTERM"):
  92. signal.signal(signal.SIGTERM, handler)
  93. def sleep(self, a, b):
  94. time.sleep(random.uniform(a, b))
  95. def _scroll_page_down(self, delta=900):
  96. self.driver.run_js(f"window.scrollBy(0, {int(delta)});")
  97. time.sleep(random.uniform(0.3, 0.6))
  98. def _scroll_next_into_view(self, el):
  99. if not el:
  100. return
  101. try:
  102. self.driver.run_js(
  103. "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
  104. el,
  105. )
  106. self.sleep(1, 2)
  107. except Exception as e:
  108. logger.warning("滚动到下一页按钮失败: %s", e)
  109. try:
  110. el.scroll.to_see()
  111. except Exception:
  112. pass
  113. def _get_scroll_info(self):
  114. return self.driver.run_js("""
  115. return {
  116. scrollY: window.scrollY || window.pageYOffset || 0,
  117. docH: Math.max(document.body.scrollHeight,
  118. document.documentElement.scrollHeight,
  119. document.body.offsetHeight),
  120. viewH: window.innerHeight || document.documentElement.clientHeight || 800
  121. };
  122. """)
  123. def _find_next_btn(self, timeout=0.3):
  124. try:
  125. return self.driver.ele("text=下一页", timeout=timeout)
  126. except Exception:
  127. return None
  128. def _is_next_btn_visible(self, btn):
  129. if not btn:
  130. return False
  131. try:
  132. return bool(self.driver.run_js(_JS_NEXT_BTN_IN_VIEWPORT, btn))
  133. except Exception:
  134. return False
  135. def _human_click(self, element):
  136. """在目标节点上触发 click,避免 move_to + 无目标 actions.click() 因布局位移点到商品链接触发详情页。"""
  137. if not element:
  138. return False
  139. try:
  140. self.sleep(0.8, 2.0)
  141. try:
  142. self.driver.run_js(
  143. "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
  144. element,
  145. )
  146. except Exception:
  147. pass
  148. self.sleep(0.2, 0.6)
  149. self.driver.run_js("arguments[0].click();", element)
  150. return True
  151. except Exception as e:
  152. logger.warning("点击失败: %s", e)
  153. try:
  154. element.click()
  155. return True
  156. except Exception:
  157. return False
  158. @staticmethod
  159. def _estimated_price(json_data):
  160. fp = json_data.get("finalPrice")
  161. if isinstance(fp, dict):
  162. return fp.get("estimatedPrice", "") or ""
  163. return ""
  164. def parse(self, data):
  165. ware_list = data.get("data", {}).get("wareList", [])
  166. if not ware_list:
  167. return
  168. try:
  169. for w in ware_list:
  170. title = w.get("wareName", "")
  171. title = re.sub(r"<[^>]*>", "", title).strip()
  172. logger.info(title)
  173. if self.product not in title:
  174. self.is_no_prodcut += 1
  175. continue
  176. if self.brand not in title:
  177. self.is_no_prodcut += 1
  178. continue
  179. # if self.product_desc not in title:
  180. # continue
  181. if "+[" in title:
  182. self.is_no_prodcut += 1
  183. continue
  184. self.is_no_prodcut = 0
  185. status = 1
  186. if self.product_keyword:
  187. search_keyword_list = self.product_keyword.split(",")
  188. for search_keyword in search_keyword_list:
  189. if search_keyword.strip() not in title:
  190. status = 0
  191. if status == 0:
  192. continue
  193. logger.info(f"商品名:{title}")
  194. sku_id = w.get("skuId", "")
  195. sales = w.get("totalSales", "")
  196. shop_id = w.get("shopId", "")
  197. shop_name = w.get("shopName", "")
  198. heshu_m = re.search(r"(\d+)盒", title)
  199. if heshu_m:
  200. heshu_count = int(heshu_m.group(1))
  201. else:
  202. heshu_count = 1
  203. final_price = self._estimated_price(w)
  204. jd_price = w.get("jdPrice", "")
  205. low_price = final_price if final_price else jd_price
  206. try:
  207. price = Decimal(str(low_price)).quantize(Decimal("0.00"))
  208. except (InvalidOperation, ValueError):
  209. price = Decimal("0.00")
  210. item_url = f"https://item.jd.com/{sku_id}.html"
  211. mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
  212. # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
  213. now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
  214. product = {
  215. "platform": self.platform,
  216. "item_id": sku_id,
  217. "enterprise_id": self.company_id,
  218. "product_name": title,
  219. "spec": self.product_desc,
  220. "one_price": "",
  221. "detail_url": item_url,
  222. "shop_name": shop_name,
  223. "anonymous_store_name": "",
  224. "shop_url": mall_url,
  225. "city_name": "",
  226. "city_id": "",
  227. "province_name": "",
  228. "province_id": "",
  229. "shipment_city_name": "",
  230. "shipment_city_id": "",
  231. "shipment_province_name": "",
  232. "shipment_province_id": "",
  233. "area_info": "",
  234. "factory_name": "",
  235. "scrape_date": time.strftime("%Y-%m-%d"),
  236. "price": price,
  237. "sales": sales,
  238. "stock_count": "",
  239. "snapshot_url": "",
  240. "approval_num": "",
  241. "produced_time": "",
  242. "deadline": "",
  243. "update_time": now_ts,
  244. "insert_time": now_ts,
  245. "number": heshu_count,
  246. "product_brand": self.brand or "",
  247. "collect_task_id": self.collect_task_id,
  248. "search_name": self.product,
  249. "company_name": "",
  250. "collect_config_info": json.dumps(
  251. {
  252. "sampling_cycle": self.sampling_cycle,
  253. "sampling_start_time": self.sampling_start_time,
  254. "sampling_end_time": self.sampling_end_time,
  255. }
  256. ),
  257. "account_id": self.account_id,
  258. "collect_region_id": self.collect_region_id,
  259. "collect_round": self.collect_round,
  260. "is_sold_out": 1
  261. }
  262. try:
  263. self.pipeline.storge_data(product)
  264. logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
  265. except Exception as e:
  266. logger.exception("写入数据库失败: %s", e)
  267. except Exception as e:
  268. logger.error("写入数据库失败: %s", e)
  269. @staticmethod
  270. def _response_has_ware_list(data):
  271. if not isinstance(data, dict):
  272. return False
  273. wl = data.get("data", {}).get("wareList")
  274. return bool(wl)
  275. def fetch_items_once(self, timeout=FETCH_TIMEOUT_FIRST):
  276. n = 0
  277. for resp in self.driver.listen.steps(timeout=timeout):
  278. try:
  279. data = resp.response.body
  280. if not self._response_has_ware_list(data):
  281. continue
  282. ware_list = data["data"]["wareList"]
  283. self.parse(data)
  284. n += len(ware_list)
  285. except Exception as e:
  286. logger.warning("解析监听响应失败: %s", e)
  287. return n
  288. def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
  289. try:
  290. for _ in range(rounds):
  291. resps = list(self.driver.listen.steps(timeout=timeout))
  292. if not resps:
  293. break
  294. logger.debug("监听缓冲已清空")
  295. except Exception as e:
  296. logger.debug("清空监听缓冲失败: %s", e)
  297. def collect_full_page_items(self, max_steps=20):
  298. """单次循环:边滑动边收数据,到底 / 看见「下一页」即停。"""
  299. n = self.fetch_items_once(timeout=FETCH_TIMEOUT_FIRST)
  300. stagnant = 0
  301. last_scroll_y = None
  302. for step in range(max_steps):
  303. next_btn = self._find_next_btn(timeout=0.3)
  304. if self._is_next_btn_visible(next_btn):
  305. n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
  306. return n, next_btn
  307. info = self._get_scroll_info()
  308. scroll_y = info["scrollY"]
  309. doc_h = info["docH"]
  310. view_h = info["viewH"]
  311. at_bottom = (scroll_y + view_h >= doc_h - 20)
  312. if last_scroll_y is not None and abs(scroll_y - last_scroll_y) < 8:
  313. stagnant += 1
  314. else:
  315. stagnant = 0
  316. last_scroll_y = scroll_y
  317. if at_bottom and stagnant >= 2:
  318. n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
  319. next_btn = self._find_next_btn(timeout=2)
  320. if next_btn:
  321. self._scroll_next_into_view(next_btn)
  322. return n, next_btn
  323. logger.info("已到页面底部且未发现下一页,停止滑动")
  324. return n, None
  325. self._scroll_page_down(random.randint(400, 800))
  326. if random.random() < 0.15:
  327. self.driver.run_js(f"window.scrollBy(0, -{random.randint(60, 140)})")
  328. self.sleep(3, 5)
  329. if step % 3 == 2:
  330. n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
  331. n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
  332. next_btn = self._find_next_btn(timeout=3)
  333. if next_btn and not self._is_next_btn_visible(next_btn):
  334. self._scroll_next_into_view(next_btn)
  335. return n, next_btn
  336. def get_account(self):
  337. sql_account = f""" select `id`, `name`, `ip`, `cookie_timestamp`, `cookie_str` from `accounts_platform` where `platform` = 2 and `status` = 1 and `equipment_id` = 1 order by `cookie_timestamp` asc limit 1 """
  338. account_list = self.db.select_data(sql_account)
  339. if not account_list:
  340. return False
  341. account_dict = account_list[0]
  342. self.ip = account_dict["ip"]
  343. self.account_name = account_dict["name"]
  344. logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
  345. return True
  346. def disable_account(self):
  347. update_sql = f""" UPDATE `accounts_platform` SET `status`= %s WHERE `name` = %s; """
  348. self.db.execute(update_sql, (0, self.account_name))
  349. def crawl(self):
  350. total = 0
  351. keyword = self.product
  352. if self.brand:
  353. keyword = self.brand + "" + self.product
  354. if self.product_desc:
  355. keyword = keyword + " " + self.product_desc
  356. self.driver.get("https://www.jd.com/", timeout=15)
  357. time.sleep(15)
  358. # 判端是否登录
  359. link_login = self.driver.ele("xpath=//*[@class='link-login']")
  360. if link_login:
  361. self.disable_account()
  362. send_text(f"京东:{self.account_name}账号非登录状态")
  363. self.is_success = False
  364. logger.error(f"{self.account_name}账号非登录状态")
  365. kw = quote(str(keyword or ""), safe="")
  366. self.driver.get(
  367. f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}", timeout=15
  368. )
  369. self.sleep(5, 8)
  370. for page in range(1, 11):
  371. if "抱歉由于访问频繁导致无法搜索" in self.driver.html:
  372. logger.error("账号无法搜索")
  373. self.success = False
  374. break
  375. if "cfe.m.jd.com/privatedomain" in self.driver.url:
  376. self.disable_account()
  377. logger.error("账号出现验证码,暂时禁用")
  378. self.success = False
  379. break
  380. logger.info(f"===== 第 {page} 页 =====")
  381. time.sleep(random.uniform(3, 5))
  382. page_n, next_btn = self.collect_full_page_items()
  383. self.sleep(3, 5)
  384. logger.info(f"本页监听商品条数(含可能重复): {page_n}")
  385. total += page_n
  386. logger.info(f"累计监听条数: {total}")
  387. if not next_btn:
  388. next_btn = self.driver.ele("text=下一页")
  389. if not next_btn:
  390. logger.info("没有下一页(未找到)")
  391. break
  392. cls_str = next_btn.attr("class") or ""
  393. if "disabled" in cls_str:
  394. logger.info("没有下一页(已禁用)")
  395. break
  396. self.clear_listen_buffer(
  397. rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT
  398. )
  399. if self.is_no_prodcut > 20:
  400. break
  401. self._human_click(next_btn)
  402. def run(self):
  403. # 检测账号
  404. if not self.get_account():
  405. logger.info("==================当前无账号可用==================")
  406. self.success = False
  407. return self.pipeline.crawl_count, self.success
  408. logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
  409. # 每次选取账号,立马账号使用时间
  410. update_sql = f""" UPDATE `accounts_platform` SET `status`= %s, `cookie_timestamp`= %s WHERE `name` = %s; """
  411. self.db.execute(update_sql, (1, int(time.time()), self.account_name))
  412. try:
  413. self.init_browser()
  414. self.crawl()
  415. except Exception as e:
  416. self.success = False
  417. logger.exception("爬取异常: %s", e)
  418. self.sleep(3, 5)
  419. finally:
  420. if self.driver:
  421. self.driver.quit()
  422. self.driver = None
  423. return self.pipeline.crawl_count, self.success