import requests import base64 import cv2 import uiautomator2 as u2 import time import subprocess import re import random import datetime import json from aip import AipOcr from apscheduler.schedulers.blocking import BlockingScheduler # from db_mysql import mysqlClient import threading from collections import deque import numpy as np import secrets import os import math import schedule # import pyperclip from config import Config from logger import setup_logger import logging from contextlib import contextmanager from typing import Dict, Any # from database import MySQLClient # 配置日志 # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') setup_logger("mt_spider") # 初始化日志 class SpiderMonitor(threading.Thread): """全局弹窗监控线程(增强版)""" def __init__(self, spider_instance): super().__init__(daemon=True) self.spider = spider_instance self.running = True self.pausing = threading.Event() # 主线程同步事件 self.last_verification_time = 0 self.verification_count = 0 self.MAX_VERIFICATION_RETRY = 10 self.recent_clicks = deque(maxlen=10) # 防重复点击 self.logger = logging.getLogger("SpiderMonitor") self.TOKEN = "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk" self.API_URL = "http://api.jfbym.com/api/YmServer/customApi" self.d = self.spider.d self.verification_in_progress = threading.Event() self.loggerMT = logging.getLogger() self.verification_retry_count = 0 # 当前验证码重试次数 self.last_verification_type = None # 可配置化弹窗规则 self.popup_rules = { "simple": [ ('//*[@text="确定"]', "点击确定"), ('//*[@text="允许"]', "点击允许"), ('//*[@text="关闭"]', "点击关闭"), ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/address_center_location_close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/location_close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/btn_close"]', "关闭按钮"), ], # "verification": [ # '//*[contains(@text, "验证")]', # '//*[contains(@text, "滑块")]', # '//*[contains(@text, "依次点击")]', # '//*[contains(@text, 
"请点击")]', # '//*[contains(@text, "拖动滑块刚")]', #这个需要拖动滑块至最右边,然后再截图 # '//*[contains(@text, "请输入图片中的内容")]', # '//*[contains(@text, "用最短线连接")]', # '//*[contains(@text, "请按语序依次点击")]', # '//*[contains(@text, "请向右滑动滑块")]', # '//*[contains(@text, "请拖动下方滑块完成拼图")]', # '//*[contains(@resource-id, "captcha")]' # ] "verification": [ ('//*[contains(@text, "请点击")]', "click_side"), ('//*[contains(@text, "请输入图片中的内容")]', "Numbers_English"), ('//*[contains(@text, "请向右滑动滑块")]', "Swipe_right"), ('//*[contains(@text, "请依次点击下图图标")]', "Click_images"), ('//*[contains(@text, "请拖动下方滑块完成拼图")]', "slider"), ('//*[contains(@text, "拖动滑块刚")]', "complexs"), # 这个需要拖动滑块至最右边,然后再截图 ('//*[contains(@text, "请按语序依次点击")]', "Click_images"), ('//*[contains(@text, "用最短线连接")]', "Shortest_connection"), ] } def run(self): while self.running: try: handled = self.check_and_handle_popup() time.sleep(2 if handled else 1) except Exception as e: self.logger.exception("监控线程异常: %s", e) time.sleep(3) def _is_recent_click(self, xpath): """防止重复点击同一个弹窗""" key = f"{xpath}_{int(time.time())}" if key in self.recent_clicks: return True self.recent_clicks.append(key) return False @staticmethod def get_sleep_time(): # return random.randint(5, 8) return random.randint(1, 3) def human_slide(self, start_x, start_y, end_x, end_y, hold_time=0): """模拟真实人类滑动轨迹 - 连续变化的速度曲线,微小偏差""" points = [] # 随机参数 total_steps = random.randint(60, 85) # 更多步数使曲线更平滑 # 计算滑动距离 distance_x = end_x - start_x distance_y = end_y - start_y total_distance = math.sqrt(distance_x ** 2 + distance_y ** 2) self.logger.info(f"滑块验证移动0") # 微小偏差设置 - 人类不完美的对齐 # X方向偏差:1-6像素,70%概率过冲,30%欠冲 if random.random() < 0.7: offset_x = random.randint(1, min(5, int(total_distance * 0.01))) else: offset_x = -random.randint(1, min(3, int(total_distance * 0.02))) # # Y方向微小偏差:±0-2像素 # offset_y = random.randint(-2, 2) # 实际停止位置 stop_x = end_x + offset_x stop_y = end_y # 物理参数:模拟手指滑动的物理过程 # 使用加速度、最大速度、减速度模型 accel_time_ratio = random.uniform(0.25, 0.35) # 加速阶段占总时间的比例 decel_time_ratio = 
random.uniform(0.25, 0.35) # 减速阶段占总时间的比例 max_speed = random.uniform(1.5, 2.2) # 最大速度倍数 # 生成轨迹 for i in range(total_steps): t = i / (total_steps - 1) # 时间进度 0-1 # 物理速度曲线:连续变化的加速度过程 if t < accel_time_ratio: # 加速阶段:从0加速到最大速度 phase_t = t / accel_time_ratio # 使用平滑的加速曲线(二次函数) speed_factor = max_speed * phase_t * phase_t elif t < 1 - decel_time_ratio: # 匀速阶段:保持最大速度 speed_factor = max_speed # 加入轻微的随机波动,模拟人类手部自然抖动 speed_factor += random.uniform(-0.05, 0.05) else: # 减速阶段:从最大速度减速到0 phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio # 使用平滑的减速曲线(二次函数,末尾更平缓) speed_factor = max_speed * (1 - phase_t * phase_t) self.logger.info(f"滑块验证移动1") # 计算位移(积分速度得到位置) # 使用贝塞尔曲线计算位置,让运动更自然 if t < accel_time_ratio: # 加速阶段的位置 phase_t = t / accel_time_ratio progress = (max_speed / 3) * phase_t * phase_t * phase_t elif t < 1 - decel_time_ratio: # 匀速阶段的位置 phase_t = (t - accel_time_ratio) / (1 - accel_time_ratio - decel_time_ratio) # 匀速阶段的位移加上加速阶段完成的位移 accel_distance = (max_speed / 3) # 加速阶段完成的位移 progress = accel_distance + (1 - 2 * accel_distance) * phase_t else: # 减速阶段的位置 phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio # 从减速起点平滑过渡到终点 progress = 1 - (max_speed / 3) * (1 - phase_t) * (1 - phase_t) * (1 - phase_t) # 限制进度在0-1之间 progress = max(0, min(1, progress)) # 添加自然的手部抖动 if t < 0.1 or t > 0.9: # 开始和结束:非常小的抖动 jitter_x = random.randint(-1, 1) jitter_y = random.randint(-1, 1) elif t < 0.3 or t > 0.7: # 过渡阶段:小抖动 jitter_x = random.randint(-2, 2) jitter_y = random.randint(-2, 2) else: # 中间快速阶段:稍大抖动 jitter_x = random.randint(-2, 2) if random.random() < 0.3 else 0 jitter_y = random.randint(-2, 2) if random.random() < 0.3 else 0 # 计算当前位置 current_x = start_x + (stop_x - start_x) * progress + jitter_x current_y = start_y + (stop_y - start_y) * progress + jitter_y self.logger.info(f"滑块验证移动2") # 确保轨迹单调性(不会回退) if points: if distance_x > 0: # 向右滑动 current_x = max(points[-1][0], current_x) elif distance_x < 0: # 向左滑动 current_x = min(points[-1][0], current_x) # 时间延迟 - 基于当前速度计算 # 速度越快,延迟越短 if t < 
0.1: # 开始阶段 delay = random.uniform(0.002, 0.008) elif t < 0.9: # 中间阶段 # 延迟与速度成反比 base_delay = 0.008 speed_delay_factor = 1.0 / (speed_factor + 0.5) delay = base_delay * speed_delay_factor + random.uniform(-0.002, 0.002) delay = max(0.005, min(delay, 0.015)) else: # 结束阶段 # 逐渐增加延迟 slow_factor = 1.0 + (t - 0.9) * 10 delay = random.uniform(0.015, 0.025) * slow_factor points.append((current_x, current_y, delay)) self.logger.info(f"滑块验证移动3") # 确保最后一点是实际停止位置 if points: points[-1] = (stop_x, stop_y, 0) # 执行滑动 if points: # 按下起点 self.d.touch.down(points[0][0], points[0][1]) time.sleep(random.uniform(0.002, 0.006)) # 移动轨迹 for i, point in enumerate(points[1:]): self.d.touch.move(point[0], point[1]) self.logger.info(f"滑块验证移动{point[0]},{point[1]}") # 最后阶段可能的微小停顿(人类犹豫) # progress = (i + 1) / len(points[1:]) # if progress > 0.98: # time.sleep(random.uniform(0.001, 0.003)) time.sleep(point[2]) # 抬起手指 self.d.touch.up(points[-1][0], points[-1][1]) # 滑动后的随机延迟 hold_time = random.uniform(1, 2) time.sleep(hold_time) return points # 数英 def Numbers_English_verify(self): time.sleep(5) rotate_image_xpath = '//*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.Image[1]' if not self.d.xpath(rotate_image_xpath).exists: print("数英图片元素不存在") rotate_img_path = "Numbers_English.png" try: rotate_image = self.d.xpath(rotate_image_xpath) rotate_image.screenshot().save(rotate_img_path) print(f"数英图片截图保存成功: {rotate_img_path}") except Exception as e: print(f"数英图片截图失败: {e}") try: with open(rotate_img_path, 'rb') as f: image_data = base64.b64encode(f.read()).decode() url = "http://api.jfbym.com/api/YmServer/customApi" data = { "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", "type": 10103, "image": image_data } headers = { "Content-Type": "application/json" } response = requests.post(url, headers=headers, json=data, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == 10000 and result.get("data", {}).get("code") == 0: 
def slider_verify(self):
    """Solve the "drag the slider to complete the puzzle" captcha.

    Screenshots the puzzle widgets, posts the main puzzle image to the
    recognition service configured on the instance (``self.API_URL`` /
    ``self.TOKEN``, request type 22222) and drags the slider by the returned
    distance using ``self.human_slide``.

    Returns:
        bool: True when the drag was performed and the prompt text
        "请拖动下方滑块完成拼图" is no longer on screen. False on every
        failure: puzzle image missing, HTTP/recognition error, unparsable
        distance, drag error, or the prompt still being displayed.
    """
    # Fixed wait before touching the UI (presumably lets the captcha render).
    time.sleep(5)
    try:
        slider_slot_xpath = '//*[@resource-id="puzzleSliderDrag"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[1]'
        slider_main_xpath = '//*[@resource-id="puzzleImageMain"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]'
        slider_slot_img_path = "slider_slot.png"
        slider_main_img_path = "slider_main.png"

        # The slot screenshot is only written to disk; nothing below reads it,
        # so a missing slot element is reported but is not fatal.
        slot_element = self.d.xpath(slider_slot_xpath)
        if slot_element.exists:
            slot_element.screenshot().save(slider_slot_img_path)
        else:
            print("slider_slot_xpath not exist")
            self.logger.info("slider_slot_xpath not exist")

        # Without a fresh puzzle image we would upload a stale file left over
        # from an earlier run, so stop here instead of guessing a distance.
        main_element = self.d.xpath(slider_main_xpath)
        if not main_element.exists:
            print("slider_main_xpath not exist")
            self.logger.info("slider_main_xpath not exist")
            return False
        main_element.screenshot().save(slider_main_img_path)

        # Ask the recognition service how far the slider has to travel.
        try:
            with open(slider_main_img_path, 'rb') as f:
                image_b64 = base64.b64encode(f.read()).decode()
            payload = {
                "token": self.TOKEN,
                "type": 22222,
                "image": image_b64,
            }
            headers = {"Content-Type": "application/json"}
            response = requests.post(self.API_URL, headers=headers, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            # Always report failure as a bool: a dict here would be truthy and
            # could be mistaken for success by the caller.
            self.logger.error(f"处理异常: {str(e)}")
            return False

        if not (result.get("code") == 10000 and result.get("data", {}).get("code") == 0):
            print("api 返回错误 此时滑块验证可能呈图片形式存在")
            return False

        slider_verify_data = result["data"]
        print(f"slider_verify_data={slider_verify_data}")
        print(slider_verify_data["data"])
        # A non-numeric answer raises here and is reported by the outer handler.
        slider_slide_distance = float(slider_verify_data["data"])

        # Drag the slider handle by the recognised distance.
        try:
            slider_xpath = '//*[@resource-id="puzzleSliderBox"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[2]/android.view.View[1]'
            bounds = self.d.xpath(slider_xpath).info['bounds']
            # Start near (not exactly at) the centre of the handle.
            start_x = ((bounds['left'] + bounds['right']) / 2) + random.uniform(-4, 4)
            start_y = ((bounds['top'] + bounds['bottom']) / 2) + random.uniform(-3, 3)
            end_x = start_x + slider_slide_distance + random.uniform(-3, 3)
            end_y = start_y + random.uniform(-1, 1)
            self.human_slide(start_x, start_y, end_x, end_y)
            time.sleep(2)
        except Exception as e:
            print(f"滑动操作时出错: {e}")
            return False

        time.sleep(2)
        # The captcha counts as solved once its prompt text has disappeared.
        return not self.d.xpath('//*[@text="请拖动下方滑块完成拼图"]').exists
    except Exception as e:
        self.logger.error(f"滑块验证失败: {e}")
        return False
截图 image_img_path = "Click_images.png" if self.d.xpath(Click_images_xpath).exists: self.d.xpath(Click_images_xpath).screenshot().save("Click_images.png") else: print("图标元素不存在,无法截图") self.logger.info("图标元素不存在,无法截图") return False try: with open('Click_images.png', 'rb') as f: c = base64.b64encode(f.read()).decode() # API请求配置 url = "http://api.jfbym.com/api/YmServer/customApi" data = { "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", # 注册后登录去用户中心获取token "type": 88888, # 打码类型ID "image": c # 图片base64字符串 } headers = { "Content-Type": "application/json" } # 发送请求 response = requests.post(url, headers=headers, json=data, timeout=30) response.raise_for_status() # 检查HTTP请求是否成功 result = response.json() print(f"API返回结果: {result}") if result.get("code") == 10000 and result.get("data", {}).get("code") == 0: verify_data = result.get("data", {}) print(f"verify_data={verify_data}") # 获取坐标数据字符串,格式为:"188,165|99,128|91,209|235,116" coords_str = verify_data.get("data", "") if not coords_str: print("未返回坐标数据") return False print(f"坐标字符串: {coords_str}") # 分割坐标字符串 coords_list = coords_str.split('|') print(coords_list) # 依次点击每个坐标 for coord in coords_list: try: # 分割字符串并转换为整数(这是图片内的相对坐标) x_img_str, y_img_str = coord.split(',') x_img = int(x_img_str.strip()) y_img = int(y_img_str.strip()) print(f"图片相对坐标: x={x_img}, y={y_img}") # 转换为屏幕绝对坐标 x_screen = image_left + x_img # ● y_screen = image_top + y_img print(f"屏幕绝对坐标: x={x_screen}, y={y_screen}") # 点击转换后的屏幕坐标 self.d.click(x_screen, y_screen) time.sleep(self.get_sleep_time()) except Exception as e: print(f"处理坐标 {coord} 失败: {e}") continue time.sleep(self.get_sleep_time() * 2) # 给系统一些响应时间 return True else: error_msg = result.get("msg", "未知错误") print(f"识别失败: {error_msg}") return False except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") return False except Exception as e: print(f"识别过程出错: {e}") return False except Exception as e: self.logger.error(f"点击图标失败: {e}") return False # # 检查验证是否成功 # if not 
self.d.xpath('//*[@text="请依次点击下图图标"] | //*[@text="请按语序依次点击"]').exists: # print("所有坐标点击完成,验证成功") # return True # else: # print("所有坐标点击完成,但验证文本仍然存在,可能验证失败") # return False # def Click_images(self): # try: # image_xpath = '//*[@resource-id="com.sankuai.meituan:id/titans_main_layout"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]' # image_element = self.d.xpath(image_xpath) # image_element_info = image_element.info # bounds = image_element_info['bounds'] # image_left = bounds['left'] # image_top = bounds['top'] # image_width = bounds['right'] - bounds['left'] # image_height = bounds['bottom'] - bounds['top'] # print(f"图片位置: left={image_left}, top={image_top}, width={image_width}, height={image_height}") # # # image_img_path = "image.png" # if self.d.xpath(image_xpath).exists: # self.d.xpath(image_xpath).screenshot().save("image.png") # else: # print("image_xpath not exist") # try: # with open('image.png', 'rb') as f: # c = base64.b64encode(f.read()).decode() # url = "http://api.jfbym.com/api/YmServer/customApi" # data = { # "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", # "type": 50009, # "image": c # } # headers = { # "Content-Type": "application/json" # } # # 发送请求 # response = requests.post(url, headers=headers, json=data, timeout=30) # response.raise_for_status() # result = response.json() # if result.get("code") == 10000 and result.get("data", {}).get("code") == 0: # side_data = result["data"] # print(f"side_data={side_data}") # big_click_xpath = side_data["data"] # else: # print("api 返回错误") # return False # except Exception as e: # return { # "success": False, # "error_msg": f"处理异常: {str(e)}" # } # coords_str = side_data["data"] # if coords_str: # x_img_str, y_img_str = coords_str.split(',') # x_img = int(x_img_str.strip()) # y_img = int(y_img_str.strip()) # x_screen = image_left + x_img # y_screen = image_top + y_img # 
def Swipe_right_human_slide(self, start_x, start_y, end_x, end_y):
    """Drag from the start point towards the end point along a jittered path.

    Builds 50 points with a slow-fast-slow progress curve (capped at 99 % of
    the distance), adds up to one pixel of random jitter per axis, then
    replays them on the device with ``touch.down`` / ``touch.move`` /
    ``touch.up``. The finger is lifted 2 px to the right of the last point.

    Args:
        start_x, start_y: screen coordinates where the finger goes down.
        end_x, end_y: screen coordinates the gesture moves towards.

    Returns:
        None. The generated points are printed and logged via ``self.loggerMT``.
    """
    total_steps = 50
    distance_x = end_x - start_x
    distance_y = end_y - start_y
    moving_right = distance_x >= 0

    points = []
    delays = []
    previous_x = start_x  # last emitted x, used to keep the path monotonic
    for i in range(total_steps):
        # Non-linear progress (slow - fast - slow).
        ratio = i / total_steps
        if ratio < 0.3:
            progress = 0.5 * (ratio / 0.3) ** 2
        elif ratio < 0.7:
            progress = 0.5 + (ratio - 0.3) * 1.25
        else:
            progress = 0.9 + 0.5 * ((ratio - 0.7) / 0.3) ** 0.5
        progress = min(progress, 0.99)

        # numpy's upper bound is exclusive: randint(-1, 2) yields -1, 0 or 1.
        # The previous randint(-1, 1) could never jitter in the + direction.
        offset_x = np.random.randint(-1, 2)
        offset_y = np.random.randint(-1, 2)
        x = start_x + distance_x * progress + offset_x
        y = start_y + distance_y * progress + offset_y

        # Keep x monotonic in the direction of travel and never pass end_x.
        if moving_right:
            if x < previous_x and x < end_x:
                x = previous_x + 1
            if x > end_x:
                x = end_x
        else:
            if x > previous_x and x > end_x:
                x = previous_x - 1
            if x < end_x:
                x = end_x
        previous_x = x
        points.append((x, y))

        # Per-step pause, applied while moving (below) rather than while
        # building the list, where it only postponed the press.
        # NOTE(review): this formula gives the longest pause mid-gesture,
        # the opposite of "faster = shorter delay" - confirm the intent.
        delays.append(0.002 + 0.01 * (1 - abs(0.5 - ratio)))

    print(f"points: {points}")
    self.loggerMT.info(f"points: {points}")

    # Replay the gesture; always lift the finger, even if a move fails.
    self.d.touch.down(points[0][0], points[0][1])
    try:
        for point, delay in zip(points[1:], delays[1:]):
            self.d.touch.move(point[0], point[1])
            time.sleep(delay)
    finally:
        self.d.touch.up(points[-1][0] + 2, points[-1][1])
滑到滑轨最右端 right_end_center_x = track_right - slider_width / 2 right_end_center_y = slider_center_y print(f"最右端滑块中心坐标: ({right_end_center_x}, {right_end_center_y})") try: self.d.touch.down(slider_center_x, slider_center_y) time.sleep(0.1) # 生成滑动轨迹 points = self.Swipe_trajectory(slider_center_x, slider_center_y, right_end_center_x, right_end_center_y) for point in points[1:]: self.d.touch.move(point[0], point[1]) time.sleep(0.002) print("滑块已到达最右端") except Exception as e: print(f"滑动到最右端失败: {e}") return capture_xpath1 = '// *[ @ text = "身份核实"] / android.view.View[1] / android.view.View[1] / android.widget.TextView[1]' capture_xpath2 = '// * [ @ text = "身份核实"] / android.view.View[1] / android.view.View[1] / android.view.View[1]' capture_element1 = self.d.xpath(capture_xpath1) capture_element2 = self.d.xpath(capture_xpath2) capture_info1 = capture_element1.info capture_info2 = capture_element2.info capture_info1_bounds = capture_info1['bounds'] capture_info2_bounds = capture_info2['bounds'] capture_label_left = capture_info1_bounds['left'] capture_label_top = capture_info1_bounds['top'] capture_label_right = capture_info1_bounds['right'] capture_label_bottom = capture_info1_bounds['bottom'] capture_left = capture_info2_bounds['left'] capture_top = capture_info2_bounds['top'] capture_right = capture_info2_bounds['right'] capture_bottom = capture_info2_bounds['bottom'] capture_label_width = capture_label_right - capture_label_left capture_label_height = capture_label_bottom - capture_label_top capture_width = capture_right - capture_left capture_height = capture_bottom - capture_top print( f"截图区域1(提示文本): left={capture_label_left}, top={capture_label_top}, width={capture_label_width}, height={capture_label_height}") print( f"截图区域2(图片): left={capture_left}, top={capture_top}, width={capture_width}, height={capture_height}") # 截图并保存--2张图 screenshot_label_path = "capture_label_area.png" screenshot_image_path = "capture_area.png" try: full_screenshot = self.d.screenshot() from 
PIL import Image import io if isinstance(full_screenshot, bytes): img = Image.open(io.BytesIO(full_screenshot)) else: img = full_screenshot # # 裁剪指定区域1 cropped_img_1 = img.crop( (capture_label_left, capture_label_top, capture_label_right, capture_label_bottom)) cropped_img_1.save(screenshot_label_path) print(f"截图2已保存: {screenshot_label_path}") # 裁剪指定区域2 cropped_img_2 = img.crop((capture_left, capture_top, capture_right, capture_bottom)) cropped_img_2.save(screenshot_image_path) print(f"截图2已保存: {screenshot_image_path}") except Exception as e: print(f"截图失败: {e}") self.d.touch.up(right_end_center_x, right_end_center_y) return try: with open(screenshot_label_path, 'rb') as f: label_image_data = base64.b64encode(f.read()).decode() with open(screenshot_image_path, 'rb') as f: image_data = base64.b64encode(f.read()).decode() # API请求配置 url = "http://api.jfbym.com/api/YmServer/customApi" data = { "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", "type": 29013, "image": image_data, "label_image": label_image_data } headers = { "Content-Type": "application/json" } # 发送请求 response = requests.post(url, headers=headers, json=data, timeout=30) response.raise_for_status() result = response.json() print(f"API返回结果: {result}") if result.get("code") == 10000 and result.get("data", {}).get("code") == 0: verify_data = result.get("data", {}) print(f"verify_data={verify_data}") data_str = verify_data.get("data", "") if not data_str: print("云码未返回有效的data值") # 抬起并返回 self.d.touch.up(right_end_center_x, right_end_center_y) return try: data_value = int(data_str) print(f"云码返回的像素距离: {data_value}") gray_line_target_x = 108 + data_value - 44 slider_target_center_x = gray_line_target_x print(f"滑块中心目标X坐标: {slider_target_center_x}") # 5. 
def Swipe_trajectory(self, start_x, start_y, end_x, end_y):
    """Generate the points of a jittered slow-fast-slow drag.

    Progress along the straight line from start to end is capped at 99 %,
    so the exact end point is not part of the result. Each point gets up to
    one pixel of random jitter per axis. x is kept monotonic in the direction
    of travel and never passes ``end_x``, for both rightward and leftward
    drags.

    Args:
        start_x, start_y: coordinates of the first point.
        end_x, end_y: coordinates the path moves towards.

    Returns:
        list[tuple[float, float]]: 50 ``(x, y)`` points in gesture order.
    """
    points = []
    total_steps = 50
    distance_x = end_x - start_x
    distance_y = end_y - start_y
    moving_right = distance_x >= 0
    previous_x = start_x  # last emitted x, used to keep the path monotonic

    for i in range(total_steps):
        # Non-linear progress (slow - fast - slow).
        ratio = i / total_steps
        if ratio < 0.3:
            progress = 0.5 * (ratio / 0.3) ** 2
        elif ratio < 0.7:
            progress = 0.5 + (ratio - 0.3) * 1.25
        else:
            progress = 0.9 + 0.5 * ((ratio - 0.7) / 0.3) ** 0.5
        progress = min(progress, 0.99)

        # numpy's upper bound is exclusive: randint(-1, 2) yields -1, 0 or 1.
        offset_x = np.random.randint(-1, 2)
        offset_y = np.random.randint(-1, 2)
        x = start_x + distance_x * progress + offset_x
        y = start_y + distance_y * progress + offset_y

        if moving_right:
            # Never step back to the left, never overshoot the target.
            if x < previous_x and x < end_x:
                x = previous_x + 1
            if x > end_x:
                x = end_x
        else:
            # Mirror image for leftward drags. The rightward-only clamp
            # used to snap every point of such a drag straight to end_x.
            if x > previous_x and x > end_x:
                x = previous_x - 1
            if x < end_x:
                x = end_x

        previous_x = x
        points.append((x, y))
    return points
self.logger.error("未识别到颜色名称") return False self.logger.info(f"识别到的颜色名称: {color_name}") except Exception as e: self.logger.error(f"云码API调用异常: {e}") return False # 颜色的坐标 relative_coordinates = self.find_color_coordinates(color_points_img_path, color_name) if not relative_coordinates: self.logger.warning(f"未在图片中找到 {color_name} 颜色的坐标") return False color_element = self.d.xpath(color_points_xpath).info element_bounds = color_element['bounds'] element_left = element_bounds['left'] element_top = element_bounds['top'] element_width = element_bounds['right'] - element_bounds['left'] element_height = element_bounds['bottom'] - element_bounds['top'] # 读取截图 try: color_points_img = cv2.imread(color_points_img_path) if color_points_img is None: self.logger.error("无法读取截图") return False screenshot_height, screenshot_width = color_points_img.shape[:2] except Exception as e: self.logger.error(f"读取截图尺寸失败: {e}") # 如果无法读取,使用元素尺寸作为默认值 screenshot_width = element_width screenshot_height = element_height # 屏幕坐标 screen_coordinates = [] for (rx, ry) in relative_coordinates: if screenshot_width > 0 and screenshot_height > 0: # 计算缩放比例 scale_x = element_width / screenshot_width scale_y = element_height / screenshot_height sx = element_left + int(rx * scale_x) sy = element_top + int(ry * scale_y) else: # 如果无法获取截图尺寸,直接使用相对坐标加上元素起始位置 sx = element_left + rx sy = element_top + ry screen_coordinates.append((sx, sy)) self.logger.info(f"相对坐标({rx}, {ry}) -> 屏幕坐标({sx}, {sy})") # . 
def find_color_coordinates(self, image_path, color_name):
    """Find blobs of a named colour in an image and return their centres.

    The image is converted to HSV, thresholded with every HSV range
    registered for ``color_name``, cleaned with a 3x3 morphological
    open/close, and the centroid of each external contour larger than
    30 px is collected.

    Args:
        image_path: path of the image file, read with ``cv2.imread``.
        color_name: colour name; must be one of the keys of the HSV table
            below (e.g. "红色", "蓝色").

    Returns:
        list[tuple[int, int]]: ``(x, y)`` centroids in image pixel
        coordinates. Empty when the colour name is not supported or the
        image cannot be read.
    """
    self.logger.info(f"开始查找颜色: {color_name}, 图片路径: {image_path}")

    # Red wraps around the hue axis, so it needs two ranges.
    red_ranges = (([0, 120, 70], [10, 255, 255]), ([170, 120, 70], [180, 255, 255]))
    # Colour name -> tuple of (lower HSV, upper HSV) pairs. Every value is a
    # tuple of pairs, including single-range colours (note the trailing
    # commas). The duplicate "黑色" entry is gone; "褐色" keeps the values of
    # the later entry, which is the one Python used at runtime.
    color_ranges = {
        "红色": red_ranges,
        "红的": red_ranges,
        "绿色": (([35, 50, 50], [85, 255, 255]),),
        "蓝色": (([90, 50, 50], [130, 255, 255]),),
        "黄色": (([20, 100, 100], [30, 255, 255]),),
        "橙色": (([5, 100, 100], [15, 255, 255]),),
        "紫色": (([130, 50, 50], [160, 255, 255]),),
        "黑色": (([0, 0, 0], [180, 255, 50]),),
        "白色": (([0, 0, 200], [180, 30, 255]),),
        "橘色": (([5, 150, 150], [15, 255, 255]),),
        "褐色": (([10, 50, 20], [20, 255, 150]),),
    }

    if color_name not in color_ranges:
        self.logger.warning(f"不支持的颜色: {color_name}")
        return []

    image = cv2.imread(image_path)
    if image is None:
        self.logger.error(f"无法读取图像: {image_path}")
        return []

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # OR together the masks of all ranges registered for this colour, so
    # every multi-range entry ("红色" and "红的") uses all of its ranges.
    mask = None
    for lower, upper in color_ranges[color_name]:
        partial = cv2.inRange(hsv, np.array(lower), np.array(upper))
        mask = partial if mask is None else cv2.bitwise_or(mask, partial)

    # Morphological open then close to remove speckle noise and fill holes.
    kernel = np.ones((3, 3), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    coordinates = []
    min_area = 30  # contours at or below this area are treated as noise
    for i, contour in enumerate(contours):
        area = cv2.contourArea(contour)
        if area <= min_area:
            continue
        moments = cv2.moments(contour)
        if moments["m00"] != 0:
            cx = int(moments["m10"] / moments["m00"])
            cy = int(moments["m01"] / moments["m00"])
            coordinates.append((cx, cy))
            self.logger.info(f"轮廓{i}: 面积={area}, 中心点=({cx}, {cy})")
        else:
            self.logger.info(f"轮廓{i}: 面积={area}, 无法计算中心点")

    self.logger.info(f"找到 {len(coordinates)} 个 {color_name} 坐标点")
    return coordinates
def click_side(self):
    """Solve the "spatial reasoning" captcha by asking the remote solver.

    Screenshots the captcha image element, sends it base64-encoded to the
    solver API (type 50009), converts the returned ``"x,y"`` image-relative
    coordinate into a screen coordinate and taps it.

    Returns:
        ``True`` when a coordinate was received and tapped, ``False`` on any
        failure (element missing, API error, empty/unparsable answer,
        device or network exception).
    """
    image_xpath = '//*[@resource-id="com.sankuai.meituan:id/titans_main_layout"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]'
    try:
        image_element = self.d.xpath(image_xpath)
        # Check existence BEFORE touching .info/.screenshot(); otherwise a
        # stale image.png from a previous run would be sent to the solver.
        if not image_element.exists:
            print("image_xpath not exist")
            return False

        bounds = image_element.info['bounds']
        image_left = bounds['left']
        image_top = bounds['top']
        image_width = bounds['right'] - bounds['left']
        image_height = bounds['bottom'] - bounds['top']
        print(f"图片位置: left={image_left}, top={image_top}, width={image_width}, height={image_height}")

        image_element.screenshot().save("image.png")
        with open('image.png', 'rb') as f:
            encoded_image = base64.b64encode(f.read()).decode()

        # Use the solver credentials configured on the monitor instead of a
        # second hard-coded copy.
        payload = {
            "token": self.TOKEN,
            "type": 50009,
            "image": encoded_image,
        }
        response = requests.post(
            self.API_URL,
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()

        if not (result.get("code") == 10000 and result.get("data", {}).get("code") == 0):
            # Previously execution fell through and crashed on an unbound
            # variable; fail explicitly instead.
            print("api 返回错误")
            return False

        verify_data = result["data"]
        print(f"verify_data={verify_data}")

        coords_str = verify_data["data"]
        if not coords_str:
            self.logger.warning("空间推理验证失败: 接口未返回坐标")
            return False

        # The solver answers with coordinates relative to the cropped image;
        # shift by the element origin to get screen coordinates.
        x_img_str, y_img_str = coords_str.split(',')
        x_screen = image_left + int(x_img_str.strip())
        y_screen = image_top + int(y_img_str.strip())

        self.d.click(x_screen, y_screen)
        time.sleep(self.get_sleep_time())
        return True
    except Exception as e:
        # Every failure path reports False; the old inner handler returned a
        # (truthy) dict, which was inconsistent with the other solvers.
        self.logger.error(f"空间推理验证失败: {e}")
        return False
captcha_type == "Numbers_English": self.logger.info(f"开始处理通用数验证") result = self.Numbers_English_verify() elif captcha_type == "Swipe_right": self.logger.info(f"开始处理向右滑动") result = self.Swipe_right() elif captcha_type == "Click_images": self.logger.info(f"开始处理依次点击图片或语序") result = self.Click_images() elif captcha_type == "slider": self.logger.info(f"开始处理滑块验证") result = self.slider_verify() elif captcha_type == "complexs": self.logger.info(f"开始处理拖动滑块刚") result = self.complexs() elif captcha_type == "Shortest_connection": self.logger.info(f"开始处理最短距离连接") result = self.Shortest_connection() elif captcha_type == "click_side": self.logger.info(f"开始处理空间推理") result = self.click_side() else: self.logger.info(f"等待人工处理") result = self._handle_generic_captcha(xpath) except Exception as e: self.logger.error(f"验证码处理异常: {e}") result = False time.sleep(5) verification_cleared, remaining_type = self.wait_for_verification_clear(d, timeout=7) if verification_cleared: self.logger.info(f"第{self.verification_retry_count}次验证成功") time.sleep(3) self._handle_verification_success() return True else: self.logger.warning(f"第{self.verification_retry_count}次验证失败,仍有验证码: {remaining_type}") if self.verification_retry_count >= self.MAX_VERIFICATION_RETRY: self._handle_verification_failure() return False else: self.verification_in_progress.clear() self.pausing.clear() time.sleep(2) return self.check_and_handle_popup() ''' # 1. 处理简单弹窗 for xpath, desc in self.popup_rules["simple"]: if d.xpath(xpath).exists and not self._is_recent_click(xpath): self.logger.info("检测到弹窗: %s", desc) d.xpath(xpath).click() return True # 2. 
def is_any_verification_popup_exists(self, d=None):
    """Report whether any known captcha popup is currently on screen.

    The configured ``popup_rules["verification"]`` selectors are probed
    first, in order, followed by two extra hard-coded indicators.

    Args:
        d: Device handle exposing ``xpath(selector).exists``; defaults to
            ``self.d`` when omitted.

    Returns:
        ``(True, captcha_type, xpath)`` for the first selector that exists,
        otherwise ``(False, None, None)``.
    """
    device = self.d if d is None else d

    # Special-case captcha indicators checked after the configured rules.
    fallback_rules = [
        ('//*[contains(@resource-id, "com.sankuai.meituan:id/yoda_activity_rootView")]', "complexs"),
        ('//*[contains(@text, "拖动滑块刚")]', "complexs"),
    ]
    ordered_rules = [*self.popup_rules["verification"], *fallback_rules]

    # The generator stops probing the device at the first selector that exists.
    first_hit = next(
        ((kind, selector) for selector, kind in ordered_rules
         if device.xpath(selector).exists),
        None,
    )
    if first_hit is None:
        return False, None, None
    return True, first_hit[0], first_hit[1]
class TaskReporter:
    """Thread-safe bookkeeping of collection tasks plus result reporting.

    Per-task progress is kept in ``tasks_data`` (keyed by task id) and is
    guarded by ``lock``. When a task ends, a summary is POSTed to the
    scheduling service.
    """

    def __init__(self):
        self.tasks_data = {}  # task_id -> progress dict (see start_task)
        self.lock = threading.Lock()

    def start_task(self, task_id: int, start_page: int, end_page: int):
        """Register a task as running, stamping the current epoch second."""
        with self.lock:
            self.tasks_data[task_id] = {
                'task_id': task_id,
                'start_time': int(time.time()),
                'end_time': None,
                'start_page': start_page,
                'end_page': end_page,
                'actual_end_page': start_page,  # last page actually reached
                'real_count': 0,  # number of items actually collected
                'status': 'running',  # running, completed, failed
                'finish_status': 0,  # 0: not finished, 1: finished
            }

    def update_task_progress(self, task_id: int, actual_end_page: int = None, real_count: int = None):
        """Update progress fields of a known task; ``None`` leaves a field as is.

        Unknown task ids are ignored.
        """
        with self.lock:
            task = self.tasks_data.get(task_id)
            if task is None:
                return
            if actual_end_page is not None:
                task['actual_end_page'] = actual_end_page
            if real_count is not None:
                task['real_count'] = real_count

    def end_task(self, task_id: int, status: str = 'completed', finish_status: int = 0,
                 force_end_page: int = None):
        """Mark a task as ended and report its summary to the scheduler.

        Args:
            task_id: Id previously passed to ``start_task``; unknown ids are
                ignored and nothing is reported.
            status: ``'completed'`` is reported as 3, anything else as 4.
            finish_status: Forwarded unchanged in the report.
            force_end_page: When given, overrides the recorded end page.
        """
        with self.lock:
            data = self.tasks_data.get(task_id)
            if data is None:
                return
            data['end_time'] = int(time.time())
            data['status'] = status
            data['finish_status'] = finish_status
            if force_end_page is not None:
                data['actual_end_page'] = force_end_page

            # Snapshot the report while the state is consistent.
            report_data = {
                "collect_task_allocate_id": data['task_id'],
                "status": 3 if data['status'] == 'completed' else 4,
                "finish_status": data['finish_status'],
                'real_count': data['real_count'],
                'start_time': data['start_time'],
                'end_time': data['end_time'],
                'start_page': data['start_page'],
                'end_page': data['actual_end_page'],
            }

        # Send OUTSIDE the lock: the HTTP call can block for up to its
        # timeout and must not stall progress updates from other threads.
        self._call_report_api(report_data)

    def _call_report_api(self, data: Dict[str, Any]):
        """POST one report to the scheduler; failures are printed, not raised."""
        try:
            url = 'http://schedule.dfwy.tech/api/collect_equipment_execute/result_report'
            resp = requests.post(url, json=data, timeout=10)
            if resp.status_code == 200:
                print(f"任务 {data['collect_task_allocate_id']} 上报成功")
            else:
                print(f"任务 {data['collect_task_allocate_id']} 上报失败: {resp.status_code}")
        except Exception as e:
            print(f"上报接口调用异常: {e}")
@staticmethod
def get_city_info(file_path='../kailin_city.json'):
    """Load the province/city table and map each city name to its province.

    The JSON file must contain a ``"province"`` list of objects with ``id``
    and ``name`` and a ``"city"`` list of objects with ``name`` and ``pid``.
    A ``pid`` longer than two characters is reduced to its leading two
    digits before the province lookup.

    Args:
        file_path: Location of the JSON file. Defaults to the path the
            spider has always used, relative to the working directory.

    Returns:
        ``dict`` mapping city name -> province name.

    Raises:
        OSError: The file cannot be opened.
        KeyError: A city refers to a province id that is not listed.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    province = {province_one["id"]: province_one for province_one in data['province']}

    city2province = {}
    for city_one in data['city']:
        pid = city_one['pid']
        pid_text = str(pid)
        if len(pid_text) > 2:
            # Raw string: '\d' in a normal literal is an invalid escape
            # sequence and triggers a warning on current Python versions.
            pid = int(re.match(r'^\d{2}', pid_text).group())
        city2province[city_one['name']] = province[pid]['name']
    return city2province
def get_qualification_number(self):
    """Read the shop's qualification number from the licence page.

    The on-screen text carries a "资质编号:" label in front of the number;
    only that leading label (with an ASCII or full-width colon) is removed.

    Returns:
        The qualification number as a stripped string, or ``None`` when the
        element cannot be read.
    """
    try:
        qualification_number_str = self.d.xpath(
            '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text
        # str.strip('资质编号:') treats its argument as a SET of characters and
        # would also eat matching characters belonging to the number itself
        # (at either end). Remove the label as a prefix instead.
        return re.sub(r'^\s*资质编号\s*[::]?\s*', '', qualification_number_str).strip()
    except Exception:
        return None
def save_shop_info_to_database(self, data):
    """Insert one shop record into the shop table.

    Args:
        data: Mapping providing the keys ``shop``, ``contact_address``,
            ``qualification_number``, ``business_license_company``,
            ``business_license_address``, ``scrape_date`` and ``platform``.

    Raises:
        KeyError: ``data`` lacks one of the required keys.
        Exception: Any database error is propagated to the caller.
    """
    print(f'保存店铺数据到数据库:{data}')
    # The table name comes from Config (not user input); values are bound
    # as parameters.
    add_sql = f"""
        INSERT INTO {self.shop_table_name}
        (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    params = (data['shop'], data['contact_address'], data['qualification_number'],
              data['business_license_company'], data['business_license_address'],
              data['scrape_date'], data['platform'])

    conn = get_mysql()
    try:
        cur = conn.cursor()
        try:
            cur.execute(add_sql, params)
            conn.commit()
        finally:
            cur.close()
    finally:
        # A fresh connection is opened on every call, so it must be released
        # here; previously each saved shop leaked one connection.
        conn.close()
    print(f'存入店铺信息到数据库成功')
def drug_price(self):
    """Read the product price from the first on-screen text starting with "¥".

    Returns:
        The price as ``float``, or ``None`` when the element is missing or
        its text contains no parsable number.
    """
    try:
        price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text
        # Drop thousands separators so "1,299.00" is not truncated to 1.
        # Raw string: '\d' / '\.' in a normal literal are invalid escapes.
        price = float(re.search(r'[\d.]+', price_str.replace(',', '')).group())
        print(f'获取到价格:{price}')
        return price
    except Exception as e:
        print(f'提取价格出错-->{e}')
        return None
def get_ocr_res_image(self, img):
    """Run general OCR on an image after watermark removal.

    Args:
        img: Path of the image file; it is passed to ``remove_watermark``
            and the result is sent to ``self.client.basicGeneral``.

    Returns:
        The ``words_result`` value of the OCR response (``''`` when the key
        is absent), or ``None`` when preprocessing or the OCR call fails.
    """
    try:
        image = self.remove_watermark(img)
        res_image = self.client.basicGeneral(image)
        data = res_image.get('words_result', '')
        print(f'百度api返回结果:{data}')
        return data
    except Exception as e:
        # Catch Exception rather than everything, so Ctrl-C / SystemExit
        # still propagate, and surface the reason instead of hiding it.
        print(f'图片OCR识别出错-->{e}')
        return None
def extract_specification(self, text):
    """Return the specification part of ``text`` that precedes the first "【".

    The leading portion is stripped of surrounding whitespace. When nothing
    precedes the first "【" (including empty input), ``text`` is returned
    unchanged.
    """
    leading_part = text.partition('【')[0]
    if not leading_part:
        # Nothing in front of the bracket: hand the input back untouched.
        return text
    return leading_part.strip()
temp_search_key = self.search_key.replace("999皮炎平", "") else: temp_search_key = self.search_key.replace("999", "") else: if self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': temp_search_key = self.search_key.replace("史达功", "") temp_search_key = temp_search_key.replace("120", "") elif self.search_key == '三九胃泰养胃舒颗粒8袋': temp_search_key = self.search_key.replace("三九胃泰", "") temp_search_key = temp_search_key.replace("8袋", "") elif self.search_key == '今维多赐多康牌蛋白粉': temp_search_key = self.search_key.replace("今维多", "") elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': temp_search_key = self.search_key.replace("佳美舒", "") temp_search_key = temp_search_key.replace("4", "") elif self.search_key == '三九胃泰颗粒20g*10': temp_search_key = self.search_key.replace("20g*10", "") elif self.search_key == '三九胃泰颗粒20g*6袋': temp_search_key = self.search_key.replace("20g*6袋", "") elif self.search_key == '顺峰康王酮康他索乳膏': temp_search_key = self.search_key.replace("顺峰康王", "") if self.search_key == '999糠酸莫米松凝胶15': temp_search_key = temp_search_key.replace("15", "") elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = temp_search_key.replace("30", "") elif self.search_key == '999复方金银花颗粒10g': temp_search_key = temp_search_key.replace("10g", "") elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒': temp_search_key = temp_search_key.replace("15g*15袋/盒", "") elif self.search_key == '999复方氨酚烷胺胶囊6粒': temp_search_key = temp_search_key.replace("6粒", "") elif self.search_key == '999可调式生理性海水鼻腔喷雾50': temp_search_key = temp_search_key.replace("50", "") elif self.search_key == '999止泻利颗粒15g*8': temp_search_key = temp_search_key.replace("15g*8", "") elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30': temp_search_key = temp_search_key.replace("30", "") elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': temp_search_key = temp_search_key.replace("15g", "") elif self.search_key == '999复方苦参肠炎康片12片': temp_search_key = temp_search_key.replace("12片", "") elif self.search_key == '999强力枇杷露16袋': temp_search_key = temp_search_key.replace("16袋", "") elif 
self.search_key == '999三蛇胆川贝膏138': temp_search_key = temp_search_key.replace("138", "") elif self.search_key == '999强力枇杷露120ml': temp_search_key = temp_search_key.replace("120ml", "") elif self.search_key == '999强力枇杷露150ml': temp_search_key = temp_search_key.replace("150ml", "") elif self.search_key == '999抗病毒口服液10ml*10': temp_search_key = temp_search_key.replace("10ml*10", "") elif self.search_key == '999抗病毒口服液10ml*12': temp_search_key = temp_search_key.replace("10ml*12", "") elif self.search_key == '999糠酸莫米松乳膏10g支': temp_search_key = temp_search_key.replace("10g支", "") elif self.search_key == '999选平硝酸咪康唑乳膏20g': temp_search_key = temp_search_key.replace("20g", "") elif self.search_key == '999感冒清热颗粒(无糖)6g': temp_search_key = temp_search_key.replace("(无糖)6g", "") elif self.search_key == '999壮骨关节丸6g*20': temp_search_key = temp_search_key.replace("6g*20", "") elif self.search_key == '999正天丸6g*15': temp_search_key = temp_search_key.replace("6g*15", "") elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': temp_search_key = temp_search_key.replace("20", "") elif self.search_key == '999糠酸莫米松凝胶10': temp_search_key = temp_search_key.replace("10", "") elif self.search_key == '999板蓝根颗粒10g*20': temp_search_key = temp_search_key.replace("10g*20", "") elif self.search_key == '999复方氨酚烷胺胶囊10粒': temp_search_key = temp_search_key.replace("10粒", "") elif self.search_key == '999复方氨酚烷胺胶囊12粒': temp_search_key = temp_search_key.replace("12粒", "") elif self.search_key == '999咽炎片0.26g*12片*2板': temp_search_key = temp_search_key.replace("0.26g*12片*2板", "") elif self.search_key == '999小儿止咳糖浆120': temp_search_key = temp_search_key.replace("120", "") elif self.search_key == '999小儿止咳糖浆225': temp_search_key = temp_search_key.replace("225", "") elif self.search_key == '999小儿感冒颗粒6g*10': temp_search_key = temp_search_key.replace("6g*10", "") elif self.search_key == '999小儿感冒颗粒6g*24': temp_search_key = temp_search_key.replace("6g*24", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋': temp_search_key = 
temp_search_key.replace("6g*10袋", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋': temp_search_key = temp_search_key.replace("6g*20袋", "") elif self.search_key == '999小儿咽扁颗粒8g*10袋': temp_search_key = temp_search_key.replace("8g*10袋", "") elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋': temp_search_key = temp_search_key.replace("2.5g*10袋", "") elif self.search_key == '999感冒清热颗粒12g*18': temp_search_key = temp_search_key.replace("12g*18", "") # elif self.search_key == '三九胃泰颗粒': # self.search_key = '三九胃泰' #兼容三九胃泰 温胃舒颗粒 print(f'获取商品title时的搜索关键字:{temp_search_key}') # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text # 初始化 drugs_name = '' specifications = '' title = '' # 循环的获取title为了有时间来处理人机验证 for m in range(1, 6000): if self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').exists: title = self.safe_exec( lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text ) print(f"第{m}次获取title成功") break else: time.sleep(3) # return drugs_name, specifications # drugs_name = '' # specifications = '' # try: # title_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' # title_xpath_2 = 
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' # if self.d.xpath(title_xpath).exists: # title = self.d.xpath(title_xpath).text # print(f"title_xpath获取的title={title}") # if temp_search_key not in title: # return drugs_name, specifications # elif self.d.xpath(title_xpath_2).exists: # title = self.d.xpath(title_xpath_2).text # print(f"title_xpath_2获取的title={title}") # if temp_search_key not in title: # return drugs_name, specifications # else: # print('title_xpath不存在,请确认') # return drugs_name, specifications # # title = self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text # except Exception as e: # print(f"发生异常: {e}") # return drugs_name, specifications # 奇怪:有的时候title取出来的记过第一位会多一个0 # title = self.safe_exec(self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text) # title = 
self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text title = title[1:] if title.startswith('0') else title print(f'获取到药品标题:{title}') # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) if self.search_key == '999赐多康大豆': return title, '1罐' if self.search_key == "999感冒清热颗粒": match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) else: match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title) if match: # drugs_name = match.group(1).strip() + match.group(2).strip() drugs_name = title specifications = match.group(3).strip() print("药品名:", drugs_name) print("规格:", specifications) # 如果品规中包含到期则需要再次的正则处理 if '到期' in specifications: specifications = self.extract_specification(specifications) # print('完整药名:', drugs_name + specifications) return drugs_name, specifications else: if title == '999抗病毒口服液10ml*12' or title == '999抗病毒口服液': drugs_name = title specifications = '10ml*12支/盒' return drugs_name, specifications elif title == '999抗病毒口服液10ml*10': drugs_name = title specifications = '10ml*10支/盒' return drugs_name, specifications elif title == '999小柴胡颗粒': drugs_name = title specifications = '10g*9袋/盒' return drugs_name, specifications elif title == '999养胃舒颗粒': drugs_name = title specifications = '10g*6袋/盒' return drugs_name, specifications 
def enter_shop(self):
    """Open the shop page from the current product page.

    Taps the element whose text is "店铺", then pauses for
    ``self.get_sleep_time()`` seconds.
    """
    shop_entry = self.d.xpath('//*[@text="店铺"]')
    shop_entry.click()
    time.sleep(self.get_sleep_time())


def enter_shoper(self):
    """Open the "商家" (merchant) tab once it shows up.

    Polls up to 10 times for an element whose text is "商家", pausing
    ``self.get_sleep_time()`` seconds after each miss.

    Returns:
        True if the tab was found and tapped, False if it never appeared.
    """
    merchant_tab = '//*[@text="商家"]'
    for attempt in range(10):
        if self.d.xpath(merchant_tab).exists:
            print(f'第{attempt}次商家存在')
            break
        print(f'第{attempt}次商家不存在')
        time.sleep(self.get_sleep_time())
    else:
        # The loop ran out without ever seeing the tab.
        return False

    self.d.xpath(merchant_tab).click()
    time.sleep(self.get_sleep_time())
    return True


def scan_shoper_license(self):
    """Tap "查看商家资质" (view merchant qualifications) when it is available.

    Polls up to 10 times for the link, pausing ``self.get_sleep_time()``
    seconds after each miss. If the link never appears,
    ``self.swipe_back(1)`` is called instead of tapping.
    """
    license_link = '//*[@text="查看商家资质"]'
    for attempt in range(10):
        if self.d.xpath(license_link).exists:
            print(f'第{attempt}次查看商家资质存在')
            break
        print(f'第{attempt}次查看商家资质不存在')
        time.sleep(self.get_sleep_time())
    else:
        self.swipe_back(1)
        return

    self.d.xpath(license_link).click()
    time.sleep(self.get_sleep_time())
def data_is_exists(self, data):
    """Check whether a scraped product row is already stored.

    Args:
        data: Mapping that must contain the keys ``product``, ``min_price``,
            ``shop``, ``scrape_date`` and ``platform``; their values become
            the WHERE-clause parameters.

    Returns:
        True if a matching row exists in ``self.table_name``, False if not,
        and None when a required key is missing or the query fails.
    """
    required_keys = ('product', 'min_price', 'shop', 'scrape_date', 'platform')
    missing = [key for key in required_keys if key not in data]
    if missing:
        logging.error(f"缺少必要字段: {', '.join(missing)}")
        return None

    try:
        conn = get_mysql()
        cur = conn.cursor()
        try:
            # Only the table name is formatted in; every value is bound as a
            # driver parameter.
            query_sql = """
                SELECT * FROM {}
                WHERE product = %s
                  AND min_price = %s
                  AND shop = %s
                  AND scrape_date = %s
                  AND platform = %s
            """.format(self.table_name)
            cur.execute(query_sql, tuple(data[key] for key in required_keys))
            return bool(cur.fetchone())
        finally:
            # Release the cursor on every path. The connection is left open
            # because get_mysql() is defined elsewhere and may hand out a
            # shared connection -- TODO confirm.
            cur.close()
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return None


def shop_is_exists_database(self, shop):
    """Check whether a shop is already stored in ``self.shop_table_name``.

    Args:
        shop: Shop value compared against the ``shop`` column.

    Returns:
        True if a row exists, False if not, None when the query fails.
    """
    try:
        conn = get_mysql()
        cur = conn.cursor()
        try:
            query_sql = """
                SELECT * FROM {}
                WHERE shop = %s
            """.format(self.shop_table_name)
            # The parameters must be a sequence: "(shop)" is just the string
            # itself, which some DB-API drivers iterate character by
            # character. "(shop,)" is the one-element tuple.
            cur.execute(query_sql, (shop,))
            return bool(cur.fetchone())
        finally:
            cur.close()
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return None


def wait_if_verifying(self, monitor, timeout=120):
    """Block while the monitor is handling a captcha.

    Sleeps in 1-second steps while ``monitor.pausing`` is set, giving up
    after ``timeout`` seconds.
    """
    started = time.time()
    while monitor.pausing.is_set() and time.time() - started < timeout:
        time.sleep(1)


def wait_for_ready(self, monitor, timeout=86400):
    """Wait for captcha handling to finish before entering a page.

    Sleeps in 1-second steps while ``monitor.pausing`` is set (at most
    ``timeout`` seconds), then asks the monitor to scan for pop-ups once
    more in case one appeared at the last moment.
    """
    started = time.time()
    while monitor.pausing.is_set() and time.time() - started < timeout:
        time.sleep(1)
    # NOTE(review): SOURCE's indentation is lost; this call is assumed to
    # run once after the wait loop rather than on every iteration.
    monitor.check_and_handle_popup()


def safe_list(self, xpath, monitor):
    """Return every element matching ``xpath`` after waiting out captcha handling."""
    self.wait_for_ready(monitor)
    return self.d.xpath(xpath).all()
def safe_exec(self, func, *args, **kwargs):
    """Run ``func`` only once the monitor thread is not pausing.

    Sleeps 0.1 s first so the monitor thread gets a chance to raise its
    flag, then waits in 1-second steps while ``self.monitor.pausing`` is
    set, and finally returns ``func(*args, **kwargs)``.
    """
    time.sleep(0.1)
    while True:
        if not self.monitor.pausing.is_set():
            break
        time.sleep(1)
    outcome = func(*args, **kwargs)
    return outcome


def get_next_data(self, data, target):
    """Return the ``'words'`` value of the entry following ``target``.

    Args:
        data: Sequence of dicts that each carry a ``'words'`` key.
        target: The ``'words'`` value to look for.

    Returns:
        The ``'words'`` of the entry right after the first match that has a
        successor, or None when there is no such match.
    """
    for position, entry in enumerate(data):
        if entry['words'] != target:
            continue
        successor = position + 1
        if successor < len(data):
            return data[successor]['words']
    return None


def delete_instruction_screenshot(self, screenshot_path):
    """Delete the screenshot file at ``screenshot_path``.

    Failures are reported with ``print`` and never raised.
    """
    try:
        os.remove(screenshot_path)
        print(f"截图文件已删除:{screenshot_path}")
    except FileNotFoundError:
        print(f"文件未找到,无法删除:{screenshot_path}")
    except Exception as e:
        print(f"删除文件时出错:{e}")
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all() for idx in range(1, len(all_tt) + 1): all_tt1 = self.d.xpath( f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all() # print(f'当前说明书列表数据:{all_tt1}') for tt in all_tt1: if tt.text and tt.text != '展开全文': new_list.append(tt.text) if i == 0: height = 938 else: drug_box = self.d.xpath( 
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info bounds = drug_box['bounds'] height = bounds['bottom'] - bounds['top'] if height < 938: # print('说明书翻页到底部') break # 展开全文 new_list = [item for item in new_list if item != '展开全文'] print(f'当前说明书列表数据:{new_list}') # expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期') # manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位') # approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号') # res_data = { # "有效期": new_list[expiry_date_index + 1], # "生产单位": new_list[manufacturer_index + 1], # "批准文号": new_list[approval_number_index + 1] # } res_data = { "有效期": (new_list[new_list.index("有效期") + 1]) if "有效期" in new_list and new_list.index("有效期") + 1 < len(new_list) else "", "生产单位": (new_list[new_list.index("生产单位") + 1]) if "生产单位" in new_list and new_list.index("生产单位") + 1 < len(new_list) else "", "批准文号": (new_list[new_list.index("批准文号") + 1]) if "批准文号" in new_list and new_list.index("批准文号") + 1 < len(new_list) else "" } print(f'当前说明书字典数据:{res_data}') return res_data ''' ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="查看详细说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) # 1) 先向上滑动一次,触发“加载更多”出现 self.d.swipe(200, 1000, 200, 300, 0.4) time.sleep(0.3) # 2) 再进入“出现就点”的循环 while 
self.d.xpath('//*[@text="加载更多"]').click_exists(timeout=1): time.sleep(0.2) self.d.swipe(200, 1000, 200, 300, 0.4) # self.d.swipe(200, 1000, 200, 62) time.sleep(0.2) # 一次性获取所有文本 texts = [ node.text.strip() # for node in self.d.xpath('//android.widget.TextView').all() for node in self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.TextView').all() if node.text and node.text.strip() and node.text != '加载更多' ] print(f'当前说明书列表数据:{texts}') # 提取关键字段 def safe_get(key): # try: # idx = texts.index(key) # return texts[idx + 1] if idx + 1 < len(texts) else "" # except ValueError: # return "" try: idx = next(i for i, text in enumerate(texts) if text == key) return texts[idx + 1] if idx + 1 < len(texts) else "" except StopIteration: return "" res_data = { "有效期": safe_get("有效期"), "生产单位": safe_get("生产单位"), "批准文号": safe_get("批准文号") } print(f'当前说明书字典数据:{res_data}') return res_data ''' ''' def get_instructions_data(self): """ 说明书键值对采集:连续两个 TextView 为一对,精确提取 """ # 1. 进入说明书 self.d(text="说明").click() time.sleep(0.5) self.d(text="查看详细说明").click() time.sleep(0.5) # self.d(text="加载更多").click_exists(timeout=0.5) # 2. 找到说明书最外层 ScrollView(页面主体) scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.ScrollView") count = scroll_view.count print(f"找到的 ScrollView 数量: {count}") if not scroll_view.exists: return {"有效期": "", "生产单位": "", "批准文号": ""} # 3. 在 ScrollView 内再定位真正包含键值对的容器 # 绝大多数美团说明书页面对应的是 ScrollView > ViewGroup > 若干 TextView kv_container = scroll_view.child(className="android.view.ViewGroup") if not kv_container.exists: kv_container = scroll_view # 降级:直接对 ScrollView 取子孙 TextView # 4. 
def get_instructions_data(self):
    """Read validity period, manufacturer and approval number from the instructions.

    Opens the "说明" section, expands it via "查看详细说明" or "查看全部",
    scrolls until "生产单位" and "批准文号" are on screen, takes a screenshot
    with ``self.screenshot_instruction()`` and runs
    ``self.get_ocr_res_image()`` on it.

    Returns:
        dict with the keys "有效期", "生产单位" and "批准文号". A value is ''
        whenever it could not be read, so all return paths use the same
        placeholder.
    """
    labels = ("有效期", "生产单位", "批准文号")

    self.d.xpath('//*[@text="说明"]').click()
    time.sleep(0.5)

    if self.d.xpath('//*[@text="查看详细说明"]').exists:
        self.d.xpath('//*[@text="查看详细说明"]').click()
    else:
        # No direct link: scroll looking for "查看全部", checking both before
        # and after every swipe.
        for _ in range(8):
            if self.d.xpath('//*[@text="查看全部"]').exists:
                print('开始点击查看全部')
                break
            self.d.swipe_ext('down', 0.3)
            time.sleep(1)
            if self.d.xpath('//*[@text="查看全部"]').exists:
                print('开始点击查看全部2')
                break
        if not self.d.xpath('//*[@text="查看全部"]').exists:
            res_data = {label: '' for label in labels}
            self.loggerMT.info('获取到的说明书信息为空。')
            return res_data
        self.d.xpath('//*[@text="查看全部"]').click()
    time.sleep(0.5)

    # Tap "加载更多" once if it can be scrolled into view.
    for _ in range(8):
        if self.d.xpath('//*[@text="加载更多"]').exists:
            self.d.xpath('//*[@text="加载更多"]').click()
            time.sleep(0.2)
            break
        self.d.swipe(200, 1000, 200, 300, 0.3)

    # Scroll until both labels are visible so the screenshot contains them.
    for _ in range(10):
        if self.d.xpath('//*[@text="生产单位"]').exists and self.d.xpath('//*[@text="批准文号"]').exists:
            break
        self.d.swipe(200, 1300, 200, 300, 0.3)

    instruction_path = self.screenshot_instruction()
    print(f"instruction_path= {instruction_path}")
    try:
        time.sleep(2)
        ocr_res = self.get_ocr_res_image(instruction_path)
        # get_next_data() yields None when a label is missing or is the last
        # OCR entry; normalise that to '' like the other return paths.
        res_data = {
            label: (self.get_next_data(ocr_res, label) or '') if ocr_res else ''
            for label in labels
        }
        print(f"res_data={res_data}")
        time.sleep(1)
    finally:
        # Remove the screenshot even when OCR raises, so failed runs do not
        # pile up image files.
        self.delete_instruction_screenshot(instruction_path)
    return res_data


def has_instructions(self):
    """Report whether the product page shows a "说明" (instructions) entry.

    Pauses ``self.get_sleep_time()`` seconds, then makes up to 8 attempts;
    each one checks for the entry, swipes with
    ``self.d.swipe_ext('down', 0.3)``, waits 1 s and checks again.

    Returns:
        True as soon as the entry is seen, False if it never is.
    """
    time.sleep(self.get_sleep_time())
    entry = '//*[@text="说明"]'
    for attempt in range(8):
        if self.d.xpath(entry).exists:
            print(f"第{attempt}次有说明书1")
            return True
        self.d.swipe_ext('down', 0.3)
        time.sleep(1)
        if self.d.xpath(entry).exists:
            print(f"第{attempt}次有说明书2")
            return True
    return False
def has_shop(self):
    """Report whether an "进店" (enter shop) button is on screen.

    Pauses ``self.get_sleep_time()`` seconds before looking.
    """
    time.sleep(self.get_sleep_time())
    return self.d.xpath('//*[@text="进店"]').exists


def get_license_info_ex(self):
    """Collect the shop address and business-license details for the current product.

    Every navigation and extraction step is routed through
    ``self.safe_exec``. The license screenshot is produced by
    ``self.screenshot_the_business_license()`` and read with
    ``self.get_ocr_res()``.

    Returns:
        dict with the keys ``contact_address``, ``qualification_number``,
        ``business_license_company`` and ``business_license_address``.
        Fields that could not be obtained are ''.
    """
    self.safe_exec(self.enter_shop)
    entered = self.safe_exec(self.enter_shoper)
    if entered == False:  # noqa: E712 - kept as an equality test on purpose
        return {'contact_address': '', 'qualification_number': '',
                'business_license_company': '', 'business_license_address': ''}

    # Give the merchant page up to 10 polls to show the qualification link.
    for attempt in range(10):
        if self.d.xpath('//*[@text="查看商家资质"]').exists:
            print(f"第{attempt}次有商家资质")
            break
        print(f"第{attempt}次没有商家资质")
        time.sleep(self.get_sleep_time())

    contact_address = self.safe_exec(self.get_shop_address)
    self.safe_exec(self.scan_shoper_license)
    qualification_number = self.safe_exec(self.get_qualification_number)

    if not qualification_number:
        return {'contact_address': contact_address, 'qualification_number': '',
                'business_license_company': '', 'business_license_address': ''}

    # Tap the fixed relative screen position used to open the license image.
    self.d.click(0.603, 0.27)
    time.sleep(self.get_sleep_time())
    cropped_screenshot_path = self.screenshot_the_business_license(qualification_number)
    print(f'cropped_screenshot_path:{cropped_screenshot_path}')
    ocr_res = self.get_ocr_res(cropped_screenshot_path)
    print(f'ocr_res:{ocr_res}')

    company = ''
    address = ''
    if ocr_res:
        company = ocr_res.get('单位名称', '')
        address = ocr_res.get('地址', '')
    return {'contact_address': contact_address,
            'qualification_number': qualification_number,
            'business_license_company': company,
            'business_license_address': address}
def distinct_target(self):
    """Report whether any of the known search-result layouts is on screen.

    Six absolute XPaths are probed in a fixed order, stopping at the first
    one that exists so that a hit does not cost further device lookups.

    Returns:
        True if at least one of the XPaths exists, otherwise False.
    """
    candidate_xpaths = (
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
    )
    # any() stops at the first existing layout; the probe order is the same
    # as in the previous eager implementation.
    return any(bool(self.d.xpath(path).exists) for path in candidate_xpaths)


def click_element_with_retry(self, xpath, max_retries=5, timeout=5):
    """Tap the element at ``xpath``, retrying while it is absent or the tap fails.

    Args:
        xpath: XPath of the element to tap.
        max_retries: Maximum number of attempts.
        timeout: Not used by the implementation; kept so existing callers
            that pass it keep working.

    Returns:
        True once a tap succeeded, False after ``max_retries`` failed
        attempts. Attempts are separated by a 1-second pause.
    """
    for attempt in range(max_retries):
        ordinal = attempt + 1
        try:
            if self.d.xpath(xpath).exists:
                self.d.xpath(xpath).click()
                print(f"第{ordinal}次尝试点击成功")
                return True
            print(f"第{ordinal}次尝试:元素不存在")
        except Exception as e:
            print(f"第{ordinal}次尝试失败: {e}")

        # No pause after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(1)

    print(f"经过{max_retries}次尝试后点击失败")
    return False
def enter_target_page(self):
    """Navigate from the app home screen to the search results for ``self.search_key``.

    Performs a fixed sequence of taps, each followed by a pause of
    ``self.get_sleep_time()`` seconds: "看病买药", the search carousel text,
    the search input located by an absolute XPath, typing the keyword,
    "搜索", then ``self.click_express_send()`` and finally the "价格" tab.
    """
    self.d.xpath('//*[@content-desc="看病买药"]').click()
    time.sleep(self.get_sleep_time())
    self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]').click()
    time.sleep(self.get_sleep_time())
    # Absolute path to the search input; it depends on the exact view hierarchy.
    self.d.xpath(
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]').click()
    time.sleep(self.get_sleep_time())
    # clear=True is passed so the keyword replaces any text already in the field.
    self.d.send_keys(self.search_key, clear=True)
    time.sleep(self.get_sleep_time())
    self.d.xpath('//*[@text="搜索"]').click()
    time.sleep(self.get_sleep_time())
    # (Commented-out debugging code that probed the RelativeLayout[1] and
    # RelativeLayout[2] variants of the result container used to live here.)
    # Apply the express-delivery filter.
    self.click_express_send()
    time.sleep(self.get_sleep_time())
    # The price tab has to be tapped on every entry so that page-range
    # (from page X to page Y) collection lines up.
    # NOTE(review): SOURCE's line breaks are lost; the retry helper call
    # below is read as live code and the direct click as the commented-out
    # predecessor -- confirm against the original file.
    self.click_element_with_retry('//*[@text="价格"]')
    # self.d.xpath('//*[@text="价格"]').click()
    time.sleep(self.get_sleep_time())
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' slide_xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' for i in range(1, 3): if self.d.xpath(slide_xpath).exists: bounds = self.d.xpath(slide_xpath).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动1') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath2).exists: bounds = self.d.xpath(slide_xpath2).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') 
self.loggerMT.info('开始滑动2') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath3).exists: bounds = self.d.xpath(slide_xpath3).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动3') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath4).exists: bounds = self.d.xpath(slide_xpath4).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动4') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' xpath = 
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath3 = 
'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' # print(f"xpath:{xpath}") # scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.HorizontalScrollView") if self.d.xpath(xpath).exists: self.d.xpath(xpath).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath2).exists: self.d.xpath(xpath2).click() # 
time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath2快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath3).exists: self.d.xpath(xpath3).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath3快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath4).exists: self.d.xpath(xpath4).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath4快递送成功") time.sleep(self.get_sleep_time()) break else: print(f"第{idx}次点击xpath或xpath2或xpath3快递送都失败") time.sleep(self.get_sleep_time()) # xpath2= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' # if self.d.xpath(xpath2).exists: # self.d.xpath(xpath2).click() # print(f"第{idx}次点击xpath2快递送成功") # time.sleep(self.get_sleep_time()) # break """暂不用该功能 def get_table_license_info(self, qualification_number): try: sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"' self.mysql_client.cur.execute(sql) res = self.mysql_client.cur.fetchone() return res except: return None """ # def get_clipboard(self): # """通过ADB获取Android手机剪贴板内容""" # try: # result = subprocess.run( # ["adb", "shell", "am", "broadcast", "-a", 
def get_clipboard(self, settle_delay=1):
    """Return the device clipboard text with surrounding whitespace removed.

    The clipboard is read through uiautomator2 (``self.d.clipboard``).

    Args:
        settle_delay: Seconds to wait before reading; presumably gives the
            app time to finish its copy action. Defaults to 1, the value
            that used to be hard-coded.

    Returns:
        str: The stripped clipboard text, or '' when the device reports None.
    """
    time.sleep(settle_delay)
    # Read once: every access is a device round trip, and reading separately
    # for the log and for the result could yield two different values.
    clipboard_content = self.d.clipboard
    self.loggerMT.info(f"Clipboard content:{clipboard_content}")
    if clipboard_content is None:
        return ''
    return clipboard_content.strip()


def clear_clipboard(self):
    """Overwrite the device clipboard with an empty string."""
    self.d.set_clipboard("", "text/plain")


def get_product_link(self, max_retry=5):
    """Copy the product's share link via the "···" menu and return it.

    For each attempt the known locations of the "···" button are probed in
    order. Every location that is on screen is tapped, followed by
    "分享商品"; if "复制链接" then shows up it is tapped and the link is read
    from the clipboard with ``self.get_clipboard()``.

    Args:
        max_retry: Maximum number of attempts. Defaults to 5, the value
            that used to be hard-coded.

    Returns:
        str: The copied link, or '' when no attempt produced one.
    """
    page = (
        '//*[@resource-id="com.sankuai.meituan:id/container"]'
        '/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]'
        '/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]'
        '/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]'
        '/android.widget.FrameLayout[1]'
    )
    button = (
        '/android.view.ViewGroup[2]/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[2]/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]/android.widget.ImageView[1]'
    )
    # Three known locations of the "···" button; they differ only in the
    # two ViewGroup indices right below the page frame.
    dots_xpaths = [
        f'{page}/android.view.ViewGroup[{outer}]/android.view.ViewGroup[{inner}]{button}'
        for outer, inner in ((3, 1), (3, 2), (2, 1))
    ]
    link_xpath = '//*[@text="复制链接"]'

    product_link = ''
    for idx in range(1, max_retry + 1):
        for xp in dots_xpaths:
            dots = self.d.xpath(xp)
            if not dots.exists:
                continue
            print(f'{idx}-进入分享点点点')
            self.loggerMT.info(f'{idx}-进入分享点点点')
            dots.click()
            time.sleep(0.2)
            self.d.xpath('//*[@text="分享商品"]').click_exists()
            time.sleep(0.2)
            copy_link = self.d.xpath(link_xpath)
            if copy_link.exists:
                copy_link.click()
                time.sleep(1)
                product_link = self.get_clipboard()
                time.sleep(0.5)
                print(f'{idx}-商品链接:{product_link}')
                self.loggerMT.info(f'{idx}-商品链接:{product_link}')
                # The copy action ran; stop probing other button locations
                # (an empty clipboard is retried by the outer loop).
                break
            # "复制链接" did not appear for this button location.
            print(f'{idx}-商品链接:{product_link}')
            self.loggerMT.info(f'{idx}-商品链接:{product_link}')
            product_link = ''
        if product_link:
            return product_link
        if idx < max_retry:
            time.sleep(0.5)  # no pause needed after the final attempt
    return product_link
先判断元素是否存在 ''' if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点111') self.loggerMT.info('1-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #清空剪切板 # self.clear_clipboard() # if self.d.xpath('//*[@text="加载更多"]').click_exists(): # 
self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() # if self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').exists: # self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').click() # #获取剪切板的数据 # product_link = self.get_clipboard() # time.sleep(0.5) # print(f'商品链接:{product_link}') # #清空剪切板 # self.clear_clipboard() # else: # print('未找到分享按钮111') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点222') self.loggerMT.info('1-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) 
self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点111') self.loggerMT.info('2-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) 
self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点222') self.loggerMT.info('2-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if 
self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点111') self.loggerMT.info('3-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') elif 
self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点222') self.loggerMT.info('3-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') ''' # 获取链接结束 """ 整合数据 :return: """ # title_info = self.get_title() # 药品,规格 # title_info = self.safe_exec(self.get_title) # 药品,规格 product, specifications = self.safe_exec(self.get_title) # 药品,规格 if product: # product, specifications = title_info # 如果关键字包含999 则 product必须包含999 和 999后面的那段字符串 ps 999感冒灵颗粒必须包含:"999"和"感冒灵颗粒" if '999' in self.search_key: if self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = self.search_key.replace('999皮炎平', '') elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30': temp_search_key = self.search_key.replace('999必无忧', '') elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': temp_search_key = self.search_key.replace('999必无忧', '') elif 
self.search_key == '999速复康布洛芬缓释胶囊': temp_search_key = self.search_key.replace('999速复康', '') elif self.search_key == '999选平硝酸咪康唑乳膏20g': temp_search_key = self.search_key.replace('999选平', '') elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': temp_search_key = self.search_key.replace('999皮炎平', '') else: temp_search_key = self.search_key.replace('999', '') if self.search_key == '999糠酸莫米松凝胶15': temp_search_key = temp_search_key.replace('15', '') elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = temp_search_key.replace('30', '') elif self.search_key == '999抗病毒口服液10ml*6支/盒': temp_search_key = temp_search_key.replace("10ml*6支/盒", "") elif self.search_key == '999复方金银花颗粒10g': temp_search_key = temp_search_key.replace("10g", "") elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒': temp_search_key = temp_search_key.replace("15g*15袋/盒", "") elif self.search_key == '999复方氨酚烷胺胶囊6粒': temp_search_key = temp_search_key.replace("6粒", "") elif self.search_key == '999可调式生理性海水鼻腔喷雾50': temp_search_key = temp_search_key.replace("50", "") elif self.search_key == '999止泻利颗粒15g*8': temp_search_key = temp_search_key.replace("15g*8", "") elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30': temp_search_key = temp_search_key.replace("30", "") elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': temp_search_key = temp_search_key.replace("15g", "") elif self.search_key == '999复方苦参肠炎康片12片': temp_search_key = temp_search_key.replace("12片", "") elif self.search_key == '999强力枇杷露16袋': temp_search_key = temp_search_key.replace("16袋", "") elif self.search_key == '999三蛇胆川贝膏138': temp_search_key = temp_search_key.replace("138", "") elif self.search_key == '999抗病毒口服液10ml*12': temp_search_key = temp_search_key.replace("10ml*12", "") elif self.search_key == '999抗病毒口服液10ml*10': temp_search_key = temp_search_key.replace("10ml*10", "") elif self.search_key == '999强力枇杷露120ml': temp_search_key = temp_search_key.replace("120ml", "") elif self.search_key == '999强力枇杷露150ml': temp_search_key = temp_search_key.replace("150ml", "") 
elif self.search_key == '999糠酸莫米松乳膏10g支': temp_search_key = temp_search_key.replace("10g支", "") elif self.search_key == '999选平硝酸咪康唑乳膏20g': temp_search_key = temp_search_key.replace("20g", "") elif self.search_key == '999感冒清热颗粒(无糖)6g': temp_search_key = temp_search_key.replace("(无糖)6g", "") elif self.search_key == '999壮骨关节丸6g*20': temp_search_key = temp_search_key.replace("6g*20", "") elif self.search_key == '999正天丸6g*15': temp_search_key = temp_search_key.replace("6g*15", "") elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': temp_search_key = temp_search_key.replace("20", "") elif self.search_key == '999糠酸莫米松凝胶10': temp_search_key = temp_search_key.replace("10", "") elif self.search_key == '999板蓝根颗粒10g*20': temp_search_key = temp_search_key.replace("10g*20", "") elif self.search_key == '999复方氨酚烷胺胶囊10粒': temp_search_key = temp_search_key.replace("10粒", "") elif self.search_key == '999复方氨酚烷胺胶囊12粒': temp_search_key = temp_search_key.replace("12粒", "") elif self.search_key == '999咽炎片0.26g*12片*2板': temp_search_key = temp_search_key.replace("0.26g*12片*2板", "") elif self.search_key == '999小儿止咳糖浆120': temp_search_key = temp_search_key.replace("120", "") elif self.search_key == '999小儿止咳糖浆225': temp_search_key = temp_search_key.replace("225", "") elif self.search_key == '999小儿感冒颗粒6g*10': temp_search_key = temp_search_key.replace("6g*10", "") elif self.search_key == '999小儿感冒颗粒6g*24': temp_search_key = temp_search_key.replace("6g*24", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋': temp_search_key = temp_search_key.replace("6g*10袋", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋': temp_search_key = temp_search_key.replace("6g*20袋", "") elif self.search_key == '999小儿咽扁颗粒8g*10袋': temp_search_key = temp_search_key.replace("8g*10袋", "") elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋': temp_search_key = temp_search_key.replace("2.5g*10袋", "") elif self.search_key == '999感冒清热颗粒12g*18': temp_search_key = temp_search_key.replace("12g*18", "") if '999' not in product or temp_search_key not 
in product: self.swipe_back(1) self.unrelated_data += 1 return else: if self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': temp_search_key = self.search_key.replace('史达功', '') temp_search_key = temp_search_key.replace('120', '') if '史达功' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰养胃舒颗粒8袋': temp_search_key = self.search_key.replace('三九胃泰', '') temp_search_key = temp_search_key.replace('8袋', '') if '三九胃泰' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '今维多赐多康牌蛋白粉': temp_search_key = self.search_key.replace('今维多', '') if '今维多' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': temp_search_key = self.search_key.replace('佳美舒', '') temp_search_key = temp_search_key.replace('4', '') if '佳美舒' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰颗粒20g*10': temp_search_key = self.search_key.replace('20g*10', '') if temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰颗粒20g*6袋': temp_search_key = self.search_key.replace('20g*6袋', '') if temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '顺峰康王酮康他索乳膏': temp_search_key = self.search_key.replace('顺峰康王', '') if '顺峰康王' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return else: if self.search_key not in product.replace(' ', ''): self.swipe_back(1) self.unrelated_data += 1 return # if self.search_key not in product.replace(' ', ''): # self.swipe_back(1) # self.unrelated_data += 1 # return else: self.swipe_back(1) return min_price = self.drug_price() # 最低价格 # 商品链接 product_link = self.get_product_link() # 判断是否有自营的文本,有的话不需要获取店铺的信息 if 
self.d.xpath('//*[@text="自营"]').exists: shop = "美团自营大药房(快递电商)" # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return else: for i in range(8): if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名1') break self.d.swipe_ext('up', 0.3) time.sleep(1) # detail_info = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info # bounds = detail_info['bounds'] # height = bounds['bottom'] - bounds['top'] # if self.d.xpath('//*[@text="进店"]').exists and height > 100: if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名2') break shop = self.get_shop_name() # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') # 获取店铺信息开始 # 暂时不获取店铺信息 start is_has_enter_shop = self.has_shop() # 需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品 shop_is_exists = self.shop_is_exists_database(shop) # 存在进店 并且店铺的名称不包含美团官方的字样 print(f"已采集{self.shop_data_num}家店铺数据") if is_has_enter_shop and '美团官方' not in shop and '美团自营' not in shop and not shop_is_exists and self.shop_data_num < 500: # license_info = self.get_license_info_ex() license_info = self.safe_exec(self.get_license_info_ex) contact_address = license_info['contact_address'] qualification_number = license_info['qualification_number'] business_license_company = license_info['business_license_company'] business_license_address = license_info['business_license_address'] save_shop_data = { 'shop': shop, 'contact_address': contact_address, 'qualification_number': qualification_number, 'scrape_date': scrape_date, 'business_license_company': business_license_company, 
'business_license_address': business_license_address, 'platform': '美团' } self.save_shop_info_to_database(save_shop_data) self.shop_data_num += 1 # 店铺数据数量+1 self.swipe_back(2) else: print('不采集店铺信息') # 获取店铺信息结束 # 暂时不获取店铺信息 end if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return if not shop: print('未获取到店铺名:开始回退') self.swipe_back(1) return if not shop or '自营' in shop: self.swipe_back(1) return time.sleep(self.get_sleep_time()) # 生产日期为空 manufacture_date = '' # 执政信息 # if is_has_enter_shop: # license_info = self.get_license_info() # business_license_company = license_info["单位名称"] # credit_code = license_info['社会信用代码'] # city_str = license_info['地址'] # # 先把省份啥的替换掉 # city_sub_str = re.sub(r'[u4e00-\u9fa5]+省', '', city_str) # try: # city = re.search(r'[\u4e00-\u9fa5]+?(市|区|县)', city_sub_str).group(0) # except: # city = city_sub_str # try: # province = self.city2province[city] # except: # province = '' # self.swipe_back(2) # else: # business_license_company = '' # credit_code = '' # city = '' # province = '' business_license_company = '' credit_code = '' city = '' province = '' expiry_date = '' manufacturer = '' approval_number = '' # 暂时不获取说明书信息 start # 是否存在说明书 # is_has_instructions = self.has_instructions() # 有的药品没有说明书,直接默认 if self.search_key == '今维多赐多康牌蛋白粉': expiry_date = '18个月' manufacturer = '华润圣海健康科技有限公司' approval_number = '食健备G202437001992' elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': expiry_date = '24个月' manufacturer = '浙江华润三九众益制药有限公司' approval_number = '国药准字H20090152' elif self.search_key == '999可调式生理性海水鼻腔喷雾50': expiry_date = '3年' manufacturer = '江苏萨瑞斯医疗科技有限公司' approval_number = '苏械注准20212140025' elif self.search_key == '999蒲地蓝消炎片': expiry_date = '24个月' manufacturer = '特一药业集团股份有限公司' approval_number = '国药准字Z20063596' elif self.search_key == '999养胃舒颗粒': expiry_date = '36个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z34020289' elif self.search_key == '999糠酸莫米松凝胶15': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = 
'国药准字H20080010' elif self.search_key == '999黄芪精': expiry_date = '36个月' manufacturer = '台州南峰药业有限公司' approval_number = '国药准字Z33020783' elif self.search_key == '999复方感冒灵颗粒': expiry_date = '24个月' manufacturer = '华润三九(郴州)制药有限公司' approval_number = '国药准字Z43020334' elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074155' elif self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': expiry_date = '暂定24个月,具体有效期以实物说明书为准' manufacturer = '史达德药业(北京)有限公司' approval_number = '国药准字H11021837' elif self.search_key == '999速复康布洛芬缓释胶囊': expiry_date = '24个月' manufacturer = '北京红林制药有限公司' approval_number = '国药准字H20074172' elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒': expiry_date = '24个月' manufacturer = '重庆科瑞东和制药有限责任公司' approval_number = '国药准字Z50020420' elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': expiry_date = '24个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20073954' elif self.search_key == '999维生素C咀嚼片': expiry_date = '24个月' manufacturer = '甘肃成纪生物药业有限公司' approval_number = '国药准字H62021166' elif self.search_key == '999强力枇杷露120ml': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字Z36021533' elif self.search_key == '999强力枇杷露150ml': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字Z36021533' elif self.search_key == '999抗病毒口服液10ml*10' or self.search_key == '999抗病毒口服液10ml*12': expiry_date = '24个月' manufacturer = '杭州华润老桐君药业有限公司' approval_number = '国药准字Z33020518' elif self.search_key == '999精氨酸布洛芬颗粒': expiry_date = '暂定36个月' manufacturer = '华润三九(唐山)药业有限公司' approval_number = '国药准字H20070139' elif self.search_key == '999糠酸莫米松乳膏10g支': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074090' elif self.search_key == '999选平硝酸咪康唑乳膏20g': expiry_date = '24个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074079' elif self.search_key == '999感冒清热颗粒(无糖)6g': expiry_date = '36个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z20055023' elif 
self.search_key == '999银菊清咽颗粒': expiry_date = '30个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z20026680' elif self.search_key == '999阿奇霉素片': expiry_date = '48个月' manufacturer = '浙江华润三九众益制药有限公司' approval_number = '国药准字H20084458' elif self.search_key == '999补脾益肠丸': expiry_date = '24个月' manufacturer = '惠州市九惠制药股份有限公司' approval_number = '国药准字Z44023376' elif self.search_key == '999壮骨关节丸6g*20': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44023377' elif self.search_key == '999壮骨关节胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20080055' elif self.search_key == '999正天丸6g*15': expiry_date = '30个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020711' elif self.search_key == '999正天胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20010142' elif self.search_key == '三九胃泰胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020704' elif self.search_key == '三九胃泰颗粒20g*10': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020705' elif self.search_key == '999感冒灵颗粒': expiry_date = '24个月' manufacturer = '华润三九(枣庄)药业有限公司' approval_number = '国药准字Z44021940' elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': expiry_date = '36个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字H44024170' elif self.search_key == '三九胃泰颗粒20g*6袋': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020705' elif self.search_key == '顺峰康王酮康他索乳膏': expiry_date = '24个月' manufacturer = '广东华润顺峰药业有限公司' approval_number = '国药准字H10980204' elif self.search_key == '999糠酸莫米松凝胶10': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20080010' elif self.search_key == '999板蓝根颗粒10g*20': expiry_date = '36个月' manufacturer = '广东恒诚制药股份有限公司' approval_number = '国药准字Z44021520' elif self.search_key == '999复方氨酚烷胺胶囊' or self.search_key == '999复方氨酚烷胺胶囊12粒' or self.search_key == '999复方氨酚烷胺胶囊10粒' or self.search_key == 
'999复方氨酚烷胺胶囊6粒': expiry_date = '36个月' manufacturer = '华润三九(唐山)药业有限公司' approval_number = '国药准字H13021912' elif self.search_key == '999咽炎片0.26g*12片*2板': expiry_date = '24个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字Z42021062' elif self.search_key == '999小儿止咳糖浆120' or self.search_key == '999小儿止咳糖浆225': expiry_date = '24个月' manufacturer = '华润三九(雅安)药业有限公司' approval_number = '国药准字Z51020675' elif self.search_key == '999小儿感冒颗粒6g*10' or self.search_key == '999小儿感冒颗粒6g*24': expiry_date = '36个月' manufacturer = '华润三九(枣庄)药业有限公司' approval_number = '国药准字Z37021392' elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋' or self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋': expiry_date = '36个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字H42022510' elif self.search_key == '999感冒灵胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44021939' elif self.search_key == '999小儿咽扁颗粒8g*10袋': expiry_date = '24个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字Z42021105' elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋': expiry_date = '18个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20100067' elif self.search_key == '999感冒清热颗粒12g*18': expiry_date = '36个月' manufacturer = '山东新大陆制药有限公司' approval_number = '国药准字Z37020066' elif self.search_key == '999小柴胡颗粒': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020709' else: is_has_instructions = self.safe_exec(self.has_instructions) # 说明书等信息 if is_has_instructions: print('开始获取说明书信息') # instructions_info = self.get_instructions_data() instructions_info = self.safe_exec(self.get_instructions_data) if instructions_info['有效期'] is not None: expiry_date = instructions_info['有效期'].strip('。') if instructions_info['生产单位'] is not None: manufacturer = instructions_info['生产单位'].strip('。') if instructions_info['批准文号'] is not None: approval_number = instructions_info['批准文号'].strip('。') else: # 没有说明书不入库 print('没有获取到说明书信息') self.swipe_back(1) return # 暂时不获取说明书信息 end self.unrelated_data = 0 # 爬取省份 
scrape_province = '广东' # 这里先默认广东 # 是否有货 availability = '' save_data = { 'product': product, 'min_price': min_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'shop': shop, 'business_license_company': business_license_company, 'province': province, 'city': city, 'manufacturer': manufacturer, 'specification': specifications, 'approval_number': approval_number, 'product_link': product_link, 'scrape_date': scrape_date, 'scrape_province': scrape_province, 'availability': availability, 'credit_code': credit_code, 'platform': '美团', 'search_key': self.search_key, } self.save_to_database(save_data) # time.sleep(100000) time.sleep(self.get_sleep_time()) if self.distinct_target(): print('已到达搜索列表页') else: for i in range(1): print('在详情页') self.swipe_back(1) time.sleep(self.get_sleep_time()) # 最外部有个定位按钮 if self.distinct_target(): break # 主函数 # start_page:开始页,采集用 # end_page:结束页,采集用 # task_id:上报数据用 # 添加max_duration_minutes参数 def main(self, device_id, start_page, end_page, task_id, product_name, product_specs, max_duration_minutes=None, retry_count=0): # === 新增:初始化任务信息 === self.task_id = task_id self.task_start_page = start_page self.task_end_page = end_page self.task_product_specs = product_specs self.task_product_name = product_name # self.current_page = start_page self.start_time = time.time() # === 新增:线程启动成功后更新状态为2 === if self.task_id: try: self.update_task_status(2) # 状态2: 执行中 self.loggerMT.info(f"任务 {task_id} 线程启动成功,状态已更新为2") except Exception as e: self.loggerMT.error(f"更新任务状态失败: {e}") # ===================================== # 记录任务开始 if task_id: reporter.start_task(task_id, start_page, end_page) # ======================== # task_start_time = time.time() #任务开始时间 task_scape_count = 0 # 任务采集数量初始化为0 MAX_RETRY = 3 # 最大重试次数 spider_no = 0 # 计算超时时间(秒) timeout_seconds = None if max_duration_minutes: timeout_seconds = max_duration_minutes * 60 self.connect_devices(device_id) time.sleep(self.get_sleep_time()) # self.d.toast.show("测试toast", 20) # 启动全局弹窗监控 
self.monitor = SpiderMonitor(self) self.monitor.start() try: # 重新开启美团应用 self.restart_app() # 搜索关键字 # self.enter_target_page() self.safe_exec(self.enter_target_page) # === 新增:跳过前面的页面直到start_page start=== if start_page > 1: self.loggerMT.info(f"跳过前 {start_page - 1} 页,从第 {start_page} 页开始采集") current_page = 1 while current_page < start_page: # 检查是否需要暂停 if self.monitor.pausing.is_set(): self.wait_for_ready(self.monitor) # 检查是否到达底部 if self.d.xpath('//*[@text="已经到底啦"]').exists: self.loggerMT.info(f"在第 {current_page} 页已到达底部,无法继续翻页") self.loggerMT.warning(f"未能到达目标页码 {start_page},实际只到达第 {current_page} 页") if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=1, force_end_page=end_page # force_end_page=idx ) return # break # 滑动到下一页 self.d.drag(300, 1400, 300, 400, 1) time.sleep(self.get_sleep_time()) current_page += 1 # 可选:添加页码日志 self.loggerMT.debug(f"已翻到第 {current_page} 页") # 验证是否到达目标页码 if current_page < start_page: self.loggerMT.error(f"翻页失败!目标页码:{start_page},实际到达:{current_page}") # 这里可以根据需要决定是否继续执行或抛出异常 # return False 或 raise Exception else: self.loggerMT.info(f"成功翻到第 {start_page} 页,开始采集") for idx in range(start_page, end_page + 1): # === 新增:检查是否超过结束页 === if idx > end_page: self.loggerMT.info(f"已采集到指定结束页 {end_page},停止采集") if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=1, force_end_page=end_page ) return # === 新增:检查超时 === if timeout_seconds and (time.time() - self.start_time) > timeout_seconds: print(f"任务 {task_id} 达到时间限制 {max_duration_minutes} 分钟,停止采集") self.loggerMT.info(f"任务 {task_id} 达到时间限制 {max_duration_minutes} 分钟,停止采集") # 上报未完成状态 if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=0, # 0:未完成 force_end_page=self.current_page ) return # ==================== # print(f'第{idx + 1}页') print(f'第{idx}页(指定范围: {start_page}-{end_page})') self.current_page = idx # 更新当前页码 # === 新增:更新上报进度 === if task_id: reporter.update_task_progress( task_id=task_id, actual_end_page=self.current_page ) # 
======================== if spider_no > 30: time.sleep(60) spider_no = 0 print('目前无关数据量: ', self.unrelated_data) # 检查是否需要暂停(验证码过多) if self.monitor.verification_count >= self.monitor.MAX_VERIFICATION_RETRY: print("频繁遇到验证码,暂停程序") # self.d.toast("请处理验证码后点击继续", 30) # 等待用户点击屏幕继续 self.d.click(0, 0) # 无效点击,等待用户操作 self.monitor.verification_count = 0 if self.unrelated_data > 20: # 连续超过20个不达标的数据则停止采集 self.loggerMT.info(f"连续20个数据不达标,品规:{self.search_key}") # === 新增:任务正常完成 === if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=1, # 1:已完成 force_end_page=end_page ) # ======================== return # 线程安全获取商品列表 # drug_lis = self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all() # drug_lis = self.safe_list('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout', self.monitor) while True: if self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').exists: break time.sleep(1) drug_lis = self.safe_exec( self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all) lis_len = len(drug_lis) print(f'当前页面共有{lis_len}个商品') for idxx, drug_one in enumerate(drug_lis, start=1): bounds = drug_one.info['bounds'] top = bounds['top'] bottom = bounds['bottom'] # height = bottom - top print(f'当前商品bottom:{bottom}') print(f'当前商品top:{top}') # if 304 <= top and bottom <= 1475: # 默认高度241的才行 if 304 <= top and bottom <= 1475: # 默认高度241的才行 1559 # print('目标-->', drug_one.info) # drug_one.click() # 获取当前元素中的属性来判断是否要点击进入采集 print(f"这页的第几个商品:{idxx}") product_title = '' price = '' shop_name = '' # 商品名称的xpath product_tittle_xpath = 
f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' product_tittle_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(product_tittle_xpath).exists: product_title = self.d.xpath(product_tittle_xpath).text product_title = product_title[1:] if product_title.startswith('0') else product_title print(f"product_tittle_xpath列表当前商品名称:{product_title}") if self.task_product_name not in product_title or self.task_product_specs not in product_title: print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif self.d.xpath(product_tittle_xpath2).exists: product_title = self.d.xpath(product_tittle_xpath2).text product_title = product_title[1:] if product_title.startswith('0') else product_title print(f"product_tittle_xpath2列表当前商品名称:{product_title}") print(f"search_key:{self.search_key}") if self.task_product_name not in product_title or self.task_product_specs not in product_title: print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue else: print(f"列表当前商品名称不存在") # 价格 price_xpath = 
f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' price_xpath3 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath).exists: price_str = self.d.xpath(price_xpath).text print(f"price_xpath列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', price_str).group()) elif self.d.xpath(price_xpath3).exists: price_str = self.d.xpath(price_xpath3).text print(f"price_xpath3列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', price_str).group()) else: price_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath2).exists: price_str = self.d.xpath(price_xpath2).text print(f"price_xpath2列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', 
price_str).group()) else: print(f"列表当前商品价格不存在") # price_str = self.d.xpath(f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]//*[starts-with(@text,"¥")]').text print(f'列表获取到价格:{price}') # 店铺名称的xpath shop_name_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' shop_name_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' if self.d.xpath(shop_name_xpath).exists: shop_name = self.d.xpath(shop_name_xpath).text print(f"shop_name_xpath列表当前商品店铺名称:{shop_name}") elif self.d.xpath(shop_name_xpath2).exists: shop_name = self.d.xpath(shop_name_xpath2).text print(f"shop_name_xpath2列表当前商品店铺名称:{shop_name}") else: print(f"列表当前商品店铺名称不存在") # 如果商品的名称、价格和生产厂家都不存在则直接下一条数据。 跳过一些不是商品的数据。 if product_title == '' and price == '' and shop_name == '': continue scrape_date = self.get_current_date() if product_title and price and shop_name: # 判断数据表中是否存在 dup_data = {'product': product_title, 'min_price': price, 'shop': shop_name, 'scrape_date': scrape_date, 'platform': '美团'} if self.data_is_exists(dup_data): print('列表存在相同数据不入库') continue self.safe_exec(drug_one.click) print('点击目标药品完毕') time.sleep(2) # 采集药品信息 try: # self.integrate_data() self.safe_exec(self.integrate_data) # 检测下是否回退到列表页 if self.distinct_target(): print('回退到列表页', True) else: if 
self.d.xpath('//*[@text="搜索"]').exists: print("检测到搜索按钮,重新开始采集流程") if retry_count < MAX_RETRY: # 停止当前监控线程 self.monitor.stop() self.monitor.join() # 递归重启采集 return self.main(device_id, retry_count + 1) else: print("超过最大重试次数,终止程序") return else: print("无法恢复页面,终止采集") return # print('回退到列表页失败,终止采集') # return time.sleep(self.get_sleep_time()) spider_no += 1 except Exception as e: print(f'采集药品详情数据出错:{e}') # 增加阻塞的方法: if not self.distinct_target(): for i in range(1): self.swipe_back(1) # 最外部有个定位按钮 if self.distinct_target(): break if i == 0 and not self.distinct_target(): print('页面出错,退出采集') return else: continue # 翻页逻辑(如果是最后一页则不再翻页) if idx < end_page: if self.d.xpath('//*[@text="已经到底啦"]').exists: self.loggerMT.info(f'在第 {idx} 页已到达列表最底部') if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=1, force_end_page=idx ) return # 翻页 print('开始滑动') self.d.drag(300, 1400, 300, 400, 1) print('滑动结束') time.sleep(self.get_sleep_time()) # if self.d.xpath('//*[@text="已经到底啦"]').exists: # print('已经到达列表页最底部') # # === 新增:任务正常完成 === # if task_id: # reporter.end_task( # task_id=task_id, # status='completed', # finish_status=1, # 1:已完成 # force_end_page=end_page # ) # # ======================== # return # 采集完成,数据上报 if task_id: reporter.end_task( task_id=task_id, status='completed', finish_status=1, force_end_page=end_page ) ''' search_list = self.d.xpath('//android.support.v7.widget.RecyclerView').info bounds = search_list['bounds'] #print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom']) # 计算滑动距离 scroll_distance = bounds['bottom'] - bounds['top'] # 正数 start_y = 1600 end_y = start_y - scroll_distance # 向上滑动,y 坐标减小 # 确保 end_y 不小于 0 end_y = max(end_y, 304) # 留出一点边距,避免滑出屏幕 # print('滑动起点 y:', start_y, '终点 y:', end_y) # self.d.swipe(200, start_y, 200, end_y, 0.4) print('开始滑动') self.d.drag(300, 1400, 300, 400, 1) # self.safe_exec(self.d.drag, 300, 1400, 300, 400, 1) print('滑动结束') #print('搜索列表高度', 1400 + 
bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'], 0.4) time.sleep(self.get_sleep_time()) ''' except Exception as e: print(f"采集任务异常: {e}") # === 新增:异常结束上报 === if task_id: reporter.end_task( task_id=task_id, status='failed', finish_status=0, # 未完成 force_end_page=self.current_page ) # ======================== raise finally: # 确保监控线程被停止 self.monitor.stop() self.monitor.join() def unitest(self): """ 单元测试 :return: """ save_data = { 'product': "[昆中药]舒肝颗粒(低糖型)", 'min_price': 14.0, 'manufacture_date': '', 'expiry_date': '36个月', 'shop': '美团自营大药房(快递电商)', 'business_license_company': '', 'province': '', 'city': '', 'manufacturer': '昆明中药厂有限公司', 'specification': '3g*16袋/盒', 'approval_number': '国药准字Z53021161', 'product_link': '', 'scrape_date': '2025/07/09', 'scrape_province': '广东', 'availability': '', 'credit_code': '', 'platform': '美团' } self.save_to_database(save_data) time.sleep(100000) pass # retrieve database def get_retrieve_mysql(): """ 建立远端连接并返回一个到数据库的连接对象 """ import pymysql # return pymysql.connect( # host='39.108.116.125', # 修改后的主机 # port=3306, # 添加端口号 # user='drug_retrieve_master', # 修改后的用户名 # password='6Y6zS4BKjLSBAEdP', # 修改后的密码 # db='drug_retrieve_master', # 修改后的数据库名 # charset='utf8mb4' # ) return pymysql.connect( host='39.108.116.125', # 修改后的主机 port=3306, # 添加端口号 user='drug_retrieve', # 修改后的用户名 password='Pem287cwM58jNpe2', # 修改后的密码 db='drug_retrieve', # 修改后的数据库名 charset='utf8mb4' ) # def main(): # #从配置的系统里面读取采集用到的设备和搜索关键词 # #1、数据库的连接,从数据库中获取采集品规和设备adb码,启动程序进行采集,如果配置了采集时间,还需要支持到了时间终止采集,如果配置了采集的页数,需要滑动到指定的页数后再进行采集 # #2、代码要支持多线程(线程池)的管理,每个线程有自己的生命周期。 # #获取未开始的美团平台的采集任务 # retrieve_conn = get_retrieve_mysql() # cursor = retrieve_conn.cursor() # query = "SELECT id,collect_equipment_id,product_name,start_page,end_page FROM retrieve_collect_task_allocate WHERE status = 1 AND platform = 4" # cursor.execute(query) # result = cursor.fetchone() # if 
# NOTE: the commented-out single-task main() prototype that ended here was dropped;
# process_tasks_in_parallel() below supersedes it.


class TimeoutException(Exception):
    """Exception type reserved for task time-outs; nothing in the visible code raises it."""


def process_tasks_in_parallel(max_workers=12):
    """Load the pending Meituan tasks and run them on a thread pool.

    Reads every row with ``status = 1`` and ``platform = 4`` from
    ``retrieve_collect_task_allocate``, resolves the device id of each task
    and runs ``MT(key).main(...)`` once per task. The per-task time limit is
    the row's ``duration`` in minutes (30 when NULL); it is passed to
    ``MT.main`` as ``max_duration_minutes``, which is where it is enforced.

    Args:
        max_workers: maximum number of tasks executed at the same time.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    query = """
        SELECT id,
               collect_equipment_id,
               product_name,
               start_page,
               end_page,
               duration,
               product_specs
        FROM retrieve_collect_task_allocate
        WHERE status = 1
          AND platform = 4
    """
    device_query = "SELECT device_id FROM retrieve_collect_equipment WHERE id = %s AND status = 0"

    tasks = []
    retrieve_conn = get_retrieve_mysql()
    try:
        cursor = retrieve_conn.cursor()
        try:
            cursor.execute(query)
            results = cursor.fetchall()
            print(f"获取到的任务结果={results}")
            if not results:
                print("MT 没有要采集的品规")
                return

            device_map = {}  # equipment id -> device id (None when no row matched)
            for (task_id, collect_equipment_id, product_name,
                 start_page, end_page, duration, product_specs) in results:
                if collect_equipment_id == 0 or not (product_name and product_name.strip()):
                    continue
                # look every equipment id up only once
                if collect_equipment_id not in device_map:
                    cursor.execute(device_query, (collect_equipment_id,))
                    device_result = cursor.fetchone()
                    device_map[collect_equipment_id] = device_result[0] if device_result else None
                if not device_map[collect_equipment_id]:
                    continue
                # a NULL product_specs column is treated as "no specs"
                specs = product_specs or ''
                tasks.append({
                    'task_id': task_id,
                    'device_id': device_map[collect_equipment_id],
                    'key': product_name.strip() + specs.strip(),  # search keyword
                    'start_page': start_page,
                    'end_page': end_page,
                    # time limit in minutes, 30 when the column is NULL
                    'duration_minutes': duration if duration is not None else 30,
                    'product_specs': specs,
                    'product_name': product_name,
                })
        finally:
            cursor.close()
    finally:
        # released on every path, including the early return above
        retrieve_conn.close()

    if not tasks:
        print("没有有效的采集任务")
        return
    print(f"准备并行处理 {len(tasks)} 个任务")

    def process_single_task(task):
        """Run one task and return a result dict; never raises ``Exception``."""
        mt = None
        try:
            mt = MT(task['key'])
            mt.main(
                device_id=task['device_id'],
                start_page=task['start_page'],
                end_page=task['end_page'],
                task_id=task['task_id'],
                product_name=task['product_name'],
                product_specs=task['product_specs'],
                max_duration_minutes=task['duration_minutes']
            )
            return {
                'task_id': task['task_id'],
                'success': True,
                'collected_count': mt.collected_count,
                'final_page': mt.current_page
            }
        except Exception as e:
            print(f"任务 {task['task_id']} 执行异常: {e}")
            return {
                'task_id': task['task_id'],
                'success': False,
                'error': str(e)
            }
        finally:
            if mt is not None and hasattr(mt, 'close'):
                try:
                    mt.close()
                except Exception as e:
                    # best-effort cleanup: report it, keep the task result
                    print(f"任务 {task['task_id']} 关闭资源失败: {e}")

    successful_tasks = 0
    failed_tasks = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(process_single_task, task): task
            for task in tasks
        }
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                # as_completed only yields finished futures, so result() returns
                # at once; a timeout here could never fire
                result = future.result()
            except Exception as e:
                failed_tasks += 1
                print(f"任务 {task['task_id']}: 执行异常 {e}")
                continue
            if result['success']:
                successful_tasks += 1
                print(f"任务 {result['task_id']}: 完成,采集 {result['collected_count']} 条数据")
            else:
                failed_tasks += 1
                print(f"任务 {result['task_id']}: 失败,错误: {result['error']}")

    print(f"\n并行采集完成:")
    print(f"成功: {successful_tasks} 个")
    print(f"失败: {failed_tasks} 个")


if __name__ == '__main__':
    # minutes between two scheduled collection rounds
    COLLECTION_INTERVAL_MINUTES = 10

    def run_collection():
        """Run one collection round; errors are printed so the scheduler keeps going."""
        try:
            print(f"【定时任务开始】时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            process_tasks_in_parallel(max_workers=12)
            print(f"【定时任务结束】时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        except Exception as e:
            print(f"【定时任务异常】: {e}")

    schedule.every(COLLECTION_INTERVAL_MINUTES).minutes.do(run_collection)

    # run once right away
    run_collection()

    # the message now states the interval that is actually scheduled
    print(f"定时任务已设置,每{COLLECTION_INTERVAL_MINUTES}分钟执行一次采集")

    while True:
        schedule.run_pending()
        time.sleep(60)  # check for due jobs once a minute