소스 검색

药师帮快照

zhuoyuncheng 2 주 전
부모
커밋
594849eb1a
1개의 변경된 파일, 387개의 추가 그리고 0개의 삭제
  1. 387 0
      spiders/yaoshibang/ysb_snapshot_crawl.py

+ 387 - 0
spiders/yaoshibang/ysb_snapshot_crawl.py

@@ -0,0 +1,387 @@
+import random
+import signal
+import socket
+import sys
+import time
+import base64
+import math
+import requests
+from commons.conn_mysql import MySQLPoolOnline
+from DrissionPage import ChromiumPage, ChromiumOptions
+from commons.Logger import logger
+from oss_upload.oss_upload import AliyunOSSUploader
+from commons.config import YSB_ACCOUNT, YSB_PASSWORD
+# Token sent as the "token" field of every captcha-recognition request.
+# NOTE(review): this credential is hard-coded in source control; consider moving it
+# next to YSB_ACCOUNT / YSB_PASSWORD in commons.config and rotating it.
+CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
+# Endpoint that _call_captcha_api POSTs the captcha screenshot to.
+# NOTE(review): plain http, so the token above travels unencrypted - confirm
+# whether the provider offers an https endpoint.
+CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
+
+# Constant added to the recognised slider distance before dragging.
+# NOTE(review): presumably an empirically tuned pixel correction - confirm.
+SLIDER_OFFSET_FIX = 10
+
+# Chrome executable handed to ChromiumOptions.set_browser_path (a Windows path).
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+
+
+class YaoShiBangSnapshot:
+    def __init__(self, product=None):
+        self.product = product
+        self.driver = None
+        self.account_name = "ysbang_1"
+        self.platform = 5
+        self.db_online = MySQLPoolOnline()
+        self.ossuploader = AliyunOSSUploader()
+        self._register_signal_handler()
+
+    def _register_signal_handler(self):
+        def handler(signum, frame):
+            logger.info("收到退出信号,正在关闭浏览器...")
+            self._quit_browser()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, handler)
+        if hasattr(signal, "SIGTERM"):
+            signal.signal(signal.SIGTERM, handler)
+
+    def _quit_browser(self):
+        if self.driver:
+            try:
+                self.driver.quit()
+            except Exception:
+                pass
+            self.driver = None
+
+    @staticmethod
+    def _get_free_port():
+        """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(("127.0.0.1", 0))
+            return s.getsockname()[1]
+
+    def init_browser(self):
+        co = ChromiumOptions().set_browser_path(chrome_path)
+        debug_port = self._get_free_port()
+        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
+
+        co.set_local_port(debug_port)
+        co.set_argument(f"--remote-debugging-port={debug_port}")
+        co.set_argument("--remote-debugging-address=127.0.0.1")
+        # co.set_argument("--disable-blink-features=AutomationControlled")
+        co.set_argument("--disable-dev-shm-usage")
+        co.set_argument("--no-first-run")  # 避免首次运行弹窗
+        co.set_argument("--no-default-browser-check")  # 避免默认浏览器检查
+        self.driver = ChromiumPage(co)
+
+    def _solve_slider_captcha(self):
+        """检测并处理易盾滑块验证码,成功返回 True。"""
+        self.driver.wait.doc_loaded()
+        time.sleep(2)
+
+        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
+        if not yidun:
+            return True
+
+        logger.info("检测到滑块验证码,开始处理")
+        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
+
+        distance = self._call_captcha_api(jpg_bytes)
+        if distance is None:
+            logger.error("验证码识别失败")
+            return False
+
+        logger.info("滑块距离: %s", distance)
+        slider = self.driver.ele(
+            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
+        )
+        if not slider:
+            logger.error("未找到滑块元素")
+            return False
+
+        try:
+            drag_distance = float(distance) + SLIDER_OFFSET_FIX
+        except (TypeError, ValueError):
+            logger.error("滑块距离非数字: %r", distance)
+            return False
+
+        if not math.isfinite(drag_distance) or drag_distance <= 0:
+            logger.error("滑块距离无效: %s", drag_distance)
+            return False
+        self._simulate_slider_drag(slider, drag_distance)
+        time.sleep(3)
+        return True
+
+    def _call_captcha_api(self, image_bytes):
+        """调用云码平台识别滑块距离,失败返回 None。"""
+        try:
+            b64 = base64.b64encode(image_bytes).decode()
+            resp = requests.post(
+                CAPTCHA_API_URL,
+                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
+                headers={"Content-Type": "application/json"},
+                timeout=15,
+            ).json()
+            logger.info("验证码 API 返回: %s", resp)
+            if not isinstance(resp, dict):
+                return None
+            data = resp.get("data")
+            if isinstance(data, dict):
+                dist = data.get("data")
+            else:
+                dist = data
+            if dist is None:
+                logger.error("验证码 API 未返回距离字段: %s", resp)
+                return None
+            try:
+                d = float(dist)
+            except (TypeError, ValueError):
+                logger.error("验证码距离无法解析为数字: %r", dist)
+                return None
+            if not math.isfinite(d):
+                logger.error("验证码距离非有限数值: %r", dist)
+                return None
+            return d
+        except Exception as e:
+            logger.exception("验证码 API 调用失败: %s", e)
+            return None
+
+    @staticmethod
+    def _generate_human_track(distance):
+        try:
+            distance = float(distance)
+        except (TypeError, ValueError):
+            return []
+        if distance <= 0 or not math.isfinite(distance):
+            return []
+        tracks = []
+        current = 0
+        mid = distance * 0.7
+        t = 0.2
+        v = 0
+        move_points = []
+
+        while current < mid:
+            a = random.uniform(2, 4)
+            v0 = v
+            v = v0 + a * t
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        while current < distance:
+            a = -random.uniform(0.5, 1.5)
+            v0 = v
+            v = v0 + a * t
+            if v < 0.5:
+                v = 0.5
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        total_points = len(move_points)
+        for i, move in enumerate(move_points):
+            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
+
+            if i < total_points * 0.3:
+                duration = random.uniform(0.01, 0.03)
+            elif i > total_points * 0.7:
+                duration = random.uniform(0.03, 0.08)
+            else:
+                duration = random.uniform(0.02, 0.05)
+
+            if random.random() < 0.05:
+                duration += random.uniform(0.05, 0.1)
+
+            tracks.append((move, y_offset, duration))
+
+        if random.random() < 0.7:
+            tracks.append((-random.randint(1, 3), 0, 0.05))
+
+        return tracks
+
+    def _simulate_slider_drag(self, slider_element, target_distance):
+        if target_distance <= 0:
+            logger.warning("滑块目标距离无效: %s", target_distance)
+            return
+        self.driver.actions.move_to(slider_element).hold()
+        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
+            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
+        self.driver.actions.release()
+
+    def _is_logged_in(self):
+        # 与当前账号店铺展示文案一致;换店后需同步修改或改为配置项
+        title = self.driver.ele(
+            "xpath=//*[contains(text(),'广西好药师大药房连锁有限公司天峨远大药店')]",
+            timeout=5,
+        )
+        return bool(title)
+
+    def login(self):
+        logger.info("开始登录药师帮")
+        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
+        if not input_name:
+            logger.error("未找到账号输入框")
+            return False
+        input_name.input(YSB_ACCOUNT)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
+        if not input_pass:
+            logger.error("未找到密码输入框")
+            return False
+        input_pass.input(YSB_PASSWORD)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
+        if not login_btn:
+            logger.error("未找到登录按钮")
+            return False
+
+        login_btn.click()
+        time.sleep(3)
+        for i in range(3):
+            self._solve_slider_captcha()
+            time.sleep(3)
+
+            if self._is_logged_in():
+                logger.info("登录成功")
+                return True
+
+        logger.error("登录后未检测到目标店铺名,登录可能失败")
+        return False
+
+    def get_snapshot(self, detail_url, row_id):
+        self.driver.get(detail_url, timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+        self._dismiss_popup_before_screenshot()
+
+        ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
+        if not ele:
+            ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
+        if not ele:
+            logger.warning("未找到详情区域元素,跳过截图 row_id=%s", row_id)
+            return ""
+
+        try:
+            jpg_bytes = ele.get_screenshot(as_bytes="jpg")
+            if not jpg_bytes:
+                logger.warning("截图为空 row_id=%s", row_id)
+                return ""
+            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(row_id))
+        except Exception:
+            logger.exception("截图或 OSS 上传失败 row_id=%s url=%s", row_id, detail_url)
+            return ""
+        if not img_url:
+            logger.warning("OSS 未返回有效地址 row_id=%s", row_id)
+            return ""
+        logger.info("截图上传完成 row_id=%s url=%s", row_id, img_url)
+        time.sleep(random.uniform(0.5, 1.5))
+        return img_url
+
+    def _dismiss_popup_before_screenshot(self):
+        """截图前关闭或隐藏营销弹窗,避免遮挡。"""
+        close_locs = [
+            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
+            "xpath=//button[contains(@class,'close')]",
+            "xpath=//span[text()='×']",
+            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
+        ]
+        for loc in close_locs:
+            try:
+                btn = self.driver.ele(loc, timeout=0.5)
+                if btn:
+                    btn.click()
+                    time.sleep(0.2)
+            except Exception:
+                pass
+
+        try:
+            # 兜底:隐藏常见高层弹窗和遮罩
+            self.driver.run_js(
+                """
+                const sels = [
+                  '[class*="modal"]',
+                  '[class*="popup"]',
+                  '[class*="dialog"]',
+                  '[class*="mask"]',
+                  '[class*="overlay"]'
+                ];
+                for (const s of sels) {
+                  document.querySelectorAll(s).forEach(el => {
+                    const style = getComputedStyle(el);
+                    const z = parseInt(style.zIndex || '0', 10);
+                    if (z >= 999 && style.display !== 'none') {
+                      el.style.display = 'none';
+                    }
+                  });
+                }
+                document.body.style.overflow = 'auto';
+                """
+            )
+            time.sleep(0.2)
+        except Exception:
+            pass
+
+    def _save_snapshot_url(self, row_id, img_url):
+        """上传成功后回写库,避免下次任务重复拉取同一批。"""
+        if row_id is None or not img_url:
+            return
+        sql = (
+            "UPDATE `retrieve_process_lowprice_product` "
+            "SET `snapshot_url` = %s WHERE `id` = %s AND `platform` = %s"
+        )
+        n = self.db_online.execute(sql, (img_url, row_id, self.platform))
+        if n <= 0:
+            logger.warning("snapshot_url 回写未影响行数 id=%s platform=%s", row_id, self.platform)
+
+    def search(self, data_list):
+        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        if not self._is_logged_in():
+            if not self.login():
+                return False
+
+        ok, fail = 0, 0
+        for data in data_list:
+            row_id = data.get("id")
+            link_url = data.get("link_url")
+            if not link_url:
+                logger.warning("缺少 link_url,跳过 id=%s", row_id)
+                fail += 1
+                continue
+            img_url = self.get_snapshot(link_url, row_id)
+            if img_url:
+                self._save_snapshot_url(row_id, img_url)
+                ok += 1
+            else:
+                fail += 1
+        logger.info("快照任务结束 成功=%s 失败=%s 总计=%s", ok, fail, len(data_list))
+        return ok > 0
+
+    def run(self):
+        date_str = time.strftime("%Y-%m-%d")
+        sql = """
+             SELECT `id`,`link_url` FROM `retrieve_process_lowprice_product` 
+            WHERE `platform`=%s AND `snapshot_url` IS NULL AND `scrape_date`=%s 
+            LIMIT 100 """
+
+        data_list = self.db_online.select_data(sql, (self.platform, date_str))
+        if not data_list:
+            logger.info("当前不需要更新快照")
+            return
+
+        try:
+            self.init_browser()
+            self.search(data_list)
+        except Exception as e:
+            logger.exception("运行异常: %s", e)
+        finally:
+            self._quit_browser()
+
+
+# Script entry point: build the crawler with default arguments and run one batch.
+if __name__ == "__main__":
+    YaoShiBangSnapshot().run()