# JD (jd.com) search-result crawler: drives Chrome through DrissionPage and stores matching products.
import json
import os
import random
import re
import signal
import socket
import sys
import time
from decimal import Decimal, InvalidOperation
from urllib.parse import quote

from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from commons.feishu_webhook import send_text
from pipelines.drug_pipelines import DrugPipeline
from spiders.jd.jd_captcha import handle_jd_slider_captcha
# Module-level logger used by every function and method in this file.
logger = get_spider_logger("jd")

# Chrome executable handed to DrissionPage. The default is the stock Windows
# install location; set the JD_CHROME_PATH environment variable to point the
# crawler at a different binary without editing the source.
chrome_path = os.environ.get(
    "JD_CHROME_PATH",
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
)
# Seconds to wait for search-API packets on the first read of a result page.
FETCH_TIMEOUT_FIRST = 5
# Seconds to wait for search-API packets on reads performed while scrolling.
FETCH_TIMEOUT_SCROLL = 6
# Maximum drain passes, and per-pass timeout in seconds, for clear_listen_buffer.
LISTEN_CLEAR_ROUNDS = 3
LISTEN_CLEAR_TIMEOUT = 0.45

# Whether the "next page" button is inside the viewport (slightly relaxed bounds).
_JS_NEXT_BTN_IN_VIEWPORT = """
var el = arguments[0];
if (!el) return false;
var r = el.getBoundingClientRect();
var h = window.innerHeight || document.documentElement.clientHeight || 800;
var w = window.innerWidth || document.documentElement.clientWidth || 1200;
return r.bottom > 80 && r.top < h - 40 && r.right > 0 && r.left < w;
"""


class JdCrawlerV2:
    """Collect JD search-result listings for one task via a DrissionPage-driven Chrome.

    Construct with a task dict and call :meth:`run`, which returns
    ``(pipeline.crawl_count, success)``.

    NOTE(review): this class was rebuilt from a copy of the source whose
    indentation had been stripped. Nesting that could not be determined from
    syntax alone is flagged with a NOTE(review) comment where it occurs.
    """

    # Removes anything shaped like an HTML tag from the listing title.
    _HTML_TAG_RE = re.compile(r"<[^>]*>")
    # "<digits>盒" / "<digits>瓶": the unit count mentioned in a listing title.
    _PACK_COUNT_RE = re.compile(r"(\d+)(盒|瓶)")
    # Price stored when the listing carries no usable price.
    _ZERO_PRICE = Decimal("0.00")

    def __init__(self, drug_dict=None):
        """Set up DB/pipeline handles and load the task fields from ``drug_dict``.

        Args:
            drug_dict: task row as a dict; when empty or ``None`` the task
                attributes keep neutral defaults instead of being undefined.
        """
        self.driver = None
        # Defined here so _start_listen is safe even before init_browser runs.
        self._listen_started = False
        self.register_signal_handler()
        self.db = MySQLPoolOnline()
        self.ip = None
        self.account_name = None
        self.login_username = None
        self.login_password = None
        self.platform = 2  # platform code written into every stored row
        self.pipeline = DrugPipeline("jd")
        self.task_dict = drug_dict or {}

        # Neutral defaults so later methods never hit AttributeError when the
        # crawler is built without a task dict.
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = ""
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1
        self.start_page = 1
        self.end_page = 1

        if self.task_dict:
            self.get_product_data()
        self.success = True
        # Count of consecutive listings that did not match the task
        # (attribute name kept as originally spelled).
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required and raise
        ``KeyError`` when absent; every other field falls back to a default.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)
        self.start_page = self._parse_page(task.get("start_page"), 1)
        self.end_page = self._parse_page(task.get("end_page"), 15)

    @staticmethod
    def _parse_page(value, default=1):
        """Return ``value`` as a page number >= 1, or ``default`` if it is not one."""
        try:
            page = int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")) — treat like any other bad input.
            return default
        return page if page >= 1 else default

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome's debugging endpoint."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 lets the OS choose; the socket is closed again on exit.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome with a per-account profile, a fresh debug port and the account proxy."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./spiders/jd/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser check
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        logger.info("启动浏览器: account=%s, debug_port=%s", self.account_name, debug_port)
        self.driver = ChromiumPage(co)
        self._listen_started = False

    def _start_listen(self):
        """Start the search-API listener; done after login so it cannot disturb the login page or slider."""
        if self._listen_started or not self.driver:
            return
        self.driver.listen.start("api?appid=search-pc-java")
        self._listen_started = True
        logger.info("已启动搜索接口监听")

    def register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` may only be called from the main thread; when the
        crawler is constructed elsewhere the registration is skipped with a
        warning instead of aborting construction.
        """
        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            try:
                if self.driver:
                    self.driver.quit()
            finally:
                # Exit even when closing the browser itself fails.
                sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError as e:
            logger.warning("非主线程,跳过信号处理注册: %s", e)

    def sleep(self, a, b):
        """Sleep for a random duration between ``a`` and ``b`` seconds."""
        time.sleep(random.uniform(a, b))

    def _scroll_page_down(self, delta=900):
        """Scroll the window down by ``delta`` pixels, then pause briefly."""
        self.driver.run_js(f"window.scrollBy(0, {int(delta)});")
        time.sleep(random.uniform(0.3, 0.6))

    def _scroll_next_into_view(self, el):
        """Centre ``el`` in the viewport, falling back to the element's own scroll helper."""
        if not el:
            return
        try:
            self.driver.run_js(
                "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
                el,
            )
            self.sleep(1, 2)
        except Exception as e:
            logger.warning("滚动到下一页按钮失败: %s", e)
            try:
                el.scroll.to_see()
            except Exception:
                # Best effort only: the caller proceeds with the button as-is.
                pass

    def _get_scroll_info(self):
        """Return the page's ``scrollY``, document height ``docH`` and viewport height ``viewH``."""
        return self.driver.run_js("""
            return {
                scrollY: window.scrollY || window.pageYOffset || 0,
                docH: Math.max(document.body.scrollHeight,
                               document.documentElement.scrollHeight,
                               document.body.offsetHeight),
                viewH: window.innerHeight || document.documentElement.clientHeight || 800
            };
        """)

    def _find_next_btn(self, timeout=0.3):
        """Return the "next page" element, or ``None`` when the lookup raises."""
        try:
            return self.driver.ele("text=下一页", timeout=timeout)
        except Exception:
            return None

    def _is_next_btn_visible(self, btn):
        """Return True when ``btn`` lies inside the viewport according to the JS probe."""
        if not btn:
            return False
        try:
            return bool(self.driver.run_js(_JS_NEXT_BTN_IN_VIEWPORT, btn))
        except Exception:
            return False

    def _human_click(self, element):
        """Click ``element`` via a JS click dispatched on the node itself.

        This avoids move_to plus an untargeted actions.click(), which can land
        on a product link after a layout shift and open a detail page.

        Returns:
            True when either the JS click or the fallback native click ran.
        """
        if not element:
            return False
        try:
            self.sleep(0.8, 2.0)
            try:
                self.driver.run_js(
                    "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
                    element,
                )
            except Exception:
                # Scrolling is cosmetic; the click is attempted regardless.
                pass
            self.sleep(0.2, 0.6)
            self.driver.run_js("arguments[0].click();", element)
            return True
        except Exception as e:
            logger.warning("点击失败: %s", e)
            try:
                element.click()
                return True
            except Exception:
                return False

    @staticmethod
    def _estimated_price(json_data):
        """Return ``finalPrice.estimatedPrice`` from a ware dict, or ``""`` when unavailable."""
        fp = json_data.get("finalPrice")
        if isinstance(fp, dict):
            return fp.get("estimatedPrice", "") or ""
        return ""

    @classmethod
    def _to_price(cls, raw):
        """Convert ``raw`` to a two-decimal ``Decimal``; unusable values become 0.00."""
        try:
            price = Decimal(str(raw)).quantize(cls._ZERO_PRICE)
        except (InvalidOperation, ValueError):
            return cls._ZERO_PRICE
        # quantize() passes a quiet NaN through without raising; never store it.
        return price if price.is_finite() else cls._ZERO_PRICE

    @classmethod
    def _pack_count(cls, text):
        """Return the count in front of the first 盒/瓶 in ``text``, defaulting to 1."""
        found = cls._PACK_COUNT_RE.search(text)
        return int(found.group(1)) if found else 1

    @staticmethod
    def _keywords_match(keywords, title):
        """Return True when every comma-separated keyword occurs in ``title``.

        An empty ``keywords`` value imposes no constraint.
        """
        if not keywords:
            return True
        return all(word.strip() in title for word in keywords.split(","))

    def _matches_task(self, full_title):
        """Return True when ``full_title`` contains the task's product name and brand.

        A missing brand (``None`` or ``""``) imposes no constraint.
        """
        return (self.product or "") in full_title and (self.brand or "") in full_title

    def parse(self, ware_list):
        """Filter the listings in ``ware_list`` against the task and store each match.

        A listing is kept when its title (plus ``color`` text) contains the
        product name and brand, does not contain ``"+["``, and contains every
        configured keyword. ``self.is_no_prodcut`` counts consecutive
        product/brand mismatches and is reset by a match.
        """
        # Identical for every item in this batch, so built once.
        collect_config_info = json.dumps(
            {
                "sampling_cycle": self.sampling_cycle,
                "sampling_start_time": self.sampling_start_time,
                "sampling_end_time": self.sampling_end_time,
            }
        )
        for w in ware_list:
            # "or ''" guards against explicit nulls in the payload.
            title = self._HTML_TAG_RE.sub("", w.get("wareName") or "").strip()
            color = w.get("color") or ""
            full_title = title + " " + color
            logger.info(full_title)
            if not self._matches_task(full_title):
                self.is_no_prodcut += 1
                continue
            # NOTE(review): inferred nesting — when the spec text is found the
            # bare title is kept and the spec recorded separately; otherwise the
            # title is widened to include the colour text. Confirm.
            if self.product_desc and self.product_desc in full_title:
                crawl_product_desc = self.product_desc
            else:
                crawl_product_desc = ""
                title = full_title
            if "+[" in title:
                continue
            self.is_no_prodcut = 0
            if not self._keywords_match(self.product_keyword, title):
                continue
            logger.info(f"商品名:{title}")
            sku_id = w.get("skuId", "")
            sales = w.get("totalSales", "")
            shop_id = w.get("shopId", "")
            shop_name = w.get("shopName", "")
            heshu_count = self._pack_count(full_title)
            # Prefer the estimated final price; fall back to the list price.
            low_price = self._estimated_price(w) or w.get("jdPrice", "")
            price = self._to_price(low_price)
            item_url = f"https://item.jd.com/{sku_id}.html"
            mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
            # Fields mirror yaofangwang_crawl; key order must match
            # commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS.
            now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
            product = {
                "platform": self.platform,
                "item_id": sku_id,
                "enterprise_id": self.company_id,
                "product_name": title,
                "spec": crawl_product_desc,
                "one_price": "",
                "detail_url": item_url,
                "shop_name": shop_name,
                "anonymous_store_name": "",
                "shop_url": mall_url,
                "city_name": "",
                "city_id": "",
                "province_name": "",
                "province_id": "",
                "shipment_city_name": "",
                "shipment_city_id": "",
                "shipment_province_name": "",
                "shipment_province_id": "",
                "area_info": "",
                "factory_name": "",
                # Taken from now_ts so date and timestamps cannot straddle midnight.
                "scrape_date": now_ts[:10],
                "price": price,
                "sales": sales,
                "stock_count": "",
                "snapshot_url": "",
                "approval_num": "",
                "produced_time": "",
                "deadline": "",
                "update_time": now_ts,
                "insert_time": now_ts,
                "number": heshu_count,
                "product_brand": self.brand or "",
                "collect_task_id": self.collect_task_id,
                "search_name": self.product,
                "company_name": "",
                "collect_config_info": collect_config_info,
                "account_id": self.account_id,
                "collect_region_id": self.collect_region_id,
                "collect_round": self.collect_round,
                "is_sold_out": 0,
            }
            try:
                self.pipeline.storge_data(product)
                logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
            except Exception as e:
                logger.exception("写入数据库失败: %s", e)

    @staticmethod
    def _response_has_ware_list(data):
        """Return True when ``data`` is a dict whose ``data.wareList`` is non-empty.

        Any other shape (non-dict body, null or non-dict ``data`` field) yields
        False instead of raising.
        """
        if not isinstance(data, dict):
            return False
        payload = data.get("data")
        if not isinstance(payload, dict):
            return False
        return bool(payload.get("wareList"))

    def fetch_items_once(self, timeout=FETCH_TIMEOUT_FIRST):
        """Drain the listener for up to ``timeout`` seconds per step and parse each ware list.

        Returns:
            Number of raw listings seen (before filtering; may include repeats).
        """
        n = 0
        for resp in self.driver.listen.steps(timeout=timeout):
            try:
                data = resp.response.body
                if not self._response_has_ware_list(data):
                    continue
                ware_list = data["data"]["wareList"]
                self.parse(ware_list)
                n += len(ware_list)
            except Exception as e:
                logger.warning("解析监听响应失败: %s", e)
        return n

    def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
        """Discard captured packets: up to ``rounds`` drain passes, stopping at the first empty one."""
        try:
            for _ in range(rounds):
                resps = list(self.driver.listen.steps(timeout=timeout))
                if not resps:
                    break
            logger.debug("监听缓冲已清空")
        except Exception as e:
            logger.debug("清空监听缓冲失败: %s", e)

    def collect_full_page_items(self, max_steps=10):
        """Scroll through one result page, collecting data until the bottom or the "next page" button.

        Returns:
            ``(count, next_btn)`` — raw listings seen on this page and the
            "next page" element (``None`` when not found).
        """
        n = self.fetch_items_once(timeout=FETCH_TIMEOUT_FIRST)
        stagnant = 0
        last_scroll_y = None
        for step in range(max_steps):
            next_btn = self._find_next_btn(timeout=0.3)
            if self._is_next_btn_visible(next_btn):
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
                return n, next_btn

            info = self._get_scroll_info()
            scroll_y = info["scrollY"]
            at_bottom = scroll_y + info["viewH"] >= info["docH"] - 20
            # Count consecutive rounds in which the page effectively did not move.
            if last_scroll_y is not None and abs(scroll_y - last_scroll_y) < 8:
                stagnant += 1
            else:
                stagnant = 0
            # NOTE(review): assumed to run every round (not only in the else).
            last_scroll_y = scroll_y

            if at_bottom and stagnant >= 2:
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
                next_btn = self._find_next_btn(timeout=2)
                if next_btn:
                    self._scroll_next_into_view(next_btn)
                    return n, next_btn
                logger.info("已到页面底部且未发现下一页,停止滑动")
                return n, None

            self._scroll_page_down(random.randint(400, 800))
            # Occasionally scroll back up a little.
            if random.random() < 0.15:
                self.driver.run_js(f"window.scrollBy(0, -{random.randint(60, 140)})")
            # NOTE(review): assumed to pause after every scroll, not only after a scroll-back.
            self.sleep(0.5, 1.5)
            if step % 3 == 2:
                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)

        n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
        next_btn = self._find_next_btn(timeout=3)
        if next_btn and not self._is_next_btn_visible(next_btn):
            self._scroll_next_into_view(next_btn)
        return n, next_btn

    def get_account(self):
        """Load the task's account row (``status = 0``) and cache proxy and credentials.

        Returns:
            True when a row was found, False otherwise.
        """
        sql_account = """
            SELECT *
            FROM `retrieve_collect_equipment_account`
            WHERE `id` = %s
            and `status` = 0
        """
        account_list = self.db.select_data(sql_account, self.account_id)
        if not account_list:
            return False
        account_dict = account_list[0]
        # The row holds the login password, so it is deliberately not echoed
        # to stdout; only name and proxy are logged below.
        self.ip = account_dict.get("ip")
        self.account_name = account_dict.get("username")
        self.login_username = account_dict.get("phone", "")
        self.login_password = account_dict.get("password", "")
        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
        return True

    def disable_account(self):
        """Set ``status = 1`` on the current account's row."""
        # NOTE(review): filters on `name` with the value read from the
        # `username` column in get_account — confirm the column name.
        update_sql = """ UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `name` = %s; """
        self.db.execute(update_sql, (1, self.account_name))

    def _build_search_keyword(self):
        """Join brand, product and spec (skipping empty ones) into the search phrase."""
        parts = [p for p in (self.brand, self.product, self.product_desc) if p]
        return " ".join(parts).strip() or self.product

    def _is_logged_out(self):
        """Return True when the page shows an element with class ``link-login``."""
        return bool(self.driver.ele("xpath=//*[@class='link-login']", timeout=2))

    def perform_jd_login(self):
        """Log in with username/password (including the slider captcha) in the current browser.

        Returns:
            True on success, False when a form element is missing or the
            slider captcha is not passed.
        """
        username = self.login_username
        password = self.login_password
        login_url = "https://passport.jd.com/new/login.aspx"
        self.driver.get(login_url)
        input_name = self.driver.ele("xpath=//input[@id='loginname']", timeout=15)
        if not input_name:
            logger.warning("未找到用户名输入框")
            return False
        input_name.input(username)
        time.sleep(random.uniform(1.5, 2.5))
        input_pass = self.driver.ele("xpath://input[@name='nloginpwd']", timeout=5)
        if not input_pass:
            logger.warning("未找到密码输入框")
            return False
        input_pass.input(password)
        time.sleep(random.uniform(1.5, 2.5))
        login_btn = self.driver.ele("xpath://a[@id='loginsubmit']", timeout=5)
        if not login_btn:
            logger.warning("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(random.uniform(3, 5))
        if not handle_jd_slider_captcha(self.driver):
            logger.warning("滑块验证码未通过")
            return False
        return True

    def _ensure_logged_in(self):
        """Run the login flow (password + slider) when logged out; return True if logged in afterwards."""
        if not self._is_logged_out():
            return True
        logger.info("检测到未登录,开始自动登录: %s", self.account_name)
        ok = self.perform_jd_login()
        if ok and not self._is_logged_out():
            logger.info("自动登录成功: %s", self.account_name)
            return True
        logger.error("自动登录失败: %s", self.account_name)
        return False

    def _check_page_blocked(self):
        """Return True, and mark the run failed, when the page shows the rate-limit notice."""
        html = self.driver.html or ""
        if "抱歉由于访问频繁导致无法搜索" in html:
            logger.error("账号无法搜索(访问频繁)")
            self.success = False
            return True
        return False

    def _jump_to_page(self, target_page):
        """Jump to ``target_page`` via the pager input, draining stale packets before and after."""
        to_page_input = self.driver.ele(
            "xpath=//div[contains(@class,'_pagination_toPageNum_')]//input[@type='text']",
            timeout=3,
        )
        if not to_page_input:
            logger.warning("未找到跳页输入框,无法跳转到第 %s 页", target_page)
            return False
        self.clear_listen_buffer()
        to_page_input.input(str(target_page))
        self.sleep(1, 2)
        self.driver.actions.key_down("enter").key_up("enter")
        self.sleep(3, 5)
        self.clear_listen_buffer()
        logger.info("已跳转到第 %s 页", target_page)
        return True

    def _go_next_page(self, next_btn):
        """Click ``next_btn`` and drain stale packets; return False when the click fails."""
        self.clear_listen_buffer()
        if not self._human_click(next_btn):
            logger.warning("点击下一页失败")
            return False
        self.sleep(2, 4)
        self.clear_listen_buffer()
        return True

    def crawl(self):
        """Log in if needed, open the search page and collect pages start_page..end_page.

        Failures are reported through ``self.success``; nothing is returned.
        """
        total = 0
        keyword = self._build_search_keyword()
        self.driver.get("https://www.jd.com/", timeout=15)
        time.sleep(15)
        if self._is_logged_out():
            if not self.login_password or not self.login_username:
                # Without credentials nothing can be collected; report the
                # failure instead of returning as if the run had succeeded.
                logger.error("未登录且缺少账号密码,无法采集: %s", self.account_name)
                self.success = False
                return
            if not self._ensure_logged_in():
                self.disable_account()
                send_text(f"京东:{self.account_name}账号登录失败")
                self.success = False
                return
            # NOTE(review): assumed to belong to the login branch (reload the
            # home page after a fresh login) — confirm.
            self.driver.get("https://www.jd.com/", timeout=15)
            self.sleep(3, 5)

        kw = quote(str(keyword or ""), safe="")
        search_url = f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}"
        self.driver.get(search_url, timeout=15)
        self.sleep(5, 8)
        if self._check_page_blocked():
            return
        if not handle_jd_slider_captcha(self.driver, pause_listen=False):
            logger.warning("进入搜索页后滑块验证码处理失败")
            self.success = False
            return
        self._start_listen()

        if self.start_page > 1:
            if not self._jump_to_page(self.start_page):
                logger.warning("跳页失败,将从第 1 页开始采集")
                self.start_page = 1
        logger.info(
            "采集页码范围: %s ~ %s(共 %s 页)",
            self.start_page,
            self.end_page,
            self.end_page - self.start_page + 1,
        )

        for page_no in range(self.start_page, self.end_page + 1):
            if self._is_logged_out():
                if not self._ensure_logged_in():
                    self.success = False
                    break
                self.driver.get(search_url, timeout=15)
                self.sleep(3, 5)
                # NOTE(review): assumed to sit inside the re-login branch, to
                # restore the position after the search page was reloaded.
                if page_no > 1:
                    self._jump_to_page(page_no)
            if not handle_jd_slider_captcha(self.driver, pause_listen=True):
                logger.warning("滑块验证码处理失败,停止采集")
                self.success = False
                break
            if self._check_page_blocked():
                break
            logger.info("===== 正在爬取第 %s 页 =====", page_no)
            search_ele = self.driver.ele("xpath=//div[@id='search-condition']", timeout=10)
            if not search_ele:
                logger.warning("未找到搜索结果区域,停止采集")
                break
            page_n, _ = self.collect_full_page_items()
            logger.info("本页监听商品条数(含可能重复): %s", page_n)
            total += page_n
            logger.info("累计监听条数: %s", total)
            if self.is_no_prodcut > 20:
                logger.info("连续无匹配商品过多,停止采集")
                break
            if page_no >= self.end_page:
                break
            next_btn = self.driver.ele("text=下一页", timeout=2)
            if not next_btn:
                logger.info("没有下一页(未找到)")
                break
            cls_str = next_btn.attr("class") or ""
            if "disabled" in cls_str:
                logger.info("没有下一页(已禁用)")
                break
            if not self._go_next_page(next_btn):
                break

    def run(self):
        """Pick the account, run the crawl and always close the browser.

        Returns:
            ``(self.pipeline.crawl_count, self.success)``.
        """
        # Check that an account is available.
        if not self.get_account():
            logger.info("==================当前无账号可用==================")
            self.success = False
            return self.pipeline.crawl_count, self.success
        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
        # Refresh the account's update time as soon as it has been selected.
        # NOTE(review): filters on `name` with the value read from the
        # `username` column in get_account — confirm the column name.
        update_sql = """ UPDATE `retrieve_collect_equipment_account` SET `status`= %s, `update_time`= %s WHERE `name` = %s; """
        self.db.execute(update_sql, (0, int(time.time()), self.account_name))
        try:
            self.init_browser()
            self.crawl()
        except Exception as e:
            self.success = False
            logger.exception("爬取异常: %s", e)
            self.sleep(3, 5)
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception as e:
                    # A failing shutdown must not hide the crawl result.
                    logger.warning("关闭浏览器失败: %s", e)
                self.driver = None
        return self.pipeline.crawl_count, self.success
# End of the JD crawler module.