| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- import random
- import signal
- import socket
- import sys
- import time
- import base64
- import requests
- from DrissionPage import ChromiumPage, ChromiumOptions
- from commons.Logger import logger
- from oss_upload.oss_upload import AliyunOSSUploader
import os

# Token sent to the captcha-recognition API in `_call_captcha_api`.
# It can be overridden with the YSB_CAPTCHA_TOKEN environment variable so the
# secret does not have to live in source control; the historical value is kept
# as the default for backward compatibility.
CAPTCHA_TOKEN = os.environ.get(
    "YSB_CAPTCHA_TOKEN", "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
)

# Endpoint of the captcha-recognition service (POST, JSON body).
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"

# Constant added to the distance returned by the captcha API before the
# slider is dragged (see `_solve_slider_captcha`).
SLIDER_OFFSET_FIX = 10

# Path of the Chrome executable handed to DrissionPage. The default is a
# Windows install location; set YSB_CHROME_PATH to run on another machine.
chrome_path = os.environ.get(
    "YSB_CHROME_PATH", r"C:\Program Files\Google\Chrome\Application\chrome.exe"
)
class YaoShiBangCrawl:
    """Browser-driven crawler for dian.ysbang.cn.

    The crawler starts a Chrome instance through DrissionPage, logs in
    (solving the slider captcha via an external recognition API when one is
    shown), opens a product detail page, screenshots the detail area and
    uploads the image through ``AliyunOSSUploader``.
    """

    def __init__(self, product=None):
        """Create the crawler and install exit-signal handlers.

        Args:
            product: Stored on the instance as ``self.product``; it is not
                read anywhere else in this class.
        """
        self.product = product
        self.driver = None  # ChromiumPage, created by init_browser()
        self.account_name = "ysbang_1"  # used as the Chrome profile directory name
        self.ossuploader = AliyunOSSUploader()
        self._register_signal_handler()

    def _register_signal_handler(self) -> None:
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` may only be called from the main thread; when the
        crawler is constructed elsewhere the registration is skipped with a
        warning instead of crashing the constructor.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # Raised by signal.signal() when not running in the main thread.
            logger.warning("当前不在主线程,无法注册退出信号处理器")

    def _quit_browser(self) -> None:
        """Quit the browser if one is open and reset ``self.driver`` to None.

        Failures while quitting are logged and never propagated, because this
        runs from ``finally`` blocks and from the signal handler.
        """
        # Detach first so a second call (e.g. signal handler followed by the
        # ``finally`` in run()) does not try to quit the same browser twice.
        driver, self.driver = self.driver, None
        if not driver:
            return
        try:
            driver.quit()
        except Exception as e:
            logger.warning("关闭浏览器失败: %s", e)

    @staticmethod
    def _get_free_port() -> int:
        """Return a local TCP port that was free at the time of the call.

        The port is obtained by binding to port 0 on 127.0.0.1 and reading
        back the assigned number. The socket is closed before returning, so
        another process could take the port before Chrome binds it.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self) -> None:
        """Configure and start Chrome, storing the page in ``self.driver``."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # One profile directory per account name.
        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # NOTE: "--disable-blink-features=AutomationControlled" is currently
        # not set (it was commented out in the original code).
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self) -> bool:
        """Detect and handle the slider captcha.

        Returns:
            True when no captcha modal is present or when the drag was
            performed; False when recognition failed, the recognised value is
            not numeric, or the slider element could not be found. Whether
            the drag actually passed the captcha is not checked here.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)
        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True

        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)

        # The API result comes straight from JSON; guard against a value
        # that cannot be converted to a number instead of raising.
        try:
            target_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("验证码识别结果不是有效距离: %r", distance)
            return False

        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False

        self._simulate_slider_drag(slider, target_distance)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Send the captcha screenshot to the recognition API.

        Args:
            image_bytes: Raw image bytes; they are base64-encoded before
                being posted.

        Returns:
            The value found at ``["data"]["data"]`` in the JSON response, or
            None when the request or the parsing fails for any reason.
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            return resp["data"]["data"]
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance) -> list:
        """Build a randomised drag track covering at least ``distance``.

        The track accelerates over the first 70% of the distance and then
        decelerates. The summed forward moves reach or slightly exceed
        ``distance``; with probability 0.7 a final backward step of 1-3
        pixels is appended.

        Args:
            distance: Horizontal distance to cover.

        Returns:
            A list of ``(x_move, y_offset, duration)`` tuples. The list is
            empty when ``distance`` is not positive.
        """
        if distance <= 0:
            # Nothing to drag; do not emit a stray backward step.
            return []

        tracks = []
        current = 0
        mid = distance * 0.7  # switch from acceleration to deceleration here
        t = 0.2  # time step used by the motion equations
        v = 0
        move_points = []

        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Deceleration phase; the speed is floored at 0.5 so every step
        # still moves forward and the loop terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        total_points = len(move_points)
        for i, move in enumerate(move_points):
            # Occasional vertical jitter of up to 2 pixels.
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            # Shorter steps at the start, longer ones near the end.
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Rare extra pause.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))

        # Optional small backward correction at the end.
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance) -> None:
        """Press the slider, replay a generated track, then release.

        Args:
            slider_element: The slider handle element to grab.
            target_distance: Total horizontal distance to drag.
        """
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the track durations look like seconds already, so
            # dividing by 1000 makes each step near-instant. Left unchanged
            # because it may be tuned against the live captcha - confirm.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self) -> bool:
        """Return True when the expected shop name is present on the page."""
        # NOTE(review): the shop name is hard-coded for one account.
        title = self.driver.ele(
            "xpath=//*[contains(text(),'广西好药师大药房连锁有限公司天峨远大药店')]",
            timeout=5,
        )
        return bool(title)

    def login(self) -> bool:
        """Fill in the login form, handle the captcha and verify the login.

        Returns:
            True when the shop name is detected within three attempts after
            submitting the form; False when a form element is missing or the
            shop name never appears.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        # NOTE(review): credentials are hard-coded; consider moving them to
        # configuration.
        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input("13097980383")
        time.sleep(random.uniform(1.5, 2.5))

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input("a123456")
        time.sleep(random.uniform(1.5, 2.5))

        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)

        # Up to three captcha attempts; the captcha result itself is not
        # used, success is decided by the logged-in check.
        for _ in range(3):
            self._solve_slider_captcha()
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                # Log cookie names only; values are session secrets and must
                # not be written to stdout or logs.
                cookie_names = sorted(c['name'] for c in self.driver.cookies())
                logger.debug("登录后 cookie 名称: %s", cookie_names)
                return True

        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def get_snapshot(self, detail_url, item_id):
        """Screenshot the detail area of a product page and upload it.

        Args:
            detail_url: URL of the product detail page to open.
            item_id: Identifier passed (as a string) to the uploader and used
                in log messages.

        Returns:
            The value returned by the uploader, or an empty string when no
            detail element is found.
        """
        self.driver.get(detail_url, timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        self._dismiss_popup_before_screenshot()

        # Try the primary container first, then the fallback one.
        ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
        if not ele:
            ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
        if not ele:
            logger.warning("未找到详情区域元素,跳过截图 item_id=%s", item_id)
            return ""

        jpg_bytes = ele.get_screenshot(as_bytes="jpg")
        img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
        logger.info("截图上传完成: %s", img_url)
        time.sleep(random.uniform(0.5, 1.5))
        return img_url

    def _dismiss_popup_before_screenshot(self) -> None:
        """Close or hide marketing popups so they do not cover the screenshot.

        This is best-effort: every failure is logged at debug level and
        otherwise ignored.
        """
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception as e:
                logger.debug("关闭弹窗失败 %s: %s", loc, e)

        try:
            # Fallback: hide common high z-index popups and masks via JS.
            self.driver.run_js(
                """
                const sels = [
                    '[class*="modal"]',
                    '[class*="popup"]',
                    '[class*="dialog"]',
                    '[class*="mask"]',
                    '[class*="overlay"]'
                ];
                for (const s of sels) {
                    document.querySelectorAll(s).forEach(el => {
                        const style = getComputedStyle(el);
                        const z = parseInt(style.zIndex || '0', 10);
                        if (z >= 999 && style.display !== 'none') {
                            el.style.display = 'none';
                        }
                    });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception as e:
            logger.debug("隐藏弹窗脚本执行失败: %s", e)

    def search(self) -> bool:
        """Open the home page, log in if needed and snapshot one product.

        Returns:
            False when a login was required and failed; True otherwise. The
            result of the snapshot itself does not affect the return value.
        """
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if not self._is_logged_in() and not self.login():
            return False

        # NOTE(review): the product URL and id are hard-coded.
        detail_url = "https://dian.ysbang.cn/#/drugInfo?wholesaleid=376456110&isAssemble=true&trafficType=15"
        self.get_snapshot(detail_url, "376456110")
        return True

    def run(self) -> bool:
        """Run the whole crawl and always close the browser afterwards.

        Returns:
            The result of ``search()``, or False when an exception occurred.
            Exceptions are logged and not re-raised.
        """
        try:
            self.init_browser()
            return bool(self.search())
        except Exception as e:
            logger.exception("运行异常: %s", e)
            return False
        finally:
            self._quit_browser()
# Script entry point: build one crawler and run a single crawl.
if __name__ == "__main__":
    crawler = YaoShiBangCrawl()
    crawler.run()
|