import os import sys script_dir = os.path.dirname(os.path.abspath(__file__)) os.chdir(script_dir) import requests import base64 import cv2 import uiautomator2 as u2 import time import subprocess import re import random import datetime import json from aip import AipOcr from apscheduler.schedulers.blocking import BlockingScheduler # from db_mysql import mysqlClient import threading from collections import deque import numpy as np import secrets import oss2 import urllib.parse from certifi.core import where # import pyperclip from config import Config from logger import setup_logger import logging # from database import MySQLClient from PIL import Image from pathlib import Path from PIL import Image, ImageDraw, ImageFont # 配置日志 # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') setup_logger("mt_spider") # 初始化日志 class SpiderMonitor(threading.Thread): """全局弹窗监控线程(增强版)""" def __init__(self, spider_instance): super().__init__(daemon=True) self.spider = spider_instance self.running = True self.pausing = threading.Event() # 主线程同步事件 self.last_verification_time = 0 self.verification_count = 0 self.MAX_VERIFICATION_RETRY = 10 self.recent_clicks = deque(maxlen=10) # 防重复点击 self.logger = logging.getLogger("SpiderMonitor") # 可配置化弹窗规则 self.popup_rules = { "simple": [ ('//*[@text="确定"]', "点击确定"), ('//*[@text="允许"]', "点击允许"), ('//*[@text="关闭"]', "点击关闭"), ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/address_center_location_close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/location_close"]', "关闭按钮"), ('//*[@resource-id="com.sankuai.meituan:id/btn_close"]', "关闭按钮"), ], "verification": [ '//*[contains(@text, "验证")]', '//*[contains(@text, "滑块")]', '//*[contains(@text, "依次点击")]', '//*[contains(@text, "请点击")]', '//*[contains(@text, "拖动滑块刚")]', # 这个需要拖动滑块至最右边,然后再截图 '//*[contains(@text, "请输入图片中的内容")]', '//*[contains(@text, "用最短线连接")]', '//*[contains(@text, "请按语序依次点击")]', '//*[contains(@text, "请向右滑动滑块")]', '//*[contains(@text, "请拖动下方滑块完成拼图")]', '//*[contains(@resource-id, "captcha")]' ] } def run(self): while self.running: try: handled = self.check_and_handle_popup() time.sleep(2 if handled else 1) except Exception as e: self.logger.exception("监控线程异常: %s", e) time.sleep(3) def _is_recent_click(self, xpath): """防止重复点击同一个弹窗""" key = f"{xpath}_{int(time.time())}" if key in self.recent_clicks: return True self.recent_clicks.append(key) return False def check_and_handle_popup(self): d = self.spider.d # 1. 处理简单弹窗 for xpath, desc in self.popup_rules["simple"]: if d.xpath(xpath).exists and not self._is_recent_click(xpath): self.logger.info("检测到弹窗: %s", desc) d.xpath(xpath).click() return True # 2. 处理验证码弹窗 for xpath in self.popup_rules["verification"]: if d.xpath(xpath).exists: now = time.time() if now - self.last_verification_time < 30: return False # 30秒内不重复触发 self.last_verification_time = now self.verification_count += 1 self.logger.warning("验证码弹窗触发,等待人工处理...") if self.verification_count > self.MAX_VERIFICATION_RETRY: self.logger.error("验证码重试超限,终止任务") self.spider.stop_all() return True self.pausing.set() # 通知主线程暂停 # d.toast.show("需要人工处理验证码", 120) # 等待人工处理 start = time.time() # while time.time() - start < 120*60: # if not d.xpath(xpath).exists: # self.logger.info("验证码已处理") # d.toast.show("验证完成", 2) # self.pausing.clear() # 放行主线程 # return True # time.sleep(5) while True: if not d.xpath(xpath).exists: self.logger.info("验证码已处理") # d.toast.show("验证完成", 2) self.pausing.clear() # 放行主线程 return True time.sleep(5) self.logger.warning("验证码超时,重启APP") self.spider.restart_app() return True # 3. 处理广告弹窗(点击右上角) if d.xpath('//*[contains(@text, "广告")]').exists: w, h = d.info['displayWidth'], d.info['displayHeight'] d.click(w - 50, 50) self.logger.info("关闭广告弹窗") return True return False def stop(self): self.running = False class MTScreenshot: def __init__(self, d, oss_config, search_key, scroll_times=4, compress_quality=7, resize_ratio=0.8): # 接收外部已连接好的u2设备实例 self.d = d self.search_key = search_key # 添加这行 # 启动全局弹窗监控 self.monitor = SpiderMonitor(self) self.monitor.start() self.loggerMT = logging.getLogger() # 日志初始化 self.logger = self._init_logger() # OSS配置与初始化(核心配置,无冗余) self.oss_config = oss_config self.oss_bucket = self._init_oss_bucket() # 截图核心参数 self.scroll_times = scroll_times self.compress_quality = compress_quality self.resize_ratio = resize_ratio # self.title_xpaths = [ # '//*[@resource-id="com.jd.lib.productdetail.feature:id/db"]', # '//*[@resource-id="com.jd.lib.productdetail.feature:id/cx"]', # '//*[@resource-id="com.jd.lib.productdetail.feature:id/cj"]' # ] def _init_logger(self): # 极简日志配置,仅保留必要输出 logger = logging.getLogger("mt_screenshot") logger.setLevel(logging.INFO) logger.handlers.clear() handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) logger.addHandler(handler) return logger def _init_oss_bucket(self): # 仅做OSS配置校验和Bucket连接,无额外功能 if not all([self.oss_config.get("access_key_id"), self.oss_config.get("access_key_secret"), self.oss_config.get("endpoint"), self.oss_config.get("bucket_name")]): self.logger.warning("OSS配置不完整,无法上传") return None try: auth = oss2.Auth(self.oss_config["access_key_id"], self.oss_config["access_key_secret"]) bucket = oss2.Bucket(auth, self.oss_config["endpoint"], self.oss_config["bucket_name"]) bucket.get_bucket_info() # 验证连接 self.logger.info("OSS Bucket连接成功") return bucket except Exception as e: self.logger.error(f"OSS Bucket连接失败: {e}") return None def _upload_to_oss(self, local_path): # 极简上传逻辑,仅返回OSS URL或None if not self.oss_bucket or not os.path.exists(local_path): return None file_name = os.path.basename(local_path) safe_name = re.sub(r'[^\w\.\-]', '_', file_name) oss_key = f"{self.oss_config.get('oss_prefix', 'scrape_data/')}{safe_name}" try: oss2.resumable_upload(self.oss_bucket, oss_key, local_path) # 生成并返回完整OSS URL oss_file_url = f"https://{self.oss_config['bucket_name']}.{self.oss_config['endpoint']}/{urllib.parse.quote(oss_key, safe='/')}" self.logger.info(f"OSS上传成功: {oss_file_url}") return oss_file_url except Exception as e: self.logger.error(f"OSS上传失败: {e}") return None # def _get_title(self): # # 仅提取标题,无冗余逻辑 # for xpath in self.title_xpaths: # elem = self.d.xpath(xpath) # if elem.exists: # info = elem.info # title = (info.get("contentDescription") or info.get("content-desc") or info.get("text") or "").strip() # return title[:50] # 限制标题长度,避免文件名过长 # return "" def safe_exec(self, func, *args, **kwargs): """ 万能安全壳:执行 func 前检查验证码, 若监控线程已置位 pausing,则一直阻塞直到放行。 """ while self.monitor.pausing.is_set(): time.sleep(1) # 执行真正逻辑 return func(*args, **kwargs) def _get_title(self): # try: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text # except: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text def _inner(): temp_search_key = self.search_key if "999" in self.search_key: if self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = self.search_key.replace("999皮炎平", "") elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30': temp_search_key = self.search_key.replace("999必无忧", "") elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': temp_search_key = self.search_key.replace("999必无忧", "") elif self.search_key == '999速复康布洛芬缓释胶囊': temp_search_key = self.search_key.replace("999速复康", "") elif self.search_key == '999选平硝酸咪康唑乳膏20g': temp_search_key = self.search_key.replace("999选平", "") elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': temp_search_key = self.search_key.replace("999皮炎平", "") else: temp_search_key = self.search_key.replace("999", "") else: if self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': temp_search_key = self.search_key.replace("史达功", "") temp_search_key = temp_search_key.replace("120", "") elif self.search_key == '三九胃泰养胃舒颗粒8袋': temp_search_key = self.search_key.replace("三九胃泰", "") temp_search_key = temp_search_key.replace("8袋", "") elif self.search_key == '今维多赐多康牌蛋白粉': temp_search_key = self.search_key.replace("今维多", "") elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': temp_search_key = self.search_key.replace("佳美舒", "") temp_search_key = temp_search_key.replace("4", "") elif self.search_key == '三九胃泰颗粒20g*10': temp_search_key = self.search_key.replace("20g*10", "") elif self.search_key == '三九胃泰颗粒20g*6袋': temp_search_key = self.search_key.replace("20g*6袋", "") elif self.search_key == '顺峰康王酮康他索乳膏': temp_search_key = self.search_key.replace("顺峰康王", "") if self.search_key == '999糠酸莫米松凝胶15': temp_search_key = temp_search_key.replace("15", "") elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = temp_search_key.replace("30", "") elif self.search_key == '999复方感冒灵颗粒15': temp_search_key = temp_search_key.replace("15", "") elif self.search_key == '999复方金银花颗粒10g': temp_search_key = temp_search_key.replace("10g", "") elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒': temp_search_key = temp_search_key.replace("15g*15袋/盒", "") elif self.search_key == '999复方氨酚烷胺胶囊6粒': temp_search_key = temp_search_key.replace("6粒", "") elif self.search_key == '999可调式生理性海水鼻腔喷雾50': temp_search_key = temp_search_key.replace("50", "") elif self.search_key == '999止泻利颗粒15g*8': temp_search_key = temp_search_key.replace("15g*8", "") elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30': temp_search_key = temp_search_key.replace("30", "") elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': temp_search_key = temp_search_key.replace("15g", "") elif self.search_key == '999复方苦参肠炎康片12片': temp_search_key = temp_search_key.replace("12片", "") elif self.search_key == '999强力枇杷露16袋': temp_search_key = temp_search_key.replace("16袋", "") elif self.search_key == '999三蛇胆川贝膏138': temp_search_key = temp_search_key.replace("138", "") elif self.search_key == '999强力枇杷露120ml': temp_search_key = temp_search_key.replace("120ml", "") elif self.search_key == '999强力枇杷露150ml': temp_search_key = temp_search_key.replace("150ml", "") elif self.search_key == '999抗病毒口服液10ml*10': temp_search_key = temp_search_key.replace("10ml*10", "") elif self.search_key == '999抗病毒口服液10ml*12': temp_search_key = temp_search_key.replace("10ml*12", "") elif self.search_key == '999糠酸莫米松乳膏10g支': temp_search_key = temp_search_key.replace("10g支", "") elif self.search_key == '999选平硝酸咪康唑乳膏20g': temp_search_key = temp_search_key.replace("20g", "") elif self.search_key == '999感冒清热颗粒(无糖)6g': temp_search_key = temp_search_key.replace("6g", "") elif self.search_key == '999壮骨关节丸6g*20': temp_search_key = temp_search_key.replace("6g*20", "") elif self.search_key == '999正天丸6g*15': temp_search_key = temp_search_key.replace("6g*15", "") elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': temp_search_key = temp_search_key.replace("20", "") elif self.search_key == '999糠酸莫米松凝胶10': temp_search_key = temp_search_key.replace("10", "") elif self.search_key == '999板蓝根颗粒10g*20': temp_search_key = temp_search_key.replace("10g*20", "") elif self.search_key == '999复方氨酚烷胺胶囊10粒': temp_search_key = temp_search_key.replace("10粒", "") elif self.search_key == '999复方氨酚烷胺胶囊12粒': temp_search_key = temp_search_key.replace("12粒", "") elif self.search_key == '999咽炎片0.26g*12片*2板': temp_search_key = temp_search_key.replace("0.26g*12片*2板", "") elif self.search_key == '999小儿止咳糖浆120': temp_search_key = temp_search_key.replace("120", "") elif self.search_key == '999小儿止咳糖浆225': temp_search_key = temp_search_key.replace("225", "") elif self.search_key == '999小儿感冒颗粒6g*10': temp_search_key = temp_search_key.replace("6g*10", "") elif self.search_key == '999小儿感冒颗粒6g*24': temp_search_key = temp_search_key.replace("6g*24", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋': temp_search_key = temp_search_key.replace("6g*10袋", "") elif self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋': temp_search_key = temp_search_key.replace("6g*20袋", "") elif self.search_key == '999小儿咽扁颗粒8g*10袋': temp_search_key = temp_search_key.replace("8g*10袋", "") elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋': temp_search_key = temp_search_key.replace("2.5g*10袋", "") # elif self.search_key == '三九胃泰颗粒': # self.search_key = '三九胃泰' #兼容三九胃泰 温胃舒颗粒 print(f'获取商品title时的搜索关键字:{temp_search_key}') # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text # 初始化 drugs_name = '' specifications = '' title = '' # 循环的获取title为了有时间来处理人机验证 for m in range(1, 6000): if self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').exists: title = self.safe_exec( lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text ) self.loggerMT.info(f"第{m}次获取title成功") print(f"第{m}次获取title成功") break else: time.sleep(3) # return drugs_name, specifications # drugs_name = '' # specifications = '' # try: # title_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' # title_xpath_2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' # if self.d.xpath(title_xpath).exists: # title = self.d.xpath(title_xpath).text # print(f"title_xpath获取的title={title}") # if temp_search_key not in title: # return drugs_name, specifications # elif self.d.xpath(title_xpath_2).exists: # title = self.d.xpath(title_xpath_2).text # print(f"title_xpath_2获取的title={title}") # if temp_search_key not in title: # return drugs_name, specifications # else: # print('title_xpath不存在,请确认') # return drugs_name, specifications # # title = self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text # except Exception as e: # print(f"发生异常: {e}") # return drugs_name, specifications # 奇怪:有的时候title取出来的记过第一位会多一个0 # title = self.safe_exec(self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text) # title = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text title = title[1:] if title.startswith('0') else title print(f'获取到药品标题:{title}') # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) if self.search_key == '999赐多康大豆': return title, '1罐' if self.search_key == "999感冒清热颗粒": match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) else: match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title) if match: # drugs_name = match.group(1).strip() + match.group(2).strip() drugs_name = title specifications = match.group(3).strip() print("药品名:", drugs_name) print("规格:", specifications) # print('完整药名:', drugs_name + specifications) return drugs_name # , specifications else: if title == '999抗病毒口服液10ml*12' or title == '999抗病毒口服液': drugs_name = title specifications = '10ml*12支/盒' return drugs_name # , specifications elif title == '999抗病毒口服液10ml*10': drugs_name = title specifications = '10ml*10支/盒' return drugs_name # , specifications elif title == '999小柴胡颗粒': drugs_name = title specifications = '10g*9袋/盒' return drugs_name # , specifications elif title == '999养胃舒颗粒': drugs_name = title specifications = '10g*10袋/盒' return drugs_name # , specifications elif title == '三九胃泰胶囊': drugs_name = title specifications = '0.5g*24粒/盒' return drugs_name # , specifications elif title == '999补脾益肠丸': drugs_name = title specifications = '6g*15袋/盒' return drugs_name # , specifications elif title == '999感冒灵颗粒': drugs_name = title specifications = '10g*9袋/盒' return drugs_name # , specifications elif title == '999感冒灵胶囊': drugs_name = title specifications = '0.5g*12粒/盒' return drugs_name # , specifications else: print("没有匹配到预期格式") drugs_name = title specifications = '' return drugs_name # , specifications # 用 safe_exec 包装内部逻辑,确保验证码阻塞 return self.safe_exec(_inner) def _merge_screenshots(self, screens): # 仅拼接截图,无额外功能 if len(screens) == 1: return screens[0].convert('RGB') rgb_screens = [s.convert('RGB') for s in screens] total_width = rgb_screens[0].width total_height = sum(s.height for s in rgb_screens) merged_img = Image.new('RGB', (total_width, total_height)) y_offset = 0 for img in rgb_screens: merged_img.paste(img, (0, y_offset)) y_offset += img.height return merged_img def get_oss_url(self): """核心方法:截图+临时本地保存+上传OSS+上传成功删本地文件+返回OSS URL,可直接赋值给oss_file""" local_file_path = None try: # 1. 提取标题 title = self._get_title() self.logger.info(f"获取标题: {title[:20]}..." if title else "未获取到标题") # 2. 生成本地文件路径 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") safe_title = re.sub(r'[\\/*?:"<>|]', '_', title) local_dir = "../scrape_data" os.makedirs(local_dir, exist_ok=True) local_file_path = os.path.join(local_dir, f"{timestamp}_{safe_title}.jpg") # 3. 滚动截图 screen_list = [self.d.screenshot()] w, h = self.d.window_size() for i in range(self.scroll_times): # 可能滑动距离太短,截不到店名。原本是0.8 # self.d.swipe(w // 2, h * 0.9, w // 2, h * 0.1, duration=random.uniform(0.6, 1.2)) self.d.swipe(w // 2, h * 0.85, w // 2, h * 0.15, # 滑动到15% duration=random.uniform(0.8, 1.5)) time.sleep(random.uniform(2.0, 4.0)) screen_list.append(self.d.screenshot()) if self.d(textContains='商家服务').exists: # 看情况是否需要补滑 break # # ========== 自动处理“是否存储图像”弹窗 ========== # # 检测弹窗是否存在(根据弹窗的文本/控件ID定位) # # 通过“是否存储图像”文本定位弹窗 # # if self.d(text="是否存储图像").exists(timeout=2): # # # 点击“取消”(不需要系统存储截图) # # self.d(text="取消").click(timeout=2) # # self.logger.info("已自动关闭“是否存储图像”弹窗") # # #出现标题 break # ========== 滑动截图完成后,滑回初始位置 ========== # self.logger.info("开始滑回初始位置") # # for i in range(self.scroll_times): # # 反向滑动(与正向滑动方向相反) # self.d.swipe_ext('down', 0.8) # time.sleep(random.uniform(1.0, 2.0)) # print(f"第{i+1}次反向滑动,已滑回部分距离") # self.logger.info("✅ 已滑回初始页面位置") # 4. 拼接+压缩+保存 merged_img = self._merge_screenshots(screen_list) if 0.1 < self.resize_ratio < 1.0: new_size = (int(merged_img.width * self.resize_ratio), int(merged_img.height * self.resize_ratio)) resample_mode = Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS merged_img = merged_img.resize(new_size, resample_mode) # 临时保存到本地 merged_img.save(local_file_path, format='JPEG', quality=self.compress_quality) merged_img.close() # 释放长图句柄 self.logger.info(f"临时本地保存: {local_file_path}") # 5. 上传OSS oss_url = self._upload_to_oss(local_file_path) # 6. 核心:OSS上传成功后,删除本地临时文件 if oss_url is not None: try: # 先不删除,检查还有没有问题 # os.remove(local_file_path) self.logger.info(f"✅ OSS上传成功,已删除本地临时文件: {local_file_path}") # 若本地目录为空,可删除目录(按需开启) # if not os.listdir(local_dir): # os.rmdir(local_dir) # self.logger.info(f"本地目录{local_dir}为空,已删除") except Exception as e: self.logger.warning(f"⚠️ OSS上传成功,但删除本地文件失败: {e}") return oss_url except Exception as e: self.logger.error(f"截图/上传失败: {e}") return None def get_access_token(): AppKey = "tRK2RhyItCSh6BzyT4CNVXQa" AppSrcret = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh" token_url = 'https://aip.baidubce.com/oauth/2.0/token' url = f"{token_url}?grant_type=client_credentials&client_id={AppKey}&client_secret={AppSrcret}" payload = "" headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) try: return response.json()['access_token'] except: return None def get_mysql(): """ 建立并返回一个到数据库的连接对象 """ import pymysql return pymysql.connect( host=Config.DB_HOST, port=Config.DB_PORT, user=Config.DB_USER, password=Config.DB_PASSWORD, db=Config.DB_NAME, # "drug_data", # 修改后的数据库名 charset='utf8mb4' ) class MT: def __init__(self, key): # self.package_name = 'com.sankuai.meituan' self.package_name = Config.PACKAGE_NAME self.access_token = get_access_token() self.city2province = self.get_city_info() self.APP_ID = '' self.API_KEY = '' self.SECRET_KEY = '' self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY) self.table_name = Config.DB_TABLE # "mt_drug_middle" self.shop_table_name = Config.DB_SHOP_TABLE self.loggerMT = logging.getLogger() self.search_key = key # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒 self.unrelated_data = 0 # 无关数据数量 self.shop_data_num = 0 # 店铺数据数量 self.device_id = None # 设备ID self.monitor = None # 监控线程 def cleanup(self): """ 清理资源,确保程序退出时释放所有资源 """ self.loggerMT.info("开始清理资源...") try: # 停止监控线程 if self.monitor: self.loggerMT.info("停止监控线程...") self.monitor.stop() if hasattr(self.monitor, 'join'): self.monitor.join(timeout=5) self.monitor = None # 停止应用(如果设备连接正常) if hasattr(self, 'd') and self.d is not None: try: self.loggerMT.info("停止美团应用...") # self.stop_app() except Exception as e: self.loggerMT.warning(f"停止应用时发生异常: {e}") # 清理临时文件 # self._cleanup_temp_files() # self.loggerMT.info("资源清理完成") except Exception as e: self.loggerMT.error(f"清理资源时发生异常: {e}") def _cleanup_temp_files(self): """ 清理临时文件 """ try: temp_patterns = [ "screenshot*.png", "*.jpg", "*.jpeg", "temp_*.png" ] for pattern in temp_patterns: for file in Path(".").glob(pattern): try: file.unlink() self.loggerMT.debug(f"删除临时文件: {file}") except Exception as e: self.loggerMT.warning(f"删除文件 {file} 失败: {e}") except Exception as e: self.loggerMT.warning(f"清理临时文件时发生异常: {e}") def check_device_status(self): """ 检查设备状态 :return: True表示设备正常,False表示设备异常 """ try: if not hasattr(self, 'd') or self.d is None: self.loggerMT.warning("设备未连接") return False # 检查设备信息 device_info = self.d.info if not device_info: self.loggerMT.warning("无法获取设备信息") return False # 检查屏幕状态 screen_status = self.d.info.get('screenOn', True) if not screen_status: self.loggerMT.warning("设备屏幕关闭") # 尝试唤醒屏幕 try: self.d.screen_on() time.sleep(2) except: pass self.loggerMT.debug(f"设备状态正常: {device_info.get('productName', 'Unknown')}") return True except u2.exceptions.SessionBrokenError as e: self.loggerMT.error(f"设备会话断开: {e}") return False except Exception as e: self.loggerMT.exception(f"检查设备状态时发生异常: {e}") return False def check_network_status(self, test_urls=None): """ 检查网络状态 :param test_urls: 要测试的URL列表,默认为常用服务 :return: True表示网络正常,False表示网络异常 """ if test_urls is None: test_urls = [ "https://www.baidu.com", "https://www.taobao.com", "https://www.meituan.com" ] success_count = 0 max_timeout = 10 # 最大超时时间(秒) for url in test_urls: try: self.loggerMT.debug(f"测试网络连接: {url}") response = requests.get(url, timeout=max_timeout) if response.status_code == 200: success_count += 1 self.loggerMT.debug(f"网络测试成功: {url}") else: self.loggerMT.warning(f"网络测试失败: {url} (状态码: {response.status_code})") except requests.exceptions.Timeout: self.loggerMT.warning(f"网络测试超时: {url}") except requests.exceptions.ConnectionError: self.loggerMT.warning(f"网络连接错误: {url}") except Exception as e: self.loggerMT.warning(f"网络测试异常: {url} - {e}") # 如果至少有一个测试成功,认为网络正常 if success_count > 0: self.loggerMT.info(f"网络状态正常 ({success_count}/{len(test_urls)} 个测试成功)") return True else: self.loggerMT.error("网络状态异常,所有测试都失败") return False def wait_for_network(self, max_wait_time=300, check_interval=30): """ 等待网络恢复 :param max_wait_time: 最大等待时间(秒) :param check_interval: 检查间隔(秒) :return: True表示网络恢复,False表示超时 """ start_time = time.time() wait_count = 0 self.loggerMT.info(f"等待网络恢复,最大等待时间: {max_wait_time}秒") while time.time() - start_time < max_wait_time: wait_count += 1 elapsed_time = time.time() - start_time if self.check_network_status(): self.loggerMT.info(f"网络恢复,等待时间: {elapsed_time:.1f}秒") return True remaining_time = max_wait_time - elapsed_time self.loggerMT.info( f"网络仍未恢复,已等待 {elapsed_time:.1f}秒,剩余 {remaining_time:.1f}秒 (第{wait_count}次检查)") if remaining_time > check_interval: time.sleep(check_interval) else: time.sleep(remaining_time) self.loggerMT.error(f"网络恢复超时,已等待 {max_wait_time}秒") return False def wr_re(self, mod, device_id, title=None, shop=None, prices=None, page=1): """ 写入或读取采集进度,增强异常处理和原子性操作 """ file_path = f'./ycwj/{device_id}_{self.search_key}.txt' if mod == "写": # 原子性写入:先写入临时文件,然后重命名为目标文件 temp_file = f'{file_path}.tmp' try: data = { "device_id": device_id, "title": title if title else "", "shop": shop if shop else "", "prices": f"¥{int(prices)}" if prices else "", "page": page if page else 1 } # 确保目录存在 os.makedirs(os.path.dirname(file_path), exist_ok=True) # 写入临时文件 json_str = json.dumps(data, ensure_ascii=False, indent=2) with open(temp_file, 'w', encoding='utf-8') as f: f.write(json_str) # 原子性重命名(在Windows上可能需要特殊处理) if os.path.exists(file_path): backup_file = f'{file_path}.bak' try: os.replace(file_path, backup_file) except: pass # 如果备份失败,继续 os.replace(temp_file, file_path) self.loggerMT.info(f"进度保存成功: 第{page}页 - {shop if shop else '未知店铺'}") # 清理备份文件(如果存在) backup_file = f'{file_path}.bak' if os.path.exists(backup_file): try: os.remove(backup_file) except: pass except Exception as e: self.loggerMT.error(f"保存进度失败: {e}") # 清理临时文件 if os.path.exists(temp_file): try: os.remove(temp_file) except: pass elif mod == "读": try: if not os.path.exists(file_path): self.loggerMT.info("进度文件不存在,从头开始采集") return None with open(file_path, 'r', encoding='utf-8') as f: content = f.read() data = json.loads(content) # 验证数据完整性 required_fields = ['device_id', 'title', 'shop', 'prices', 'page'] if all(field in data for field in required_fields): self.loggerMT.info(f"从进度恢复: 第{data['page']}页 - {data['shop']}") return data else: self.loggerMT.warning("进度文件数据不完整") return None except json.JSONDecodeError as e: self.loggerMT.error(f"进度文件JSON解析失败: {e}") # 尝试恢复备份文件 backup_file = f'{file_path}.bak' if os.path.exists(backup_file): self.loggerMT.info("尝试从备份文件恢复进度") try: os.replace(backup_file, file_path) return self.wr_re("读", device_id) # 递归调用 except: pass return None except Exception as e: self.loggerMT.error(f"读取进度失败: {e}") return None return None def save_progress(self, device_id, current_page, current_shop=None, current_title=None, current_price=None): """ 保存当前采集进度 :param device_id: 设备ID :param current_page: 当前页码 :param current_shop: 当前店铺名(可选) :param current_title: 当前商品标题(可选) :param current_price: 当前价格(可选) :return: True表示保存成功,False表示保存失败 """ try: # 使用安全执行器保存进度 result = self.safe_exec( self.wr_re, "写", device_id, current_title, current_shop, current_price, current_page, max_retries=20, retry_delay=1 ) if result is None: # wr_re方法在"写"模式下返回None self.loggerMT.debug(f"进度保存成功: 第{current_page}页") return True else: self.loggerMT.warning("进度保存返回意外结果") return False except Exception as e: self.loggerMT.error(f"保存进度时发生异常: {e}") return False def auto_save_progress(self, device_id, page_num, shop_name="", product_title="", price=""): """ 自动保存进度,根据策略决定是否保存 :return: True表示已保存,False表示跳过保存 """ # 保存策略: # 1. 每采集完一个商品就保存 # 2. 每5页保存一次(作为备份) # 3. 异常发生时强制保存 try: # 总是保存当前进度 success = self.save_progress( device_id, page_num, shop_name if shop_name else "", product_title if product_title else "", price if price else "" ) if success: # 每5页额外记录一个检查点 if page_num % 5 == 0: checkpoint_file = f'./ycwj/{device_id}_{self.search_key}_checkpoint_{page_num}.txt' try: with open(checkpoint_file, 'w') as f: f.write(f"检查点: 第{page_num}页, 时间: {datetime.datetime.now()}") self.loggerMT.debug(f"创建检查点: {checkpoint_file}") except: pass # 检查点保存失败不影响主流程 return success except Exception as e: self.loggerMT.warning(f"自动保存进度失败: {e}") return False def _cleanup_checkpoint_files(self, device_id, search_key): """ 清理检查点文件 """ try: pattern = f'./ycwj/{device_id}_{search_key}_checkpoint_*.txt' for checkpoint_file in Path(".").glob(pattern): try: checkpoint_file.unlink() self.loggerMT.debug(f"清理检查点文件: {checkpoint_file}") except Exception as e: self.loggerMT.warning(f"清理检查点文件失败 {checkpoint_file}: {e}") except Exception as e: self.loggerMT.warning(f"清理检查点文件时发生异常: {e}") def li_or_lo(self, key): if key == "升序": self.d.xpath('//*[@text="价格"]').click() time.sleep(self.get_sleep_time()) if key == "降序": self.d.xpath('//*[@text="价格"]').click() time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="价格"]').click() time.sleep(self.get_sleep_time()) def get_prices(self): """ 获取价格列表,增强异常处理 :return: 价格列表,如果获取失败返回空列表 """ price_list = [] try: # 检查设备连接 if not hasattr(self, 'd') or self.d is None: self.loggerMT.warning("设备未连接,无法获取价格") return price_list # 查找价格元素 price_elements = self.d.xpath('//android.widget.TextView[contains(@text,"¥")]').all() if not price_elements: self.loggerMT.debug("未找到价格元素") return price_list # 处理价格元素 for price_element in price_elements: try: price_text = price_element.text if not price_text: continue # 过滤优惠信息 if "优惠" in price_text or "以优惠" in price_text: continue # 提取价格数值 price_value = price_text.replace("¥", "").strip() # 转换为数值 if "." in price_value: price_list.append(float(price_value)) else: price_list.append(int(price_value)) except (ValueError, AttributeError) as e: self.loggerMT.warning( f"价格解析失败: {price_element.text if hasattr(price_element, 'text') else 'unknown'}, 错误: {e}") continue except Exception as e: self.loggerMT.warning(f"处理价格元素时发生异常: {e}") continue self.loggerMT.debug(f"获取到价格列表: {price_list}") return price_list except u2.exceptions.UiObjectNotFoundError as e: self.loggerMT.warning(f"未找到价格UI元素: {e}") return price_list except u2.exceptions.SessionBrokenError as e: self.loggerMT.error(f"设备会话断开,无法获取价格: {e}") return price_list except Exception as e: self.loggerMT.exception(f"获取价格时发生未预料异常: {e}") return price_list def slide_n(self): recycler = self.d.xpath('//android.support.v7.widget.RecyclerView') if recycler.wait(timeout=8): # 最多等10秒 search_list = recycler.info # 操作 else: print("超时,列表没出现") if self.d.xpath('//*[@text="已经到底啦"]').exists: print('已经到达列表页最底部') return bounds = search_list['bounds'] # # 计算滑动距离 scroll_distance = bounds['bottom'] - bounds['top'] # 正数 start_y = 1600 end_y = start_y - scroll_distance # 向上滑动,y 坐标减小 # # 确保 end_y 不小于 0 end_y = max(end_y, 304) # 留出一点边距,避免滑出屏幕 # # print('滑动起点 y:', start_y, '终点 y:', end_y) # # self.d.swipe(200, start_y, 200, end_y, 0.4) print('开始滑动') self.d.drag(300, 1400, 300, 400, 1) print('滑动结束') def stop_app(self): self.d.app_stop(self.package_name) time.sleep(5) def start_app(self): self.d.app_start(self.package_name) time.sleep(5) def restart_app(self): """ 重启app :return: """ self.stop_app() self.start_app() @staticmethod def get_sleep_time(): # return random.randint(5, 8) return random.randint(1, 3) @staticmethod def get_current_date(): return datetime.datetime.now().strftime('%Y/%m/%d') @staticmethod def get_city_info(): """ 获取所有的省市数据 :return: """ file_path = 'D:\work\dfwy_spider\drug_data\mt\kailin_city.json' with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) province = {province_one["id"]: province_one for province_one in data['province']} city2province = dict() city = data['city'] for city_one in city: name = city_one['name'] pid = city_one['pid'] if len(str(pid)) > 2: pid = int(re.match('^\d{2}', str(pid)).group()) city2province[name] = province[pid]['name'] return city2province def get_shop_name(self): """ 获取店铺名 :return: """ max_retries = 2 shop_name = None for attempt in range(max_retries): try: # 检查设备连接 if not hasattr(self, 'd') or self.d is None: self.loggerMT.warning("设备未连接,无法获取店铺名") return None # 第一种xpath尝试 try: shop_name = self.d.xpath( '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text if shop_name and shop_name.strip(): self.loggerMT.info(f'获取到店铺名: {shop_name}') return shop_name.strip() except u2.exceptions.UiObjectNotFoundError: pass # 继续尝试下一种方法 # 第二种xpath尝试 try: shop_name = self.d.xpath( '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text if shop_name and shop_name.strip(): self.loggerMT.info(f'获取到店铺名(备用方法): {shop_name}') return shop_name.strip() except u2.exceptions.UiObjectNotFoundError: pass # 继续尝试下一种方法 # 第三种方法:点击店铺进入后获取 if attempt == max_retries - 1: # 最后一次尝试 self.loggerMT.info("尝试点击店铺进入后获取店铺名称") try: self.enter_shop() shop_xpath = '//*[@resource-id="com.sankuai.meituan:id/layout_header_view"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]//android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(shop_xpath).exists: shop_name = self.d.xpath(shop_xpath).text if shop_name and shop_name.strip(): self.loggerMT.info(f'通过店铺页面获取到店铺名: {shop_name}') self.swipe_back(1) return shop_name.strip() except Exception as e: self.loggerMT.warning(f"通过店铺页面获取店铺名失败: {e}") # 确保返回上一页 try: self.swipe_back(1) except: pass # 如果未找到,等待后重试 if attempt < max_retries - 1: wait_time = 2 * (attempt + 1) self.loggerMT.debug(f"未找到店铺名,等待{wait_time}秒后重试...") time.sleep(wait_time) except u2.exceptions.SessionBrokenError as e: self.loggerMT.error(f"设备会话断开,无法获取店铺名: {e}") return None except Exception as e: self.loggerMT.warning(f"获取店铺名时发生异常(尝试{attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(2 * (attempt + 1)) self.loggerMT.warning("无法获取店铺名,所有方法都尝试失败") return None def safe_find_element(self, xpath, max_retries=20, retry_delay=2, timeout=10, default_return=None): """ 安全查找UI元素,支持重试和异常处理 :param xpath: 要查找的xpath :param max_retries: 最大重试次数 :param retry_delay: 重试延迟(秒) :param timeout: 每次查找的超时时间(秒) :param default_return: 查找失败时的默认返回值 :return: 找到的元素或默认返回值 """ for attempt in range(max_retries): try: # 检查设备连接 if not hasattr(self, 'd') or self.d is None: self.loggerMT.warning("设备未连接,无法查找元素") return default_return # 查找元素 element = self.d.xpath(xpath) if element.wait(timeout=timeout): return element else: self.loggerMT.debug(f"未找到元素: {xpath} (尝试 {attempt + 1}/{max_retries})") except u2.exceptions.SessionBrokenError as e: self.loggerMT.error(f"设备会话断开,无法查找元素: {e}") return default_return except Exception as e: self.loggerMT.warning(f"查找元素时发生异常: {e} (尝试 {attempt + 1}/{max_retries})") # 如果不是最后一次尝试,等待后重试 if attempt < max_retries - 1: wait_time = retry_delay * (attempt + 1) self.loggerMT.debug(f"等待 {wait_time} 秒后重试查找元素...") time.sleep(wait_time) self.loggerMT.warning(f"无法找到元素: {xpath},已达到最大重试次数") return default_return def safe_get_element_text(self, xpath, max_retries=20, retry_delay=2, timeout=10, default_return=None): """ 安全获取UI元素的文本内容 :param xpath: 要查找的xpath :param max_retries: 最大重试次数 :param retry_delay: 重试延迟(秒) :param timeout: 每次查找的超时时间(秒) :param default_return: 查找失败时的默认返回值 :return: 元素的文本内容或默认返回值 """ element = self.safe_find_element(xpath, max_retries, retry_delay, timeout, None) if element is None: return default_return try: text = element.text if text and text.strip(): return text.strip() else: return default_return except Exception as e: self.loggerMT.warning(f"获取元素文本时发生异常: {e}") return default_return def get_qualification_number(self): """ 获取资质编号,增强异常处理 :return: 资质编号,如果获取失败返回None """ try: # 使用安全查找方法 xpath = '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]' qualification_number_str = self.safe_get_element_text(xpath, max_retries=20, retry_delay=3, default_return="") if qualification_number_str: qualification_number = qualification_number_str.strip('资质编号:').strip() if qualification_number: self.loggerMT.info(f'获取到资质编号: {qualification_number}') return qualification_number self.loggerMT.warning("未找到资质编号或资质编号为空") return None except Exception as e: self.loggerMT.exception(f"获取资质编号时发生异常: {e}") return None def get_shop_address(self): try: xpath = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView' if self.d.xpath(xpath).exists: shop_address = self.d.xpath(xpath).text print(f'111-获取到店铺地址:{shop_address}') if '发货时间' in shop_address: print(f'店铺地址包含发货时间,再次获取店铺地址') xpath2 = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.TextView' if self.d.xpath(xpath2).exists: shop_address = self.d.xpath(xpath2).text print(f'222-获取到店铺地址:{shop_address}') else: print(f'222-xpath2获取店铺地址失败') else: shop_address = '' print(f'333-获取到店铺地址:{shop_address}') return shop_address except: print(f'获取店铺地址出错-get_shop_address') return None def enter_detail(self): self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/recycler"]/android.widget.FrameLayout[1]').click() time.sleep(self.get_sleep_time()) def save_to_database(self, data): print(f'保存数据到数据库:{data}') # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() # add_sql = "insert into delete_friend_table(delete_user_name,delete_user_id,delete_content,delete_time) value(%s,%s,%s,%s)" add_sql = f""" INSERT INTO {self.table_name} (product, min_price, manufacture_date, expiry_date, shop, business_license_company, province, city, manufacturer, specification, approval_number, product_link, scrape_date, scrape_province, availability, credit_code, platform, search_key, sales, inventory, snapshot_url) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'],data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], self.get_current_date(), data['scrape_province'], data['availability'], data['credit_code'], data['platform'])) cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'], data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], data['scrape_date'], data['scrape_province'], data['availability'], data['credit_code'], data['platform'], data['search_key'], data['sales'], data['inventory'], data['snapshot_url'])) conn.commit() # 提交数据 # self.mysql_client.insert(self.table_name, data) print(f"存入数据库成功") def save_shop_info_to_database(self, data): print(f'保存店铺数据到数据库:{data}') # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() add_sql = f""" INSERT INTO {self.shop_table_name} (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cur.execute(add_sql, (data['shop'], data['contact_address'], data['qualification_number'], data['business_license_company'], data['business_license_address'], data['scrape_date'], data['platform'])) conn.commit() # 提交数据 # self.mysql_client.insert(self.shop_table_name, data) print(f'存入店铺信息到数据库成功') def swipe_up(self): """ 上滑 :return: """ screen_width = self.d.info['displayWidth'] screen_height = self.d.info['displayHeight'] duration_rate = random.uniform(0, 0.3) self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate) no = random.uniform(0, 1) if no > 0.85: # 有的时候卡着 再稍微往上滑一点点 self.d.swipe_ext("up", 0.1) time.sleep(self.get_sleep_time()) def swipe_back(self, no): """ 返回 :param no: 回退次数 :return: """ for idx in range(no): self.d.press('back') time.sleep(self.get_sleep_time()) def drug_price(self): """ 获取药品价格 :return: """ try: price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text price = float(re.search(r'[\d\.]+', price_str).group()) print(f'获取到价格:{price}') return price except Exception as e: print(f'提取价格出错-->{e}') return None def drug_sale_num(self): """ 获取药品销量 :return: """ try: sales_element = self.d.xpath('//*[starts-with(@text,"已售")]') if sales_element.exists: sales_num_str = self.d.xpath('//*[starts-with(@text,"已售")]').text sales_num_str = sales_num_str.replace("已售", "").strip() # price = float(re.search(r'[\d\.]+', price_str).group()) print(f'获取到已售数量:{sales_num_str}') return sales_num_str return None except Exception as e: print(f'提取已售数量出错-->{e}') return None def restart_uiautomator_services(self, device_id): """ 重启atx的uiautomator 服务 :param device_id: :return: """ stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop' start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d' subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) def connect_devices(self, device_id): """ 连接设备,支持重试机制 :return: True表示连接成功,False表示连接失败 """ max_retries = 3 retry_delay = 5 # 秒 for attempt in range(max_retries): try: self.loggerMT.info(f'尝试连接设备 {device_id},第 {attempt + 1} 次尝试') self.d = u2.connect_usb(device_id) # 测试连接是否有效 device_info = self.d.info self.loggerMT.info(f'设备连接成功: {device_info}') self.restart_uiautomator_services(device_id) self.oss_config = { "access_key_id": 'LTAI5tDwjfteBvivYN41r8sJ', "access_key_secret": 'yowuOGi2nYYnrqGpO3qcz94C4brcPp', "endpoint": "oss-cn-shenzhen.aliyuncs.com", "bucket_name": "zhijiayun-jiansuo", "oss_prefix": "scrape_data/" } print(f'✅ 成功连接到设备: {device_id}') self.loggerMT.info(f'✅ 成功连接到设备: {device_id}') return True except Exception as e: error_msg = f'{device_id} 连接错误 (尝试 {attempt + 1}/{max_retries}): {e}' print(f'⚠️ {error_msg}') self.loggerMT.error(error_msg) if attempt < max_retries - 1: print(f'等待 {retry_delay} 秒后重试...') time.sleep(retry_delay) retry_delay *= 2 # 指数退避 else: print(f'❌ 设备 {device_id} 连接失败,已达到最大重试次数') self.loggerMT.error(f'设备 {device_id} 连接失败,已达到最大重试次数') return False return False def get_ocr_res(self, img): try: # img地址 print(f'开始识别图片:{img}') request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license" # 二进制方式打开图片文件 f = open(img, 'rb') img = base64.b64encode(f.read()) params = {"image": img} # access_token = get_access_token() request_url = request_url + "?access_token=" + self.access_token headers = {'content-type': 'application/x-www-form-urlencoded'} response = requests.post(request_url, data=params, headers=headers) if response: res = response.json() new_dic = dict() for ite in res['words_result'].keys(): new_dic[ite] = res['words_result'][ite]['words'] print('资质数据信息', new_dic) return new_dic else: return None except: return None def remove_watermark(self, img_path): """ 图片去水印(将水印部分变成白色背景)并将数据转化为二进制数据 :param img_path: 图片路径 :return: 二进制图片数据 """ img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1) endswith = os.path.splitext(img_path)[1] new = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8) _, img_binary = cv2.imencode(endswith, new) return img_binary def get_ocr_res_image(self, img): try: image = self.remove_watermark(img) res_image = self.client.basicGeneral(image) data = res_image.get('words_result', '') print(f'百度api返回结果:{data}') return data except: return None def screenshot_the_business_license(self, qualification_number): screenshot_path = 'screenshot1.png' self.d.screenshot(screenshot_path) img = cv2.imread(screenshot_path) # 指定裁剪区域 (left, top, right, bottom) left = 0 top = 480 right = 720 bottom = 1420 cropped_img = img[top:bottom, left:right] # 创建目录 SCREENSHOT_DIR = Path('screenshot') # 注意这里的变化和py文件同一级目录即可 SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True) if qualification_number: # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png' cropped_screenshot_path = SCREENSHOT_DIR / f'{qualification_number}.png' else: cropped_screenshot_path = 'cropped_screenshot.png' cv2.imwrite(cropped_screenshot_path, cropped_img) return cropped_screenshot_path def screenshot_instruction(self): # 获取当前时间 current_time = datetime.datetime.now() # 格式化为时分秒 time_str = current_time.strftime("%H-%M-%S") # 生成随机的 8 位字符串 random_str = secrets.token_hex(4) # 生成 4 个字节的随机字符串,转换为 8 位十六进制字符串 print(time_str) screenshot_path = 'instructionscreenshot1-' + time_str + '-' + random_str + '.png' self.d.screenshot(screenshot_path) return screenshot_path def extract_specification(self, text): """提取药品规格信息""" # 方法1:简单去除到期信息 pattern = r'^[^【]+' match = re.search(pattern, text) if match: return match.group(0).strip() return text # 获取商品title def get_title(self): # try: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text # except: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text def _inner(): temp_search_key = self.search_key if "天力士" in self.search_key: temp_search_key = '复方丹参滴丸' # elif self.search_key == '三九胃泰颗粒': # self.search_key = '三九胃泰' #兼容三九胃泰 温胃舒颗粒 print(f'获取商品title时的搜索关键字:{temp_search_key}') # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text # 初始化 drugs_name = '' specifications = '' title = '' # 循环的获取title为了有时间来处理人机验证 for m in range(1, 6000): if self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').exists: title = self.safe_exec( lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text ) print(f"第{m}次获取title成功") break else: time.sleep(1) # return drugs_name, specifications title = title[1:] if title.startswith('0') else title print(f'获取到药品标题:{title}') # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) if self.search_key == '999赐多康大豆': return title, '1罐' if self.search_key == "999感冒清热颗粒": match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) else: match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title) if match: # drugs_name = match.group(1).strip() + match.group(2).strip() drugs_name = title specifications = match.group(3).strip() print("药品名:", drugs_name) print("规格:", specifications) # 如果品规中包含到期则需要再次的正则处理 if '到期' in specifications: specifications = self.extract_specification(specifications) # print('完整药名:', drugs_name + specifications) return drugs_name, specifications else: if title == '999抗病毒口服液10ml*12' or title == '999抗病毒口服液': drugs_name = title specifications = '10ml*12支/盒' return drugs_name, specifications elif title == '999抗病毒口服液10ml*10': drugs_name = title specifications = '10ml*10支/盒' return drugs_name, specifications elif title == '999小柴胡颗粒': drugs_name = title specifications = '10g*9袋/盒' return drugs_name, specifications elif title == '999养胃舒颗粒': drugs_name = title specifications = '10g*10袋/盒' return drugs_name, specifications elif title == '三九胃泰胶囊': drugs_name = title specifications = '0.5g*24粒/盒' return drugs_name, specifications elif title == '999补脾益肠丸': drugs_name = title specifications = '6g*15袋/盒' return drugs_name, specifications elif title == '999复方感冒灵颗粒': drugs_name = title specifications = '14g*9袋/盒' return drugs_name, specifications else: print("没有匹配到预期格式") drugs_name = title specifications = '' return drugs_name, specifications # 用 safe_exec 包装内部逻辑,确保验证码阻塞 return self.safe_exec(_inner) def enter_shop(self): """ 进店,方便提取资质环境 :return: """ # self.d.xpath('//*[@text="进店"]').click() self.d.xpath('//*[@text="店铺"]').click() time.sleep(self.get_sleep_time()) def enter_shoper(self): """ 进入商家 :return: """ is_shoper_exists = 0 for i in range(10): if self.d.xpath('//*[@text="商家"]').exists: print(f'第{i}次商家存在') is_shoper_exists = 1 break else: print(f'第{i}次商家不存在') time.sleep(self.get_sleep_time()) if is_shoper_exists == 1: self.d.xpath('//*[@text="商家"]').click() time.sleep(self.get_sleep_time()) return True else: return False # 点击查看商家资质 def scan_shoper_license(self): exist_shoper = 0 for i in range(10): if self.d.xpath('//*[@text="查看商家资质"]').exists: print(f'第{i}次查看商家资质存在') exist_shoper = 1 break else: print(f'第{i}次查看商家资质不存在') time.sleep(self.get_sleep_time()) if exist_shoper == 1: self.d.xpath('//*[@text="查看商家资质"]').click() time.sleep(self.get_sleep_time()) else: self.swipe_back(1) # 验证商品的信息是否在数据库中已存在 def data_is_exists(self, data): """ 检查指定数据是否已存在于数据库表中(仅检查存在性) 参数: data: 包含查询条件的字典,键为列名,值为条件值 返回: True: 数据存在 False: 数据不存在 None: 检查过程中出错 """ # dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, # 'platform': '美团'} # 1. 验证必要字段 required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] logging.error(f"缺少必要字段: {', '.join(missing)}") return None try: # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() # query_sql = f"SELECT * FROM {self.table_name} WHERE product = '{data['product']}' AND min_price = '{data['min_price']}' AND shop = '{data['shop']}' AND scrape_date = '{data['scrape_date']}' AND platform = '{data['platform']}'" # cur.execute(query_sql) query_sql = """ SELECT * FROM {} WHERE product = %s AND min_price = %s AND shop = %s AND scrape_date = %s AND platform = %s """.format(self.table_name) cur.execute(query_sql, ( data['product'], data['min_price'], data['shop'], data['scrape_date'], data['platform'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") # 验证店铺信息是否在数据库中已存在 def shop_is_exists_database(self, shop): try: # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() query_sql = """ SELECT * FROM {} WHERE shop = %s """.format(self.shop_table_name) cur.execute(query_sql, ( shop )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") def wait_if_verifying(self, monitor, timeout=120): """验证码处理期间阻塞主线程""" start = time.time() while monitor.pausing.is_set() and time.time() - start < timeout: time.sleep(1) def wait_for_ready(self, monitor, timeout=86400): """进入每一页前都先等验证码""" start = time.time() while monitor.pausing.is_set() and time.time() - start < timeout: time.sleep(1) # 额外保险:如果验证码突然在这一秒才弹,再主动扫一次 monitor.check_and_handle_popup() def safe_list(self, xpath, monitor): """线程安全地拿商品列表""" self.wait_for_ready(monitor) return self.d.xpath(xpath).all() def safe_exec(self, func, *args, max_retries=20, retry_delay=2, default_return=None, **kwargs): """ 增强版安全执行器:执行 func 前检查验证码,支持重试机制和异常处理 :param func: 要执行的函数 :param args: 函数参数 :param max_retries: 最大重试次数 :param retry_delay: 重试延迟(秒) :param default_return: 异常时的默认返回值 :param kwargs: 函数关键字参数 :return: 函数执行结果或默认返回值 """ # 等待验证码处理完成 while self.monitor.pausing.is_set(): time.sleep(1) last_exception = None for attempt in range(max_retries): try: # 检查设备连接状态 if not hasattr(self, 'd') or self.d is None: self.loggerMT.warning("设备未连接,尝试重新连接") if not self.connect_devices(self.device_id if hasattr(self, 'device_id') else '95b2c764'): self.loggerMT.error("设备重新连接失败") return default_return # 执行函数 result = func(*args, **kwargs) return result except u2.exceptions.UiObjectNotFoundError as e: # UI元素未找到异常 error_msg = f"UI元素未找到 (尝试 {attempt + 1}/{max_retries}): {e}" self.loggerMT.warning(error_msg) last_exception = e except u2.exceptions.SessionBrokenError as e: # 会话断开异常 error_msg = f"设备会话断开 (尝试 {attempt + 1}/{max_retries}): {e}" self.loggerMT.error(error_msg) last_exception = e # 尝试重启应用 try: self.loggerMT.info("尝试重启应用恢复会话") self.restart_app() except Exception as restart_error: self.loggerMT.error(f"重启应用失败: {restart_error}") except requests.exceptions.RequestException as e: # 网络请求异常 error_msg = f"网络请求失败 (尝试 {attempt + 1}/{max_retries}): {e}" self.loggerMT.error(error_msg) last_exception = e except Exception as e: # 其他异常 error_msg = f"执行函数 {func.__name__ if hasattr(func, '__name__') else 'unknown'} 时发生异常 (尝试 {attempt + 1}/{max_retries}): {e}" self.loggerMT.exception(error_msg) last_exception = e # 如果不是最后一次尝试,等待后重试 if attempt < max_retries - 1: wait_time = retry_delay * (attempt + 1) # 指数退避 self.loggerMT.info(f"等待 {wait_time} 秒后重试...") time.sleep(wait_time) # 所有重试都失败 self.loggerMT.error(f"函数执行失败,已达到最大重试次数") if last_exception: self.loggerMT.error(f"最后异常: {last_exception}") return default_return def get_next_data(self, data, target): for i, item in enumerate(data): if item['words'] == target: if i + 1 < len(data): return data[i + 1]['words'] return None def delete_instruction_screenshot(self, screenshot_path): # 删除截图文件 try: os.remove(screenshot_path) print(f"截图文件已删除:{screenshot_path}") except FileNotFoundError: print(f"文件未找到,无法删除:{screenshot_path}") except Exception as e: print(f"删除文件时出错:{e}") ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="查看详细说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="加载更多"]').click_exists() loop_page = 5 # new_list = list() new_list = [] for i in range(loop_page): self.d.xpath('//*[@text="加载更多"]').click_exists() time.sleep(0.2) if i == 0: self.d.swipe(200, 1000, 200, 300, 0.4) else: self.d.swipe(200, 1000, 200, 62) time.sleep(0.2) if self.d.xpath('//*[@text="加载更多"]').exists: self.d.xpath('//*[@text="加载更多"]').click() time.sleep(0.2) all_tt = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all() for idx in range(1, len(all_tt) + 1): all_tt1 = self.d.xpath( f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all() # print(f'当前说明书列表数据:{all_tt1}') for tt in all_tt1: if tt.text and tt.text != '展开全文': new_list.append(tt.text) if i == 0: height = 938 else: drug_box = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info bounds = drug_box['bounds'] height = bounds['bottom'] - bounds['top'] if height < 938: # print('说明书翻页到底部') break # 展开全文 new_list = [item for item in new_list if item != '展开全文'] print(f'当前说明书列表数据:{new_list}') # expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期') # manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位') # approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号') # res_data = { # "有效期": new_list[expiry_date_index + 1], # "生产单位": new_list[manufacturer_index + 1], # "批准文号": new_list[approval_number_index + 1] # } res_data = { "有效期": (new_list[new_list.index("有效期") + 1]) if "有效期" in new_list and new_list.index("有效期") + 1 < len(new_list) else "", "生产单位": (new_list[new_list.index("生产单位") + 1]) if "生产单位" in new_list and new_list.index("生产单位") + 1 < len(new_list) else "", "批准文号": (new_list[new_list.index("批准文号") + 1]) if "批准文号" in new_list and new_list.index("批准文号") + 1 < len(new_list) else "" } print(f'当前说明书字典数据:{res_data}') return res_data ''' ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="查看详细说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) # 1) 先向上滑动一次,触发“加载更多”出现 self.d.swipe(200, 1000, 200, 300, 0.4) time.sleep(0.3) # 2) 再进入“出现就点”的循环 while self.d.xpath('//*[@text="加载更多"]').click_exists(timeout=1): time.sleep(0.2) self.d.swipe(200, 1000, 200, 300, 0.4) # self.d.swipe(200, 1000, 200, 62) time.sleep(0.2) # 一次性获取所有文本 texts = [ node.text.strip() # for node in self.d.xpath('//android.widget.TextView').all() for node in self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.TextView').all() if node.text and node.text.strip() and node.text != '加载更多' ] print(f'当前说明书列表数据:{texts}') # 提取关键字段 def safe_get(key): # try: # idx = texts.index(key) # return texts[idx + 1] if idx + 1 < len(texts) else "" # except ValueError: # return "" try: idx = next(i for i, text in enumerate(texts) if text == key) return texts[idx + 1] if idx + 1 < len(texts) else "" except StopIteration: return "" res_data = { "有效期": safe_get("有效期"), "生产单位": safe_get("生产单位"), "批准文号": safe_get("批准文号") } print(f'当前说明书字典数据:{res_data}') return res_data ''' ''' def get_instructions_data(self): """ 说明书键值对采集:连续两个 TextView 为一对,精确提取 """ # 1. 进入说明书 self.d(text="说明").click() time.sleep(0.5) self.d(text="查看详细说明").click() time.sleep(0.5) # self.d(text="加载更多").click_exists(timeout=0.5) # 2. 找到说明书最外层 ScrollView(页面主体) scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.ScrollView") count = scroll_view.count print(f"找到的 ScrollView 数量: {count}") if not scroll_view.exists: return {"有效期": "", "生产单位": "", "批准文号": ""} # 3. 在 ScrollView 内再定位真正包含键值对的容器 # 绝大多数美团说明书页面对应的是 ScrollView > ViewGroup > 若干 TextView kv_container = scroll_view.child(className="android.view.ViewGroup") if not kv_container.exists: kv_container = scroll_view # 降级:直接对 ScrollView 取子孙 TextView # 4. 滑动到底并收集所有 TextView(保留顺序) all_texts = [] max_swipe = 5 last_length = 0 for _ in range(max_swipe): texts = kv_container.child(className="android.widget.TextView") #获取texts中的文本 print(f'当前说明书列表数据:{texts}') current_texts = [] self.loggerMT.info(f'说明书111') for tv in texts: try: txt = tv.get_text().strip() # txt = tv.info['text'].strip() except Exception: continue if txt and txt != "展开全文": current_texts.append(txt) self.loggerMT.info(f'说明书222') print(f'当前说明书列表数据:{current_texts}') # 去重 if current_texts: current_texts = [t for t in current_texts if t not in all_texts] all_texts.extend(current_texts) # 判断是否到底 # if not scroll_view.info.get("scrollable"): # break # 判断是否到底 if len(all_texts) == last_length: break last_length = len(all_texts) # self.d.swipe_ext("up", scale=0.7) #向上滑动一次 self.d.swipe(200, 1000, 200, 300, 0.2) time.sleep(0.2) if self.d.xpath('//*[@text="加载更多"]').exists: self.d.xpath('//*[@text="加载更多"]').click() # 5. 成对解析 res_data = {"有效期": "", "生产单位": "", "批准文号": ""} for i in range(len(all_texts) - 1): key = all_texts[i] val = all_texts[i + 1] if key in res_data: res_data[key] = val print(f'说明书文本共 {len(all_texts)} 条,提取结果: {res_data}') # time.sleep(1000000) return res_data ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) if self.d.xpath('//*[@text="查看详细说明"]').exists: self.d.xpath('//*[@text="查看详细说明"]').click() else: for i in range(8): if self.d.xpath('//*[@text="查看全部"]').exists: print('开始点击查看全部') break self.d.swipe_ext('down', 0.3) time.sleep(1) if self.d.xpath('//*[@text="查看全部"]').exists: print('开始点击查看全部2') break if self.d.xpath('//*[@text="查看全部"]').exists: self.d.xpath('//*[@text="查看全部"]').click() else: res_data = { "有效期": '', "生产单位": '', "批准文号": '' } self.loggerMT.info('获取到的说明书信息为空。') return res_data # time.sleep(random.randint(3, 5)) time.sleep(0.5) # self.d.xpath('//*[@text="加载更多"]').click_exists() # loop_page = 5 # new_list = list() # new_list = [] for ii in range(8): if self.d.xpath('//*[@text="加载更多"]').exists: self.d.xpath('//*[@text="加载更多"]').click() time.sleep(0.2) break else: self.d.swipe(200, 1000, 200, 300, 0.3) # self.d.swipe_ext("up", scale=0.3) for iii in range(10): if self.d.xpath('//*[@text="生产单位"]').exists and self.d.xpath('//*[@text="批准文号"]').exists: break else: self.d.swipe(200, 1300, 200, 300, 0.3) # self.d.swipe_ext("up", scale=0.3) instruction_path = self.screenshot_instruction() print(f"instruction_path= {instruction_path}") time.sleep(2) ocr_res = self.get_ocr_res_image(instruction_path) # print(f'ocr_res:{ocr_res}') if ocr_res: # 获取有效期的下一个数据 validity = self.get_next_data(ocr_res, '有效期') # 获取批准文号的下一个数据 approval_number = self.get_next_data(ocr_res, '批准文号') # 获取生产单位的下一个数据 manufacturer = self.get_next_data(ocr_res, '生产单位') else: validity = '' approval_number = '' manufacturer = '' # print("有效期:", validity) # print("批准文号:", approval_number) # print("生产单位:", manufacturer) res_data = { "有效期": validity, "生产单位": manufacturer, "批准文号": approval_number } print(f"res_data={res_data}") time.sleep(1) self.delete_instruction_screenshot(instruction_path) return res_data def has_instructions(self): """ 是否有说明书 :return: """ # 没有说明书的无法采集具体数据 time.sleep(self.get_sleep_time()) is_has_instructions = False for i in range(8): if self.d.xpath('//*[@text="说明"]').exists: print(f"第{i}次有说明书1") is_has_instructions = True break self.d.swipe_ext('down', 0.3) time.sleep(1) # detail_info = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info # bounds = detail_info['bounds'] # height = bounds['bottom'] - bounds['top'] # if self.d.xpath('//*[@text="进店"]').exists and height > 100: if self.d.xpath('//*[@text="说明"]').exists: is_has_instructions = True print(f"第{i}次有说明书2") break # is_has_instructions = self.d.xpath('//*[@text="说明"]').exists return is_has_instructions def has_shop(self): """ 是否有进店按钮 :return: """ # self.d.swipe_ext('up', 0.1) time.sleep(self.get_sleep_time()) is_has_enter_shop = self.d.xpath('//*[@text="进店"]').exists return is_has_enter_shop # 获取商品对应的店铺信息 def get_license_info_ex(self): # self.enter_shop() self.safe_exec(self.enter_shop) # self.enter_shoper() result = self.safe_exec(self.enter_shoper) if result == False: license_info_data = {'contact_address': '', 'qualification_number': '', 'business_license_company': '', 'business_license_address': ''} return license_info_data for i in range(10): if self.d.xpath('//*[@text="查看商家资质"]').exists: print(f"第{i}次有商家资质") break else: print(f"第{i}次没有商家资质") time.sleep(self.get_sleep_time()) # 获取地址 # contact_address = self.get_shop_address() contact_address = self.safe_exec(self.get_shop_address) # time.sleep(50000) ### # self.scan_shoper_license() self.safe_exec(self.scan_shoper_license) # 获取资质编码 # qualification_number = self.get_qualification_number() qualification_number = self.safe_exec(self.get_qualification_number) # qualification_number 不为None继续下一步 if qualification_number: # 营业执照公司名称 business_license_company = '' # 营业执照地址 business_license_address = '' self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) cropped_screenshot_path = self.screenshot_the_business_license(qualification_number) print(f'cropped_screenshot_path:{cropped_screenshot_path}') # if qualification_number: # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png' # else: # cropped_screenshot_path = 'cropped_screenshot.png' # ocr_res = self.get_ocr_res('cropped_screenshot.png') ocr_res = self.get_ocr_res(cropped_screenshot_path) print(f'ocr_res:{ocr_res}') # 获取ocr_res 中的地址、单位名称 if ocr_res: if '单位名称' in ocr_res.keys(): business_license_company = ocr_res['单位名称'] if '地址' in ocr_res.keys(): business_license_address = ocr_res['地址'] license_info_data = {'contact_address': contact_address, 'qualification_number': qualification_number, 'business_license_company': business_license_company, 'business_license_address': business_license_address} else: license_info_data = {'contact_address': contact_address, 'qualification_number': '', 'business_license_company': '', 'business_license_address': ''} return license_info_data """暂不用该功能 def get_license_info(self): self.enter_shop() self.enter_shoper() self.scan_shoper_license() # 获取资质编码 qualification_number = self.get_qualification_number() if qualification_number: table_license_info = self.get_table_license_info(qualification_number) if table_license_info: return { '单位名称': table_license_info[0], '地址': table_license_info[1], '社会信用代码': table_license_info[2] } else: # operate_no = random.randint(0, 1) self.d.click(0.603, 0.27) # if operate_no == 0: # self.d.xpath('//*[@text="营业执照"]').click() # else: # self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) self.screenshot_the_business_license() ocr_res = self.get_ocr_res('cropped_screenshot.png') return ocr_res # operate_no = random.randint(0, 1) self.d.click(0.603, 0.27) # if operate_no == 0: # self.d.xpath('//*[@text="营业执照"]').click() # else: # self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) self.screenshot_the_business_license() ocr_res = self.get_ocr_res('cropped_screenshot.png') return ocr_res """ def distinct_target(self): result = False position_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]' position_xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]' is_position = self.d.xpath(position_xpath).exists is_position2 = self.d.xpath(position_xpath2).exists xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' is_position5 = self.d.xpath(xpath).exists is_position6 = self.d.xpath(xpath2).exists is_position7 = self.d.xpath(xpath3).exists is_position8 = self.d.xpath(xpath4).exists # print(f"is_position = {is_position}") # print(f"is_position2 = {is_position2}") if result == False: print("---检测没有回到列表页---") else: print("---检测回到了列表页---") if is_position or is_position2 or is_position5 or is_position6 or is_position7 or is_position8: result = True return result # return is_position def enter_target_page(self): self.d.xpath('//*[@content-desc="看病买药"]').click() time.sleep(self.get_sleep_time()) self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]').click() time.sleep(self.get_sleep_time()) self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]').click() time.sleep(self.get_sleep_time()) self.d.send_keys(self.search_key, clear=True) time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="搜索"]').click() time.sleep(self.get_sleep_time()) # 下滑逻辑 self.click_express_send() def click_express_send(self): # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.HorizontalScrollView[last()]' slide_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' slide_xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' slide_xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' slide_xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]' for i in range(1, 3): if self.d.xpath(slide_xpath).exists: bounds = self.d.xpath(slide_xpath).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动1') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath2).exists: bounds = self.d.xpath(slide_xpath2).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动2') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath3).exists: bounds = self.d.xpath(slide_xpath3).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动3') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break elif self.d.xpath(slide_xpath4).exists: bounds = self.d.xpath(slide_xpath4).info['bounds'] top = bounds['top'] bottom = bounds['bottom'] print(f'top={top}') print(f'bottom={bottom}') y = (top + bottom) // 2 print(f'y={y}') self.loggerMT.info('开始滑动4') self.d.swipe(500, y, 100, y, 0.5) time.sleep(self.get_sleep_time()) break max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' # print(f"xpath:{xpath}") # scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.HorizontalScrollView") if self.d.xpath(xpath).exists: self.d.xpath(xpath).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath2).exists: self.d.xpath(xpath2).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath2快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath3).exists: self.d.xpath(xpath3).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath3快递送成功") time.sleep(self.get_sleep_time()) break elif self.d.xpath(xpath4).exists: self.d.xpath(xpath4).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath4快递送成功") time.sleep(self.get_sleep_time()) break else: print(f"第{idx}次点击xpath或xpath2或xpath3快递送都失败") time.sleep(self.get_sleep_time()) # xpath2= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' # if self.d.xpath(xpath2).exists: # self.d.xpath(xpath2).click() # print(f"第{idx}次点击xpath2快递送成功") # time.sleep(self.get_sleep_time()) # break """暂不用该功能 def get_table_license_info(self, qualification_number): try: sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"' self.mysql_client.cur.execute(sql) res = self.mysql_client.cur.fetchone() return res except: return None """ def get_clipboard(self): time.sleep(1) self.loggerMT.info(f"Clipboard content:{self.d.clipboard}") # 打印调试信息 clipboard_content = self.d.clipboard if clipboard_content is None: return '' return clipboard_content.strip() # return self.d.clipboard.strip() def clear_clipboard(self): self.d.set_clipboard("", "text/plain") def get_product_link(self): product_link = '' # 两种可能的“···”按钮 dots_xpaths = [ '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]', '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]', '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]' ] max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): if product_link: # 已经拿到则退出 break for xp in dots_xpaths: if self.d.xpath(xp).exists: print(f'{idx}-进入分享点点点') self.loggerMT.info(f'{idx}-进入分享点点点') # #先清空剪贴板的内容 # self.clear_clipboard() # print("清空剪贴板内容成功。") self.d.xpath(xp).click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) link_xpath = '//*[@text="复制链接"]' if self.d.xpath(link_xpath).exists: self.d.xpath(link_xpath).click() time.sleep(1) product_link = self.get_clipboard() time.sleep(0.5) print(f'{idx}-商品链接:{product_link}') self.loggerMT.info(f'{idx}-商品链接:{product_link}') break # 找到并执行后跳出内层循环 else: print(f'{idx}-商品链接:{product_link}') self.loggerMT.info(f'{idx}-商品链接:{product_link}') product_link = '' if not product_link and idx < max_retry: time.sleep(0.5) # 最后一次不需要再等待 return product_link def integrate_data(self): # 测试说明书详情: # instructions_info = self.safe_exec(self.get_instructions_data) # time.sleep(1000000) # 测试店铺信息 # license_info = self.safe_exec(self.get_license_info_ex) # time.sleep(1000000) # 测试定位地址 # 获取链接开始 # self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text # 1、点击页面的... 先判断元素是否存在 ''' if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点111') self.loggerMT.info('1-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #清空剪切板 # self.clear_clipboard() # if self.d.xpath('//*[@text="加载更多"]').click_exists(): # self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() # if self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').exists: # self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').click() # #获取剪切板的数据 # product_link = self.get_clipboard() # time.sleep(0.5) # print(f'商品链接:{product_link}') # #清空剪切板 # self.clear_clipboard() # else: # print('未找到分享按钮111') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点222') self.loggerMT.info('1-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点111') self.loggerMT.info('2-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点222') self.loggerMT.info('2-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点111') self.loggerMT.info('3-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点222') self.loggerMT.info('3-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') ''' # 获取链接结束 """ 整合数据 :return: """ product, specifications = self.safe_exec(self.get_title) # 药品,规格 if product: # product, specifications = title_info # 如果关键字包含999 则 product必须包含999 和 999后面的那段字符串 ps 999感冒灵颗粒必须包含:"999"和"感冒灵颗粒" if '天力士' in self.search_key: if self.search_key == '999皮炎平曲安奈德益康唑乳膏30': temp_search_key = self.search_key.replace('天力士', '') if '天力士' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return else: if self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': temp_search_key = self.search_key.replace('史达功', '') temp_search_key = temp_search_key.replace('120', '') if '史达功' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰养胃舒颗粒8袋': temp_search_key = self.search_key.replace('三九胃泰', '') temp_search_key = temp_search_key.replace('8袋', '') if '三九胃泰' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '今维多赐多康牌蛋白粉': temp_search_key = self.search_key.replace('今维多', '') if '今维多' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': temp_search_key = self.search_key.replace('佳美舒', '') temp_search_key = temp_search_key.replace('4', '') if '佳美舒' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰颗粒20g*10': temp_search_key = self.search_key.replace('20g*10', '') if temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '三九胃泰颗粒20g*6袋': temp_search_key = self.search_key.replace('20g*6袋', '') if temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return elif self.search_key == '顺峰康王酮康他索乳膏': temp_search_key = self.search_key.replace('顺峰康王', '') if '顺峰康王' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return else: if self.search_key not in product.replace(' ', ''): self.swipe_back(1) self.unrelated_data += 1 return # if self.search_key not in product.replace(' ', ''): # self.swipe_back(1) # self.unrelated_data += 1 # return else: self.swipe_back(1) return min_price = self.drug_price() # 最低价格 sales_num = self.drug_sale_num() # 销售数量 snapshot_url = '' # 网页快照 # 在这里截图存放到OSS;#采集图片存放的oss_url; # mt_screenshot = MTScreenshot( # d=self.d, # oss_config=self.oss_config, # search_key=self.search_key # 添加这行 # ) # snapshot_url = mt_screenshot.get_oss_url() #网页快照 # 判断是否有自营的文本,有的话不需要获取店铺的信息 if self.d.xpath('//*[@text="自营"]').exists: shop = "美团自营大药房(快递电商)" # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return else: for i in range(8): if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名1') break self.d.swipe_ext('up', 0.3) time.sleep(1) # detail_info = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info # bounds = detail_info['bounds'] # height = bounds['bottom'] - bounds['top'] # if self.d.xpath('//*[@text="进店"]').exists and height > 100: if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名2') break shop = self.get_shop_name() # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') # 获取店铺信息开始 # 暂时不获取店铺信息 start is_has_enter_shop = self.has_shop() # 需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品 shop_is_exists = self.shop_is_exists_database(shop) # 存在进店 并且店铺的名称不包含美团官方的字样 print(f"已采集{self.shop_data_num}家店铺数据") if is_has_enter_shop and '美团官方' not in shop and '美团自营' not in shop and not shop_is_exists and self.shop_data_num < 500: # license_info = self.get_license_info_ex() license_info = self.safe_exec(self.get_license_info_ex) contact_address = license_info['contact_address'] qualification_number = license_info['qualification_number'] business_license_company = license_info['business_license_company'] business_license_address = license_info['business_license_address'] save_shop_data = { 'shop': shop, 'contact_address': contact_address, 'qualification_number': qualification_number, 'scrape_date': scrape_date, 'business_license_company': business_license_company, 'business_license_address': business_license_address, 'platform': '美团' } self.save_shop_info_to_database(save_shop_data) self.shop_data_num += 1 # 店铺数据数量+1 self.swipe_back(2) else: print('不采集店铺信息') # 获取店铺信息结束 # 暂时不获取店铺信息 end if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return # 商品链接 product_link = self.get_product_link() if not shop: print('未获取到店铺名:开始回退') self.swipe_back(1) return if not shop or '自营' in shop: self.swipe_back(1) return time.sleep(self.get_sleep_time()) # 生产日期为空 manufacture_date = '' # 执政信息 # if is_has_enter_shop: # license_info = self.get_license_info() # business_license_company = license_info["单位名称"] # credit_code = license_info['社会信用代码'] # city_str = license_info['地址'] # # 先把省份啥的替换掉 # city_sub_str = re.sub(r'[u4e00-\u9fa5]+省', '', city_str) # try: # city = re.search(r'[\u4e00-\u9fa5]+?(市|区|县)', city_sub_str).group(0) # except: # city = city_sub_str # try: # province = self.city2province[city] # except: # province = '' # self.swipe_back(2) # else: # business_license_company = '' # credit_code = '' # city = '' # province = '' business_license_company = '' credit_code = '' city = '' province = '' expiry_date = '' manufacturer = '' approval_number = '' # 暂时不获取说明书信息 start # 是否存在说明书 # is_has_instructions = self.has_instructions() # 有的药品没有说明书,直接默认 if self.search_key == '今维多赐多康牌蛋白粉': expiry_date = '18个月' manufacturer = '华润圣海健康科技有限公司' approval_number = '食健备G202437001992' elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4': expiry_date = '24个月' manufacturer = '浙江华润三九众益制药有限公司' approval_number = '国药准字H20090152' elif self.search_key == '999可调式生理性海水鼻腔喷雾50': expiry_date = '3年' manufacturer = '江苏萨瑞斯医疗科技有限公司' approval_number = '苏械注准20212140025' elif self.search_key == '999蒲地蓝消炎片': expiry_date = '24个月' manufacturer = '特一药业集团股份有限公司' approval_number = '国药准字Z20063596' elif self.search_key == '999养胃舒颗粒': expiry_date = '36个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z34020289' elif self.search_key == '999糠酸莫米松凝胶15': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20080010' elif self.search_key == '999黄芪精': expiry_date = '36个月' manufacturer = '台州南峰药业有限公司' approval_number = '国药准字Z33020783' elif self.search_key == '999复方感冒灵颗粒': expiry_date = '24个月' manufacturer = '华润三九(郴州)制药有限公司' approval_number = '国药准字Z43020334' elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074155' elif self.search_key == '史达功右美沙芬愈创甘油醚糖浆120': expiry_date = '暂定24个月,具体有效期以实物说明书为准' manufacturer = '史达德药业(北京)有限公司' approval_number = '国药准字H11021837' elif self.search_key == '999速复康布洛芬缓释胶囊': expiry_date = '24个月' manufacturer = '北京红林制药有限公司' approval_number = '国药准字H20074172' elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒': expiry_date = '24个月' manufacturer = '重庆科瑞东和制药有限责任公司' approval_number = '国药准字Z50020420' elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g': expiry_date = '24个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20073954' elif self.search_key == '999维生素C咀嚼片': expiry_date = '24个月' manufacturer = '甘肃成纪生物药业有限公司' approval_number = '国药准字H62021166' elif self.search_key == '999强力枇杷露120ml': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字Z36021533' elif self.search_key == '999强力枇杷露150ml': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字Z36021533' elif self.search_key == '999抗病毒口服液10ml*10' or self.search_key == '999抗病毒口服液10ml*12': expiry_date = '24个月' manufacturer = '杭州华润老桐君药业有限公司' approval_number = '国药准字Z33020518' elif self.search_key == '999精氨酸布洛芬颗粒': expiry_date = '暂定36个月' manufacturer = '华润三九(唐山)药业有限公司' approval_number = '国药准字H20070139' elif self.search_key == '999糠酸莫米松乳膏10g支': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074090' elif self.search_key == '999选平硝酸咪康唑乳膏20g': expiry_date = '24个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20074079' elif self.search_key == '999感冒清热颗粒(无糖)6g': expiry_date = '36个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z20055023' elif self.search_key == '999银菊清咽颗粒': expiry_date = '30个月' manufacturer = '合肥华润神鹿药业有限公司' approval_number = '国药准字Z20026680' elif self.search_key == '999阿奇霉素片': expiry_date = '48个月' manufacturer = '浙江华润三九众益制药有限公司' approval_number = '国药准字H20084458' elif self.search_key == '999补脾益肠丸': expiry_date = '24个月' manufacturer = '惠州市九惠制药股份有限公司' approval_number = '国药准字Z44023376' elif self.search_key == '999壮骨关节丸6g*20': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44023377' elif self.search_key == '999壮骨关节胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20080055' elif self.search_key == '999正天丸6g*15': expiry_date = '30个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020711' elif self.search_key == '999正天胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20010142' elif self.search_key == '三九胃泰胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020704' elif self.search_key == '三九胃泰颗粒20g*10': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020705' elif self.search_key == '999感冒灵颗粒': expiry_date = '24个月' manufacturer = '华润三九(枣庄)药业有限公司' approval_number = '国药准字Z44021940' elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20': expiry_date = '36个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字H44024170' elif self.search_key == '三九胃泰颗粒20g*6袋': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44020705' elif self.search_key == '顺峰康王酮康他索乳膏': expiry_date = '24个月' manufacturer = '广东华润顺峰药业有限公司' approval_number = '国药准字H10980204' elif self.search_key == '999糠酸莫米松凝胶10': expiry_date = '36个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20080010' elif self.search_key == '999板蓝根颗粒10g*20': expiry_date = '36个月' manufacturer = '广东恒诚制药股份有限公司' approval_number = '国药准字Z44021520' elif self.search_key == '999复方氨酚烷胺胶囊' or self.search_key == '999复方氨酚烷胺胶囊12粒' or self.search_key == '999复方氨酚烷胺胶囊10粒' or self.search_key == '999复方氨酚烷胺胶囊6粒': expiry_date = '36个月' manufacturer = '华润三九(唐山)药业有限公司' approval_number = '国药准字H13021912' elif self.search_key == '999咽炎片0.26g*12片*2板': expiry_date = '24个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字Z42021062' elif self.search_key == '999小儿止咳糖浆120' or self.search_key == '999小儿止咳糖浆225': expiry_date = '24个月' manufacturer = '华润三九(雅安)药业有限公司' approval_number = '国药准字Z51020675' elif self.search_key == '999小儿感冒颗粒6g*10' or self.search_key == '999小儿感冒颗粒6g*24': expiry_date = '36个月' manufacturer = '华润三九(枣庄)药业有限公司' approval_number = '国药准字Z37021392' elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋' or self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋': expiry_date = '36个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字H42022510' elif self.search_key == '999感冒灵胶囊': expiry_date = '24个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z44021939' elif self.search_key == '999小儿咽扁颗粒8g*10袋': expiry_date = '24个月' manufacturer = '华润三九(黄石)药业有限公司' approval_number = '国药准字Z42021105' elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋': expiry_date = '18个月' manufacturer = '华润三九医药股份有限公司' approval_number = '国药准字Z20100067' elif self.search_key == '999感冒清热颗粒12g*18': expiry_date = '36个月' manufacturer = '山东新大陆制药有限公司' approval_number = '国药准字Z37020066' elif self.search_key == '999小柴胡颗粒10g*15': expiry_date = '24个月' manufacturer = '广东一力罗定制药有限公司' approval_number = '国药准字Z44020709' elif self.search_key == '999布洛芬混悬液': expiry_date = '24个月' manufacturer = '华润三九(南昌)药业有限公司' approval_number = '国药准字H20223755' else: is_has_instructions = self.safe_exec(self.has_instructions) # 说明书等信息 if is_has_instructions: print('开始获取说明书信息') # instructions_info = self.get_instructions_data() instructions_info = self.safe_exec(self.get_instructions_data) if instructions_info['有效期'] is not None: expiry_date = instructions_info['有效期'].strip('。') if instructions_info['生产单位'] is not None: manufacturer = instructions_info['生产单位'].strip('。') if instructions_info['批准文号'] is not None: approval_number = instructions_info['批准文号'].strip('。') else: # 没有说明书不入库 print('没有获取到说明书信息') self.swipe_back(1) return # 暂时不获取说明书信息 end self.unrelated_data = 0 if self.search_key == '999小柴胡颗粒10g*15': save_search_key = '999小柴胡颗粒' else: save_search_key = self.search_key # 爬取省份 scrape_province = '广东' # 这里先默认广东 # 是否有货 availability = '' save_data = { 'product': product, 'min_price': min_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'shop': shop, 'business_license_company': business_license_company, 'province': province, 'city': city, 'manufacturer': manufacturer, 'specification': specifications, 'approval_number': approval_number, 'product_link': product_link, 'scrape_date': scrape_date, 'scrape_province': scrape_province, 'availability': availability, 'credit_code': credit_code, 'platform': '美团', 'search_key': save_search_key, 'sales': sales_num, 'inventory': '', 'snapshot_url': snapshot_url } # ccc注释 self.save_to_database(save_data) # time.sleep(100000) time.sleep(self.get_sleep_time()) if self.distinct_target(): print('已到达搜索列表页') else: for i in range(1): print('在详情页') self.swipe_back(1) time.sleep(self.get_sleep_time()) # 最外部有个定位按钮 if self.distinct_target(): break # 主函数 def main(self, device_id, kkk=None, interval_m=None, retry_count=0): """ 主采集函数,增强异常处理和恢复机制 """ print(kkk, "采集") MAX_RETRY = 3 # 最大重试次数 spider_no = 0 # 保存设备ID用于重连 self.device_id = device_id # 1. 设备连接(支持重试) if not self.connect_devices(device_id): self.loggerMT.error(f"设备 {device_id} 连接失败,跳过本次采集") return False time.sleep(self.get_sleep_time()) # 2. 启动全局弹窗监控 self.monitor = SpiderMonitor(self) self.monitor.start() try: # 重新开启美团应用 data_s = 0 rang_page = 100 self.restart_app() self.safe_exec(self.enter_target_page) # 尝试从进度文件恢复 progress_data = self.safe_exec(self.wr_re, "读", device_id) if progress_data: self.loggerMT.info(f"从进度恢复: 第{progress_data['page']}页 - {progress_data['shop']}") self.safe_exec(self.li_or_lo, kkk) data_s += 1 rang_page = rang_page - progress_data['page'] # 滑动到上次的位置 self.loggerMT.info("滑动到上次采集的位置...") while True: if self.d.xpath(f'//*[@text="{progress_data["shop"]}"]').exists: break else: self.safe_exec(self.slide_n) else: self.loggerMT.info("无进度文件或进度文件无效,从头开始采集") for idx in range(rang_page): current_page = idx + 1 self.current_page = current_page # 保存当前页码用于异常处理 print(f'第{current_page}页') if spider_no > 30: time.sleep(60) spider_no = 0 print('目前无关数据量: ', self.unrelated_data) # 检查是否需要暂停(验证码过多) if self.monitor.verification_count >= self.monitor.MAX_VERIFICATION_RETRY: print("频繁遇到验证码,暂停程序") # self.d.toast("请处理验证码后点击继续", 30) # 等待用户点击屏幕继续 self.d.click(0, 0) # 无效点击,等待用户操作 self.monitor.verification_count = 0 if self.unrelated_data > 15: # 连续超过15个不达标的数据则停止采集 print("连续超过15个不达标的数据则停止采集") return # 采集区间 # 是否需要根据价格升降排序 # 升降序采集 if interval_m and data_s == 0: self.safe_exec(self.li_or_lo, kkk) data_s += 1 while True: if any(x >= interval_m[0] for x in self.get_prices()): break else: self.safe_exec(self.slide_n) if data_s == 0: self.safe_exec(self.li_or_lo, kkk) data_s += 1 while True: if self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').exists: break time.sleep(1) drug_lis = self.safe_exec( self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all) lis_len = len(drug_lis) print(f'当前页面共有{lis_len}个商品') for idxx, drug_one in enumerate(drug_lis, start=1): bounds = drug_one.info['bounds'] top = bounds['top'] bottom = bounds['bottom'] # height = bottom - top print(f'当前商品bottom:{bottom}') print(f'当前商品top:{top}') # if 304 <= top and bottom <= 1475: # 默认高度241的才行 if 304 <= top and bottom <= 1475: # 默认高度241的才行 1559 # print('目标-->', drug_one.info) # drug_one.click() # 获取当前元素中的属性来判断是否要点击进入采集 print(f"这页的第几个商品:{idxx}") product_title = '' price = '' shop_name = '' # 价格 price_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' price_xpath3 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' price_xpath1 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath).exists: price_str = self.d.xpath(price_xpath).text print(f"price_xpath列表当前商品价格:{price_str}") if price_str: price = float(re.search(r'[\d\.]+', price_str).group()) elif self.d.xpath(price_xpath3).exists: price_str = self.d.xpath(price_xpath3).text print(f"price_xpath3列表当前商品价格:{price_str}") if price_str: price = float(re.search(r'[\d\.]+', price_str).group()) elif self.d.xpath(price_xpath1).exists: price_str = self.d.xpath(price_xpath1).text print(f"price_xpath1列表当前商品价格:{price_str}") if price_str: price = float(re.search(r'[\d\.]+', price_str).group()) else: price_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath2).exists: price_str = self.d.xpath(price_xpath2).text print(f"price_xpath2列表当前商品价格:{price_str}") if price_str: price = float(re.search(r'[\d\.]+', price_str).group()) else: print(f"列表当前商品价格不存在") # price_str = self.d.xpath(f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]//*[starts-with(@text,"¥")]').text if interval_m: if kkk == "升序": if interval_m[0] > price: continue if price > interval_m[1]: print("当前区间采集完成", interval_m) return if kkk == "降序": if interval_m[1] < price: continue if price < interval_m[0]: print("当前区间采集完成", interval_m) return print(f'列表获取到价格:{price}') # 商品名称的xpath product_tittle_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' product_tittle_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(product_tittle_xpath).exists: product_title = self.d.xpath(product_tittle_xpath).text product_title = product_title[1:] if product_title.startswith('0') else product_title print(f"product_tittle_xpath列表当前商品名称:{product_title}") if '天力士' in self.search_key: self.search_key = self.search_key elif self.d.xpath(product_tittle_xpath2).exists: product_title = self.d.xpath(product_tittle_xpath2).text product_title = product_title[1:] if product_title.startswith('0') else product_title print(f"product_tittle_xpath2列表当前商品名称:{product_title}") if '天力士' in self.search_key: self.search_key = self.search_key else: print(f"列表当前商品名称不存在") # 店铺名称的xpath shop_name_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' shop_name_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' if self.d.xpath(shop_name_xpath).exists: shop_name = self.d.xpath(shop_name_xpath).text print(f"shop_name_xpath列表当前商品店铺名称:{shop_name}") elif self.d.xpath(shop_name_xpath2).exists: shop_name = self.d.xpath(shop_name_xpath2).text print(f"shop_name_xpath2列表当前商品店铺名称:{shop_name}") else: print(f"列表当前商品店铺名称不存在") # 自动保存进度(每采集一个商品就保存) self.auto_save_progress(device_id, idx, shop_name, product_title, price) # 如果商品的名称、价格和生产厂家都不存在则直接下一条数据。 跳过一些不是商品的数据。 if product_title == '' and price == '' and shop_name == '': continue scrape_date = self.get_current_date() if product_title and price and shop_name: # 判断数据表中是否存在 dup_data = {'product': product_title, 'min_price': price, 'shop': shop_name, 'scrape_date': scrape_date, 'platform': '美团'} if self.data_is_exists(dup_data): print('列表存在相同数据不入库') continue self.safe_exec(drug_one.click) print('点击目标药品完毕') time.sleep(2) # 采集药品信息 try: # self.integrate_data() # 检查数据库是否有数据 self.integrate_data() # 检测下是否回退到列表页 if self.distinct_target(): print('回退到列表页', True) else: if self.d.xpath('//*[@text="搜索"]').exists: print("检测到搜索按钮,重新开始采集流程") if retry_count < MAX_RETRY: # 停止当前监控线程 self.monitor.stop() self.monitor.join() # 递归重启采集(保留进度文件) print("递归重启采集,保留当前进度") self.loggerMT.warning(f"第{retry_count + 1}次重试,保留进度文件以便恢复") return self.main(device_id, kkk, interval_m, retry_count + 1) else: print("超过最大重试次数,终止程序") return else: print("无法恢复页面,终止采集") # 插入异常处理 return # print('回退到列表页失败,终止采集') # return time.sleep(self.get_sleep_time()) spider_no += 1 except Exception as e: print(f'采集药品详情数据出错:{e}') # 增加阻塞的方法: if not self.distinct_target(): for i in range(1): self.swipe_back(1) # 最外部有个定位按钮 if self.distinct_target(): break if i == 0 and not self.distinct_target(): print('页面出错,退出采集') return else: continue if self.d.xpath('//*[@text="已经到底啦"]').exists: print('已经到达列表页最底部') return self.safe_exec(self.slide_n) except u2.exceptions.SessionBrokenError as e: # 设备会话断开异常 self.loggerMT.error(f"设备会话断开: {e}") print(f"⚠️ 设备会话断开,需要重新连接: {e}") # 在抛出异常前尝试保存当前进度 try: if hasattr(self, 'current_page') and hasattr(self, 'device_id'): self.loggerMT.info("设备会话断开,尝试保存当前进度...") self.save_progress(self.device_id, self.current_page) except Exception as save_error: self.loggerMT.warning(f"保存进度失败: {save_error}") # 记录错误但不停止程序,让外层处理 raise except u2.exceptions.UiObjectNotFoundError as e: # UI元素未找到异常 self.loggerMT.warning(f"UI元素未找到: {e}") print(f"⚠️ UI元素未找到,跳过当前操作: {e}") # 尝试返回上一页 try: self.swipe_back(1) except: pass except requests.exceptions.RequestException as e: # 网络请求异常 self.loggerMT.error(f"网络请求失败: {e}") print(f"⚠️ 网络请求失败: {e}") # 等待网络恢复 print("等待30秒后继续...") time.sleep(30) except Exception as e: # 其他未预料异常 self.loggerMT.exception(f"采集过程中发生未预料异常: {e}") print(f"❌ 采集过程中发生未预料异常: {e}") # 在抛出异常前尝试保存当前进度 try: if hasattr(self, 'current_page') and hasattr(self, 'device_id'): self.loggerMT.info("发生未预料异常,尝试保存当前进度...") self.save_progress(self.device_id, self.current_page) except Exception as save_error: self.loggerMT.warning(f"保存进度失败: {save_error}") # 记录异常但不停止程序,让外层处理 raise finally: try: # 确保监控线程被停止 self.monitor.stop() self.monitor.join() # 采集完成,清理进度文件(如果采集成功) if hasattr(self, 'current_page'): try: progress_file = f'./ycwj/{self.device_id}_{self.search_key}.txt' if os.path.exists(progress_file): self.loggerMT.info(f"采集完成,清理进度文件: {progress_file}") os.remove(progress_file) # 清理检查点文件 self._cleanup_checkpoint_files(self.device_id, self.search_key) except Exception as e: self.loggerMT.warning(f"清理进度文件失败: {e}") except Exception as e: self.loggerMT.error(f"finally块执行异常: {e}") # 循环停止 self.xh = False def unitest(self): time.sleep(100000) """ 单元测试 :return: """ save_data = { 'product': "[昆中药]舒肝颗粒(低糖型)", 'min_price': 14.0, 'manufacture_date': '', 'expiry_date': '36个月', 'shop': '美团自营大药房(快递电商)', 'business_license_company': '', 'province': '', 'city': '', 'manufacturer': '昆明中药厂有限公司', 'specification': '3g*16袋/盒', 'approval_number': '国药准字Z53021161', 'product_link': '', 'scrape_date': '2025/07/09', 'scrape_province': '广东', 'availability': '', 'credit_code': '', 'platform': '美团', 'search_key': '', 'sales': '', 'inventory': '', 'snapshot_url': '' } self.save_to_database(save_data) time.sleep(100000) pass def main(): """ 主程序入口,增强异常处理和恢复机制 """ keys_list = [ '天力士复方丹参滴丸27mg180丸盒', # '999糠酸莫米松凝胶10', # 不低于26.9 # '999糠酸莫米松凝胶15', ] device_id = '' cycle_no = 0 # 轮次计数 max_cycles = 100 # 最大循环次数,防止无限循环 cycle_cooldown = 60 # 每轮之间的冷却时间(秒) # 是否循环采集 xh_d = True while xh_d and cycle_no < max_cycles: cycle_no += 1 logging.info(f'========== 第 {cycle_no} 轮采集开始 ==========') # 记录本轮开始时间 cycle_start_time = time.time() for idx, key in enumerate(keys_list, 1): logging.info(f'[{idx}/{len(keys_list)}] 开始采集关键字:{key}') mt = None try: # 创建MT实例 mt = MT(key) # 执行采集 success = mt.main(device_id, ) if success: logging.info(f'✅ 关键字 {key} 采集成功') else: logging.warning(f'⚠️ 关键字 {key} 采集失败或部分失败') xh_d = mt.xh # 如果采集成功,跳出循环进入下一轮 if success: break except u2.exceptions.SessionBrokenError as e: # 设备会话断开异常 logging.error(f'❌ 设备会话断开,需要人工干预: {e}') print(f"⚠️ 设备会话断开,请检查设备连接后继续...") # 等待用户处理 input("请处理设备连接问题后按Enter键继续...") except u2.exceptions.UiObjectNotFoundError as e: # UI元素未找到异常 logging.warning(f'⚠️ UI元素未找到,跳过当前关键字: {e}') print(f"⚠️ UI元素未找到,跳过关键字 {key}") except requests.exceptions.RequestException as e: # 网络异常 logging.error(f'🌐 网络异常,等待恢复: {e}') print(f"⚠️ 网络异常,等待30秒后重试...") time.sleep(30) except KeyboardInterrupt: # 用户中断 logging.info('用户中断采集') print("\n⚠️ 采集被用户中断") xh_d = False break except Exception as e: # 其他未预料异常 logging.exception(f'❌ 关键字 {key} 采集发生未预料异常: {e}') print(f"❌ 关键字 {key} 采集失败: {e}") finally: # 清理资源 if mt: try: # 调用清理方法(如果存在) if hasattr(mt, 'cleanup'): mt.cleanup() elif hasattr(mt, 'close'): mt.close() except Exception as cleanup_error: logging.warning(f'清理资源时发生错误: {cleanup_error}') # 计算本轮耗时 cycle_duration = time.time() - cycle_start_time logging.info(f'第 {cycle_no} 轮采集完成,耗时: {cycle_duration:.2f}秒') # 如果不是最后一轮,添加冷却时间 if xh_d and cycle_no < max_cycles: logging.info(f'等待 {cycle_cooldown} 秒后开始下一轮采集...') print(f"等待 {cycle_cooldown} 秒后开始下一轮采集...") time.sleep(cycle_cooldown) if cycle_no >= max_cycles: logging.warning(f'已达到最大循环次数 ({max_cycles}),停止采集') print(f"⚠️ 已达到最大循环次数 ({max_cycles}),停止采集") logging.info('========== 采集程序结束 ==========') print("✅ 采集程序结束") if __name__ == '__main__': main()