import json
import random
import re
import signal
import socket
import sys
import time
from decimal import Decimal, InvalidOperation
from urllib.parse import quote

from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from pipelines.drug_pipelines import DrugPipeline
from commons.feishu_webhook import send_text

logger = get_spider_logger("jd")

chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

# Seconds to wait per captured packet when draining the network listener.
FETCH_TIMEOUT_FIRST = 5
FETCH_TIMEOUT_SCROLL = 6
# Parameters for draining stale packets before moving to the next page.
LISTEN_CLEAR_ROUNDS = 3
LISTEN_CLEAR_TIMEOUT = 0.45

# JS predicate: is the "下一页" (next page) element inside the viewport?
# The margins are deliberately loose.
_JS_NEXT_BTN_IN_VIEWPORT = """
var el = arguments[0];
if (!el) return false;
var r = el.getBoundingClientRect();
var h = window.innerHeight || document.documentElement.clientHeight || 800;
var w = window.innerWidth || document.documentElement.clientWidth || 1200;
return r.bottom > 80 && r.top < h - 40 && r.right > 0 && r.left < w;
"""


class JdCrawlerV2:
    """JD (jd.com) search-result crawler driven by DrissionPage.

    An account is taken from `accounts_platform` (platform 2). Its per-account
    Chrome profile is launched, and the task's product is searched. Search API
    responses matching "api?appid=search-pc-java" are captured with the page's
    network listener. Matching wares are handed to `DrugPipeline("jd")`.
    """

    def __init__(self, drug_dict=None):
        """Set up DB/pipeline handles and, if given, load the task fields.

        Args:
            drug_dict: Task row; see `get_product_data` for the keys read.
                When empty/None the task attributes are not populated.
        """
        self.driver = None
        self.register_signal_handler()
        self.db = MySQLPoolOnline()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.pipeline = DrugPipeline("jd")
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        # Overall outcome reported by `run()`.
        self.success = True
        # Running count of wares rejected by the name/brand/bundle filter;
        # reset whenever a ware passes it (name kept as-is for compatibility).
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from `self.task_dict` onto the instance.

        `id`, `company_id` and `product_name` are required (KeyError if
        missing); every other field falls back to a default.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")
        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
        self.collect_region_id = self.task_dict.get("collect_region_id", "")
        self.collect_round = self.task_dict.get("collect_round", 1)

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome's debugging endpoint."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Launch Chrome with the account's profile (and proxy) and start listening.

        The user-data dir is `./spiders/jd/<account_name>`. `self.ip`, if set,
        is used as an HTTP proxy (an `http://` scheme is added when missing).
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./spiders/jd/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        logger.info("启动浏览器: account=%s, debug_port=%s", self.account_name, debug_port)
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT (and SIGTERM where available) handlers that quit the browser and exit."""

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                self.driver.quit()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def sleep(self, a, b):
        """Sleep for a uniformly random duration in [a, b] seconds."""
        time.sleep(random.uniform(a, b))

    def _scroll_page_down(self, delta=900):
        """Scroll the window down by `delta` pixels, then pause briefly."""
        self.driver.run_js(f"window.scrollBy(0, {int(delta)});")
        time.sleep(random.uniform(0.3, 0.6))

    def _scroll_next_into_view(self, el):
        """Center `el` in the viewport (JS first, `el.scroll.to_see()` as fallback)."""
        if not el:
            return
        try:
            self.driver.run_js(
                "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
                el,
            )
            self.sleep(1, 2)
        except Exception as e:
            logger.warning("滚动到下一页按钮失败: %s", e)
            try:
                el.scroll.to_see()
            except Exception:
                pass  # best effort only

    def _get_scroll_info(self):
        """Return a dict with the page's `scrollY`, document height `docH` and viewport height `viewH`."""
        return self.driver.run_js("""
            return {
                scrollY: window.scrollY || window.pageYOffset || 0,
                docH: Math.max(document.body.scrollHeight,
                               document.documentElement.scrollHeight,
                               document.body.offsetHeight),
                viewH: window.innerHeight || document.documentElement.clientHeight || 800
            };
        """)

    def _find_next_btn(self, timeout=0.3):
        """Locate the "下一页" element, or return None if the lookup raises."""
        try:
            return self.driver.ele("text=下一页", timeout=timeout)
        except Exception:
            return None

    def _is_next_btn_visible(self, btn):
        """Return True when `btn` is inside the viewport (see `_JS_NEXT_BTN_IN_VIEWPORT`)."""
        if not btn:
            return False
        try:
            return bool(self.driver.run_js(_JS_NEXT_BTN_IN_VIEWPORT, btn))
        except Exception:
            return False

    def _human_click(self, element):
        """Dispatch a click on `element` itself, with human-like pauses.

        A JS click on the target node is used instead of move_to plus an
        untargeted `actions.click()`. Otherwise a layout shift could land the
        click on a product link and open a detail page. Falls back to
        `element.click()`.

        Returns:
            True if a click was dispatched, False otherwise.
        """
        if not element:
            return False
        try:
            self.sleep(0.8, 2.0)
            try:
                self.driver.run_js(
                    "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
                    element,
                )
            except Exception:
                pass  # scrolling is only a nicety before the click
            self.sleep(0.2, 0.6)
            self.driver.run_js("arguments[0].click();", element)
            return True
        except Exception as e:
            logger.warning("点击失败: %s", e)
            try:
                element.click()
                return True
            except Exception:
                return False

    @staticmethod
    def _estimated_price(json_data):
        """Return `finalPrice.estimatedPrice` from a ware dict, or "" when absent."""
        fp = json_data.get("finalPrice")
        if isinstance(fp, dict):
            return fp.get("estimatedPrice", "") or ""
        return ""

    def parse(self, data):
        """Filter the wares of one search-API response and store the matches.

        A ware is kept only if its HTML-stripped title passes all of these:
          * it contains the product name and the brand;
          * it is not a bundle (contains no "+[");
          * it contains every comma-separated entry of `product_keyword`.
        Kept wares are mapped to a scrape row and passed to
        `self.pipeline.storge_data`.

        Args:
            data: Decoded response body; wares are read from
                `data["data"]["wareList"]`.
        """
        ware_list = (data.get("data") or {}).get("wareList") or []
        if not ware_list:
            return
        # The brand may be NULL on the task row; treat that as "no brand
        # filter" instead of letting `None in str` raise for the whole page.
        brand = self.brand or ""
        try:
            for w in ware_list:
                title = w.get("wareName", "")
                title = re.sub(r"<[^>]*>", "", title).strip()  # drop highlight tags
                logger.info(title)
                if self.product not in title:
                    self.is_no_prodcut += 1
                    continue
                if brand not in title:
                    self.is_no_prodcut += 1
                    continue
                # Spec filtering on the title is currently disabled:
                # if self.product_desc not in title:
                #     continue
                if "+[" in title:
                    self.is_no_prodcut += 1
                    continue
                self.is_no_prodcut = 0
                if self.product_keyword:
                    keywords = [k.strip() for k in self.product_keyword.split(",")]
                    if not all(k in title for k in keywords):
                        continue
                logger.info(f"商品名:{title}")
                sku_id = w.get("skuId", "")
                sales = w.get("totalSales", "")
                shop_id = w.get("shopId", "")
                shop_name = w.get("shopName", "")
                # Number of boxes ("N盒") in the title; defaults to 1.
                heshu_m = re.search(r"(\d+)盒", title)
                heshu_count = int(heshu_m.group(1)) if heshu_m else 1
                # Prefer the estimated final price, fall back to the JD price.
                final_price = self._estimated_price(w)
                jd_price = w.get("jdPrice", "")
                low_price = final_price if final_price else jd_price
                try:
                    price = Decimal(str(low_price)).quantize(Decimal("0.00"))
                except (InvalidOperation, ValueError):
                    price = Decimal("0.00")
                item_url = f"https://item.jd.com/{sku_id}.html"
                mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
                # Fields aligned with yaofangwang_crawl; key order must match
                # commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS.
                now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
                product = {
                    "platform": self.platform,
                    "item_id": sku_id,
                    "enterprise_id": self.company_id,
                    "product_name": title,
                    "spec": self.product_desc,
                    "one_price": "",
                    "detail_url": item_url,
                    "shop_name": shop_name,
                    "anonymous_store_name": "",
                    "shop_url": mall_url,
                    "city_name": "",
                    "city_id": "",
                    "province_name": "",
                    "province_id": "",
                    "shipment_city_name": "",
                    "shipment_city_id": "",
                    "shipment_province_name": "",
                    "shipment_province_id": "",
                    "area_info": "",
                    "factory_name": "",
                    "scrape_date": time.strftime("%Y-%m-%d"),
                    "price": price,
                    "sales": sales,
                    "stock_count": "",
                    "snapshot_url": "",
                    "approval_num": "",
                    "produced_time": "",
                    "deadline": "",
                    "update_time": now_ts,
                    "insert_time": now_ts,
                    "number": heshu_count,
                    "product_brand": brand,
                    "collect_task_id": self.collect_task_id,
                    "search_name": self.product,
                    "company_name": "",
                    # default=str: sampling times may arrive as non-JSON types
                    # (e.g. datetimes from a DB row) — plain strings are unaffected.
                    "collect_config_info": json.dumps(
                        {
                            "sampling_cycle": self.sampling_cycle,
                            "sampling_start_time": self.sampling_start_time,
                            "sampling_end_time": self.sampling_end_time,
                        },
                        default=str,
                    ),
                    "account_id": self.account_id,
                    "collect_region_id": self.collect_region_id,
                    "collect_round": self.collect_round,
                    "is_sold_out": 1,
                }
                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)
        except Exception as e:
            # DB write errors are handled above; anything reaching here is a
            # parsing/mapping error that aborts the rest of this response.
            logger.exception("解析商品列表失败: %s", e)

    @staticmethod
    def _response_has_ware_list(data):
        """Return True when `data` is a dict whose `data.wareList` is non-empty."""
        if not isinstance(data, dict):
            return False
        inner = data.get("data")
        if not isinstance(inner, dict):
            return False
        return bool(inner.get("wareList"))

    def fetch_items_once(self, timeout=FETCH_TIMEOUT_FIRST):
        """Drain captured search responses and parse each one that carries wares.

        Args:
            timeout: Seconds the listener waits for each next packet.

        Returns:
            Total number of wares seen (before filtering; may include repeats).
        """
        n = 0
        for resp in self.driver.listen.steps(timeout=timeout):
            try:
                data = resp.response.body
                if not self._response_has_ware_list(data):
                    continue
                ware_list = data["data"]["wareList"]
                self.parse(data)
                n += len(ware_list)
            except Exception as e:
                logger.warning("解析监听响应失败: %s", e)
        return n

    def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
        """Discard pending captured packets (up to `rounds` drains, stopping early when one is empty)."""
        try:
            for _ in range(rounds):
                resps = list(self.driver.listen.steps(timeout=timeout))
                if not resps:
                    break
            logger.debug("监听缓冲已清空")
        except Exception as e:
            logger.debug("清空监听缓冲失败: %s", e)

    def collect_full_page_items(self, max_steps=20):
        """Scroll the current result page while collecting data, in one loop.

        Stops as soon as the "下一页" button is visible, or when the page
        bottom is reached and scrolling has stalled.

        Returns:
            (n, next_btn): wares seen on this page, and the next-page element
            (None if not found).
        """
        n = self.fetch_items_once(timeout=FETCH_TIMEOUT_FIRST)
        stagnant = 0
        last_scroll_y = None
        for step in range(max_steps):
            next_btn = self._find_next_btn(timeout=0.3)
            if self._is_next_btn_visible(next_btn):
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
                return n, next_btn
            info = self._get_scroll_info()
            scroll_y = info["scrollY"]
            doc_h = info["docH"]
            view_h = info["viewH"]
            at_bottom = scroll_y + view_h >= doc_h - 20
            # Count consecutive steps where the scroll position barely moved.
            if last_scroll_y is not None and abs(scroll_y - last_scroll_y) < 8:
                stagnant += 1
            else:
                stagnant = 0
            last_scroll_y = scroll_y
            if at_bottom and stagnant >= 2:
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
                next_btn = self._find_next_btn(timeout=2)
                if next_btn:
                    self._scroll_next_into_view(next_btn)
                    return n, next_btn
                logger.info("已到页面底部且未发现下一页,停止滑动")
                return n, None
            self._scroll_page_down(random.randint(400, 800))
            # Occasional small scroll back up to look less mechanical.
            if random.random() < 0.15:
                self.driver.run_js(f"window.scrollBy(0, -{random.randint(60, 140)})")
            self.sleep(3, 5)
            if step % 3 == 2:
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
        n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
        next_btn = self._find_next_btn(timeout=3)
        if next_btn and not self._is_next_btn_visible(next_btn):
            self._scroll_next_into_view(next_btn)
        return n, next_btn

    def get_account(self):
        """Pick the least-recently-used active JD account and load its name/ip.

        Returns:
            True if an account was found, False otherwise.
        """
        sql_account = """
            select `id`, `name`, `ip`, `cookie_timestamp`, `cookie_str`
            from `accounts_platform`
            where `platform` = 2 and `status` = 1 and `equipment_id` = 1
            order by `cookie_timestamp` asc
            limit 1
        """
        account_list = self.db.select_data(sql_account)
        if not account_list:
            return False
        account_dict = account_list[0]
        self.ip = account_dict["ip"]
        self.account_name = account_dict["name"]
        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
        return True

    def disable_account(self):
        """Set the current account's status to 0 in `accounts_platform`."""
        update_sql = """
            UPDATE `accounts_platform` SET `status`= %s WHERE `name` = %s;
        """
        self.db.execute(update_sql, (0, self.account_name))

    def crawl(self, max_pages=10):
        """Search JD for the task keyword and walk through the result pages.

        Sets `self.success = False` and stops in any of these cases: the
        account is not logged in (the account is also disabled and an alert
        is sent), search is rate-limited, or a captcha redirect is seen (the
        account is also disabled).

        Args:
            max_pages: Maximum number of result pages to visit (default 10).
        """
        total = 0
        keyword = self.product
        if self.brand:
            # NOTE(review): brand and product are joined with no separator;
            # the original `+ "" +` may have been meant as a space — confirm.
            keyword = self.brand + self.product
        if self.product_desc:
            keyword = keyword + " " + self.product_desc
        self.driver.get("https://www.jd.com/", timeout=15)
        time.sleep(15)
        # A "link-login" element on the home page means the account is not logged in.
        link_login = self.driver.ele("xpath=//*[@class='link-login']")
        if link_login:
            self.disable_account()
            send_text(f"京东:{self.account_name}账号非登录状态")
            self.success = False
            logger.error(f"{self.account_name}账号非登录状态")
            # Do not keep searching with an account that was just disabled.
            return
        kw = quote(str(keyword or ""), safe="")
        self.driver.get(
            f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}", timeout=15
        )
        self.sleep(5, 8)
        for page in range(1, max_pages + 1):
            if "抱歉由于访问频繁导致无法搜索" in self.driver.html:
                logger.error("账号无法搜索")
                self.success = False
                break
            if "cfe.m.jd.com/privatedomain" in self.driver.url:
                self.disable_account()
                logger.error("账号出现验证码,暂时禁用")
                self.success = False
                break
            logger.info(f"===== 第 {page} 页 =====")
            time.sleep(random.uniform(3, 5))
            page_n, next_btn = self.collect_full_page_items()
            self.sleep(3, 5)
            logger.info(f"本页监听商品条数(含可能重复): {page_n}")
            total += page_n
            logger.info(f"累计监听条数: {total}")
            if not next_btn:
                next_btn = self.driver.ele("text=下一页")
            if not next_btn:
                logger.info("没有下一页(未找到)")
                break
            cls_str = next_btn.attr("class") or ""
            if "disabled" in cls_str:
                logger.info("没有下一页(已禁用)")
                break
            # Drop packets from this page so the next page's data is not mixed in.
            self.clear_listen_buffer(
                rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT
            )
            # Stop once more than 20 consecutive wares failed the title filter.
            if self.is_no_prodcut > 20:
                break
            self._human_click(next_btn)

    def run(self):
        """Acquire an account, crawl, and always close the browser.

        Returns:
            (self.pipeline.crawl_count, self.success)
        """
        if not self.get_account():
            logger.info("==================当前无账号可用==================")
            self.success = False
            return self.pipeline.crawl_count, self.success
        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
        # Stamp the account's use time right away; `get_account` orders by
        # cookie_timestamp ascending, so the next pick prefers other accounts.
        update_sql = """
            UPDATE `accounts_platform` SET `status`= %s, `cookie_timestamp`= %s WHERE `name` = %s;
        """
        self.db.execute(update_sql, (1, int(time.time()), self.account_name))
        try:
            self.init_browser()
            self.crawl()
        except Exception as e:
            self.success = False
            logger.exception("爬取异常: %s", e)
            self.sleep(3, 5)
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None
        return self.pipeline.crawl_count, self.success