import requests import base64 import cv2 import uiautomator2 as u2 import time import subprocess import re import random import datetime import json from apscheduler.schedulers.blocking import BlockingScheduler # from db_mysql import mysqlClient import threading from collections import deque # import pyperclip from config import Config from logger import setup_logger import logging import concurrent.futures # from database import MySQLClient # 配置日志 # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') setup_logger("mt_spider") # 初始化日志 class SpiderMonitor(threading.Thread): """全局弹窗监控线程(增强版)""" def __init__(self, spider_instance): super().__init__(daemon=True) self.spider = spider_instance self.running = True self.pausing = threading.Event() # 主线程同步事件 self.last_verification_time = 0 self.verification_count = 0 self.MAX_VERIFICATION_RETRY = 3 self.recent_clicks = deque(maxlen=10) # 防重复点击 self.logger = logging.getLogger("SpiderMonitor") # 可配置化弹窗规则 self.popup_rules = { "simple": [ ('//*[@text="确定"]', "点击确定"), ('//*[@text="允许"]', "点击允许"), ('//*[@text="关闭"]', "点击关闭"), ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"), ], "verification": [ '//*[contains(@text, "验证")]', '//*[contains(@text, "滑块")]', '//*[contains(@text, "依次点击")]', '//*[contains(@text, "请点击")]', '//*[contains(@text, "拖动滑块刚")]', '//*[contains(@text, "请输入图片中的内容")]', '//*[contains(@text, "用最短线连接")]', '//*[contains(@text, "请按语序依次点击")]', '//*[contains(@text, "请向右滑动滑块")]', '//*[contains(@text, "请拖动下方滑块完成拼图")]', '//*[contains(@resource-id, "captcha")]' ] } def run(self): while self.running: try: handled = self.check_and_handle_popup() time.sleep(2 if handled else 1) except Exception as e: self.logger.exception("监控线程异常: %s", e) time.sleep(3) def _is_recent_click(self, xpath): """防止重复点击同一个弹窗""" key = f"{xpath}_{int(time.time())}" if key in self.recent_clicks: return True self.recent_clicks.append(key) return False def check_and_handle_popup(self): d = self.spider.d # 1. 处理简单弹窗 for xpath, desc in self.popup_rules["simple"]: if d.xpath(xpath).exists and not self._is_recent_click(xpath): self.logger.info("检测到弹窗: %s", desc) d.xpath(xpath).click() return True # 2. 处理验证码弹窗 for xpath in self.popup_rules["verification"]: if d.xpath(xpath).exists: now = time.time() if now - self.last_verification_time < 30: return False # 30秒内不重复触发 self.last_verification_time = now self.verification_count += 1 self.logger.warning("验证码弹窗触发,等待人工处理...") if self.verification_count > self.MAX_VERIFICATION_RETRY: self.logger.error("验证码重试超限,终止任务") self.spider.stop_all() return True self.pausing.set() # 通知主线程暂停 d.toast.show("需要人工处理验证码", 120) # 等待人工处理 start = time.time() # while time.time() - start < 120*60: # if not d.xpath(xpath).exists: # self.logger.info("验证码已处理") # d.toast.show("验证完成", 2) # self.pausing.clear() # 放行主线程 # return True # time.sleep(5) while True: if not d.xpath(xpath).exists: self.logger.info("验证码已处理") d.toast.show("验证完成", 2) self.pausing.clear() # 放行主线程 return True time.sleep(5) self.logger.warning("验证码超时,重启APP") self.spider.restart_app() return True # 3. 处理广告弹窗(点击右上角) if d.xpath('//*[contains(@text, "广告")]').exists: w, h = d.info['displayWidth'], d.info['displayHeight'] d.click(w - 50, 50) self.logger.info("关闭广告弹窗") return True return False def stop(self): self.running = False # def screenshot_the_image_verify(self): # d = self.spider.d # screenshot_path = 'image_verify.png' # d.screenshot(screenshot_path) # img = cv2.imread(screenshot_path) # # 指定裁剪区域 (left, top, right, bottom) # left = 0 # top = 480 # right = 720 # bottom = 1420 # cropped_img = img[top:bottom, left:right] # if qualification_number: # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png' # else: # cropped_screenshot_path = 'cropped_screenshot.png' # cv2.imwrite(cropped_screenshot_path, cropped_img) # return cropped_screenshot_path def get_access_token(): AppKey = "tRK2RhyItCSh6BzyT4CNVXQa" AppSrcret = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh" token_url = 'https://aip.baidubce.com/oauth/2.0/token' url = f"{token_url}?grant_type=client_credentials&client_id={AppKey}&client_secret={AppSrcret}" payload = "" headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) try: return response.json()['access_token'] except: return None def get_mysql(): """ 建立并返回一个到数据库的连接对象 """ import pymysql return pymysql.connect( host = Config.DB_HOST, #"localhost", # 修改后的主机 port = Config.DB_PORT, #3306, # 添加端口号 user = Config.DB_USER, #'root', # 修改后的用户名 password = Config.DB_PASSWORD, # 修改后的密码 db = Config.DB_NAME, #"drug_data", # 修改后的数据库名 charset='utf8mb4' ) def yunma_image_content_verify(img_path): with open(img_path, 'rb') as f: b = base64.b64encode(f.read()).decode() ## 图片二进制流base64字符串 url = "http://api.jfbym.com/api/YmServer/customApi" data = { ## 关于参数,一般来说有3个;不同类型id可能有不同的参数个数和参数名,找客服获取 "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", "type": "10103", "image": b, } _headers = { "Content-Type": "application/json" } response = requests.request("POST", url, headers=_headers, json=data).json() print(response) if response.get("msg") == "识别成功": # 获取 data 中的 data 字段 result = response.get("data", {}).get("data") if result: print(result) # 输出结果 else: print("无法获取数据") else: print("识别未成功") return result class MT: def __init__(self, key): # self.package_name = 'com.sankuai.meituan' self.package_name = Config.PACKAGE_NAME self.access_token = get_access_token() self.city2province = self.get_city_info() # host = Config.DB_HOST #"localhost" # user = Config.DB_USER #"root" # password = Config.DB_PASSWORD #"dfwy2025" # database = Config.DB_NAME #"drug_data" # port = Config.DB_PORT#3306 # print(f'数据库配置:host:{host},user:{user},password:{password},database:{database},port:{port}') self.table_name = Config.DB_TABLE #"mt_drug" self.shop_table_name = Config.DB_SHOP_TABLE # print(f'数据库表名:table_name:{self.table_name},shop_table_name:{self.shop_table_name}') # self.mysql_client = mysqlClient(host, user, password, database, port) self.loggerMT = logging.getLogger() self.search_key = key # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒 self.unrelated_data = 0 # 无关数据数量 self.shop_data_num = 0 # 店铺数据数量 def stop_app(self): self.d.app_stop(self.package_name) time.sleep(5) def start_app(self): self.d.app_start(self.package_name) time.sleep(5) def restart_app(self): """ 重启app :return: """ self.stop_app() self.start_app() @staticmethod def get_sleep_time(): # return random.randint(5, 8) return random.randint(1, 3) @staticmethod def get_current_date(): return datetime.datetime.now().strftime('%Y/%m/%d') @staticmethod def get_city_info(): """ 获取所有的省市数据 :return: """ file_path = '../kailin_city.json' with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) province = {province_one["id"]: province_one for province_one in data['province']} city2province = dict() city = data['city'] for city_one in city: name = city_one['name'] pid = city_one['pid'] if len(str(pid)) > 2: pid = int(re.match('^\d{2}', str(pid)).group()) city2province[name] = province[pid]['name'] return city2province def get_shop_name(self): """ 获取店铺名 :return: """ try: shop_name = self.d.xpath( '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text print(f'获取到店铺名:{shop_name}') return shop_name except: try: shop_name = self.d.xpath( '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text print(f'获取到店铺名2:{shop_name}') return shop_name except Exception as e: print(f'获取店铺名出错:{e}') return None def get_qualification_number(self): """ 获取资质编号 :return: """ try: qualification_number_str = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text qualification_number = qualification_number_str.strip('资质编号:').strip() return qualification_number except: return None def get_shop_address(self): try: xpath = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView' if self.d.xpath(xpath).exists: shop_address = self.d.xpath(xpath).text print(f'111-获取到店铺地址:{shop_address}') if '发货时间' in shop_address: print(f'店铺地址包含发货时间,再次获取店铺地址') xpath2 = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.TextView' if self.d.xpath(xpath2).exists: shop_address = self.d.xpath(xpath2).text print(f'222-获取到店铺地址:{shop_address}') else: print(f'222-xpath2获取店铺地址失败') else: shop_address = '' print(f'333-获取到店铺地址:{shop_address}') return shop_address except: print(f'获取店铺地址出错-get_shop_address') return None def enter_detail(self): self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/recycler"]/android.widget.FrameLayout[1]').click() time.sleep(self.get_sleep_time()) def save_to_database(self, data): print(f'保存数据到数据库:{data}') # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() # add_sql = "insert into delete_friend_table(delete_user_name,delete_user_id,delete_content,delete_time) value(%s,%s,%s,%s)" add_sql = f""" INSERT INTO {self.table_name} (product, min_price, manufacture_date, expiry_date, shop, business_license_company, province, city, manufacturer, specification, approval_number, product_link, scrape_date, scrape_province, availability, credit_code, platform) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'],data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], self.get_current_date(), data['scrape_province'], data['availability'], data['credit_code'], data['platform'])) cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'],data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], data['scrape_date'], data['scrape_province'], data['availability'], data['credit_code'], data['platform'])) conn.commit() # 提交数据 #self.mysql_client.insert(self.table_name, data) print(f"存入数据库成功") def save_shop_info_to_database(self, data): print(f'保存店铺数据到数据库:{data}') # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() add_sql = f""" INSERT INTO {self.shop_table_name} (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cur.execute(add_sql, (data['shop'], data['contact_address'], data['qualification_number'], data['business_license_company'], data['business_license_address'], data['scrape_date'], data['platform'])) conn.commit() # 提交数据 #self.mysql_client.insert(self.shop_table_name, data) print(f'存入店铺信息到数据库成功') def swipe_up(self): """ 上滑 :return: """ screen_width = self.d.info['displayWidth'] screen_height = self.d.info['displayHeight'] duration_rate = random.uniform(0, 0.3) self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate) no = random.uniform(0, 1) if no > 0.85: # 有的时候卡着 再稍微往上滑一点点 self.d.swipe_ext("up", 0.1) time.sleep(self.get_sleep_time()) def swipe_back(self, no): """ 返回 :param no: 回退次数 :return: """ for idx in range(no): self.d.press('back') time.sleep(self.get_sleep_time()) def drug_price(self): """ 获取药品价格 :return: """ try: price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text price = float(re.search('[\d\.]+', price_str).group()) print(f'获取到价格:{price}') return price except Exception as e: print(f'提取价格出错-->{e}') return None def restart_uiautomator_services(self, device_id): """ 重启atx的uiautomator 服务 :param device_id: :return: """ stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop' start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d' # result = subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) # print(result.stdout) subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) def connect_devices(self, device_id): """ 连接设备 :return: """ try: self.d = u2.connect_usb(device_id) # 设置隐形等待时间 # self.d.implicitly_wait(5) self.restart_uiautomator_services(device_id) print(f'连接到设备:{device_id}') except Exception as e: print(f'{device_id} 连接错误: {e}') raise Exception(e) def get_ocr_res(self, img): try: #img地址 print(f'开始识别图片:{img}') request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license" # 二进制方式打开图片文件 f = open(img, 'rb') img = base64.b64encode(f.read()) params = {"image": img} # access_token = get_access_token() request_url = request_url + "?access_token=" + self.access_token headers = {'content-type': 'application/x-www-form-urlencoded'} response = requests.post(request_url, data=params, headers=headers) if response: res = response.json() new_dic = dict() for ite in res['words_result'].keys(): new_dic[ite] = res['words_result'][ite]['words'] print('资质数据信息', new_dic) return new_dic else: return None except: return None def screenshot_the_business_license(self, qualification_number): screenshot_path = 'screenshot1.png' self.d.screenshot(screenshot_path) img = cv2.imread(screenshot_path) # 指定裁剪区域 (left, top, right, bottom) left = 0 top = 480 right = 720 bottom = 1420 cropped_img = img[top:bottom, left:right] if qualification_number: cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png' else: cropped_screenshot_path = 'cropped_screenshot.png' cv2.imwrite(cropped_screenshot_path, cropped_img) return cropped_screenshot_path #获取商品title def get_title(self): # try: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text # except: # title = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text def _inner(): temp_search_key = self.search_key if "999" in self.search_key and self.search_key != "999皮炎平": temp_search_key = self.search_key.replace("999", "") # elif self.search_key == '三九胃泰颗粒': # self.search_key = '三九胃泰' #兼容三九胃泰 温胃舒颗粒 print(f'获取商品title时的搜索关键字:{temp_search_key}') # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text title = self.safe_exec( lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text ) #奇怪:有的时候title取出来的记过第一位会多一个0 # title = self.safe_exec(self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text) # title = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text title = title[1:] if title.startswith('0') else title print(f'获取到药品标题:{title}') # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) if self.search_key == "999感冒清热颗粒" : match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) else: match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title) if match: #drugs_name = match.group(1).strip() + match.group(2).strip() drugs_name = title specifications = match.group(3).strip() print("药品名:", drugs_name) print("规格:", specifications) # print('完整药名:', drugs_name + specifications) return drugs_name, specifications else: print("没有匹配到预期格式") return None, None # 用 safe_exec 包装内部逻辑,确保验证码阻塞 return self.safe_exec(_inner) def enter_shop(self): """ 进店,方便提取资质环境 :return: """ # self.d.xpath('//*[@text="进店"]').click() self.d.xpath('//*[@text="店铺"]').click() time.sleep(self.get_sleep_time()) def enter_shoper(self): """ 进入商家 :return: """ for i in range(10): if self.d.xpath('//*[@text="商家"]').exists: print(f'第{i}次商家存在') break else: print(f'第{i}次商家不存在') time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="商家"]').click() time.sleep(self.get_sleep_time()) #点击查看商家资质 def scan_shoper_license(self): exist_shoper = 0 for i in range(10): if self.d.xpath('//*[@text="查看商家资质"]').exists: print(f'第{i}次查看商家资质存在') exist_shoper = 1 break else: print(f'第{i}次查看商家资质不存在') time.sleep(self.get_sleep_time()) if exist_shoper == 1: self.d.xpath('//*[@text="查看商家资质"]').click() time.sleep(self.get_sleep_time()) else: self.swipe_back(1) #验证商品的信息是否在数据库中已存在 def data_is_exists(self, data): """ 检查指定数据是否已存在于数据库表中(仅检查存在性) 参数: data: 包含查询条件的字典,键为列名,值为条件值 返回: True: 数据存在 False: 数据不存在 None: 检查过程中出错 """ # dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, # 'platform': '美团'} # 1. 验证必要字段 required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] logging.error(f"缺少必要字段: {', '.join(missing)}") return None try: # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() # query_sql = f"SELECT * FROM {self.table_name} WHERE product = '{data['product']}' AND min_price = '{data['min_price']}' AND shop = '{data['shop']}' AND scrape_date = '{data['scrape_date']}' AND platform = '{data['platform']}'" # cur.execute(query_sql) query_sql = """ SELECT * FROM {} WHERE product = %s AND min_price = %s AND shop = %s AND scrape_date = %s AND platform = %s """.format(self.table_name) cur.execute(query_sql, ( data['product'], data['min_price'], data['shop'], data['scrape_date'], data['platform'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") #验证店铺信息是否在数据库中已存在 def shop_is_exists_database(self, shop): try: # 连接数据库 conn = get_mysql() # 创建游标对象 cur = conn.cursor() query_sql = """ SELECT * FROM {} WHERE shop = %s """.format(self.shop_table_name) cur.execute(query_sql, ( shop )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") def wait_if_verifying(self, monitor, timeout=120): """验证码处理期间阻塞主线程""" start = time.time() while monitor.pausing.is_set() and time.time() - start < timeout: time.sleep(1) # def safe_xpath(self, xpath, timeout=10): # """线程安全 xpath 查找""" # self.wait_if_verifying(self.monitor) # return self.d.xpath(xpath).wait(timeout=timeout) def wait_for_ready(self, monitor, timeout=86400): """进入每一页前都先等验证码""" start = time.time() while monitor.pausing.is_set() and time.time() - start < timeout: time.sleep(1) # 额外保险:如果验证码突然在这一秒才弹,再主动扫一次 monitor.check_and_handle_popup() def safe_list(self, xpath, monitor): """线程安全地拿商品列表""" self.wait_for_ready(monitor) return self.d.xpath(xpath).all() def safe_exec(self, func, *args, **kwargs): """ 万能安全壳:执行 func 前检查验证码, 若监控线程已置位 pausing,则一直阻塞直到放行。 """ while self.monitor.pausing.is_set(): time.sleep(1) # 执行真正逻辑 return func(*args, **kwargs) ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="查看详细说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="加载更多"]').click_exists() loop_page = 5 # new_list = list() new_list = [] for i in range(loop_page): self.d.xpath('//*[@text="加载更多"]').click_exists() time.sleep(0.2) if i == 0: self.d.swipe(200, 1000, 200, 300, 0.4) else: self.d.swipe(200, 1000, 200, 62) time.sleep(0.2) if self.d.xpath('//*[@text="加载更多"]').exists: self.d.xpath('//*[@text="加载更多"]').click() time.sleep(0.2) all_tt = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all() for idx in range(1, len(all_tt) + 1): all_tt1 = self.d.xpath( f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all() # print(f'当前说明书列表数据:{all_tt1}') for tt in all_tt1: if tt.text and tt.text != '展开全文': new_list.append(tt.text) if i == 0: height = 938 else: drug_box = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info bounds = drug_box['bounds'] height = bounds['bottom'] - bounds['top'] if height < 938: # print('说明书翻页到底部') break # 展开全文 new_list = [item for item in new_list if item != '展开全文'] print(f'当前说明书列表数据:{new_list}') # expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期') # manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位') # approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号') # res_data = { # "有效期": new_list[expiry_date_index + 1], # "生产单位": new_list[manufacturer_index + 1], # "批准文号": new_list[approval_number_index + 1] # } res_data = { "有效期": (new_list[new_list.index("有效期") + 1]) if "有效期" in new_list and new_list.index("有效期") + 1 < len(new_list) else "", "生产单位": (new_list[new_list.index("生产单位") + 1]) if "生产单位" in new_list and new_list.index("生产单位") + 1 < len(new_list) else "", "批准文号": (new_list[new_list.index("批准文号") + 1]) if "批准文号" in new_list and new_list.index("批准文号") + 1 < len(new_list) else "" } print(f'当前说明书字典数据:{res_data}') return res_data ''' ''' def get_instructions_data(self): """ 确定有说明书之后,提取所有的说明书数据 :return: """ self.d.xpath('//*[@text="说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) self.d.xpath('//*[@text="查看详细说明"]').click() # time.sleep(random.randint(3, 5)) time.sleep(0.5) # 1) 先向上滑动一次,触发“加载更多”出现 self.d.swipe(200, 1000, 200, 300, 0.4) time.sleep(0.3) # 2) 再进入“出现就点”的循环 while self.d.xpath('//*[@text="加载更多"]').click_exists(timeout=1): time.sleep(0.2) self.d.swipe(200, 1000, 200, 300, 0.4) # self.d.swipe(200, 1000, 200, 62) time.sleep(0.2) # 一次性获取所有文本 texts = [ node.text.strip() # for node in self.d.xpath('//android.widget.TextView').all() for node in self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.TextView').all() if node.text and node.text.strip() and node.text != '加载更多' ] print(f'当前说明书列表数据:{texts}') # 提取关键字段 def safe_get(key): # try: # idx = texts.index(key) # return texts[idx + 1] if idx + 1 < len(texts) else "" # except ValueError: # return "" try: idx = next(i for i, text in enumerate(texts) if text == key) return texts[idx + 1] if idx + 1 < len(texts) else "" except StopIteration: return "" res_data = { "有效期": safe_get("有效期"), "生产单位": safe_get("生产单位"), "批准文号": safe_get("批准文号") } print(f'当前说明书字典数据:{res_data}') return res_data ''' def get_instructions_data(self): """ 说明书键值对采集:连续两个 TextView 为一对,精确提取 """ # 1. 进入说明书 self.d(text="说明").click() time.sleep(0.5) self.d(text="查看详细说明").click() time.sleep(0.5) # self.d(text="加载更多").click_exists(timeout=0.5) # 2. 找到说明书最外层 ScrollView(页面主体) scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.ScrollView") count = scroll_view.count print(f"找到的 ScrollView 数量: {count}") if not scroll_view.exists: return {"有效期": "", "生产单位": "", "批准文号": ""} # 3. 在 ScrollView 内再定位真正包含键值对的容器 # 绝大多数美团说明书页面对应的是 ScrollView > ViewGroup > 若干 TextView kv_container = scroll_view.child(className="android.view.ViewGroup") if not kv_container.exists: kv_container = scroll_view # 降级:直接对 ScrollView 取子孙 TextView # 4. 滑动到底并收集所有 TextView(保留顺序) all_texts = [] max_swipe = 5 last_length = 0 for _ in range(max_swipe): texts = kv_container.child(className="android.widget.TextView") #获取texts中的文本 print(f'当前说明书列表数据:{texts}') current_texts = [] self.loggerMT.info(f'说明书111') for tv in texts: try: txt = tv.get_text().strip() # txt = tv.info['text'].strip() except Exception: continue if txt and txt != "展开全文": current_texts.append(txt) self.loggerMT.info(f'说明书222') print(f'当前说明书列表数据:{current_texts}') # 去重 if current_texts: current_texts = [t for t in current_texts if t not in all_texts] all_texts.extend(current_texts) # 判断是否到底 # if not scroll_view.info.get("scrollable"): # break # 判断是否到底 if len(all_texts) == last_length: break last_length = len(all_texts) # self.d.swipe_ext("up", scale=0.7) #向上滑动一次 self.d.swipe(200, 1000, 200, 300, 0.2) time.sleep(0.2) if self.d.xpath('//*[@text="加载更多"]').exists: self.d.xpath('//*[@text="加载更多"]').click() # 5. 成对解析 res_data = {"有效期": "", "生产单位": "", "批准文号": ""} for i in range(len(all_texts) - 1): key = all_texts[i] val = all_texts[i + 1] if key in res_data: res_data[key] = val print(f'说明书文本共 {len(all_texts)} 条,提取结果: {res_data}') # time.sleep(1000000) return res_data def has_instructions(self): """ 是否有说明书 :return: """ # 没有说明书的无法采集具体数据 time.sleep(self.get_sleep_time()) is_has_instructions = False for i in range(8): if self.d.xpath('//*[@text="说明"]').exists: print(f"第{i}次有说明书1") is_has_instructions = True break self.d.swipe_ext('down', 0.3) time.sleep(1) # detail_info = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info # bounds = detail_info['bounds'] # height = bounds['bottom'] - bounds['top'] # if self.d.xpath('//*[@text="进店"]').exists and height > 100: if self.d.xpath('//*[@text="说明"]').exists: is_has_instructions = True print(f"第{i}次有说明书2") break # is_has_instructions = self.d.xpath('//*[@text="说明"]').exists return is_has_instructions def has_shop(self): """ 是否有进店按钮 :return: """ # self.d.swipe_ext('up', 0.1) time.sleep(self.get_sleep_time()) is_has_enter_shop = self.d.xpath('//*[@text="进店"]').exists return is_has_enter_shop #获取商品对应的店铺信息 def get_license_info_ex(self): # self.enter_shop() self.safe_exec(self.enter_shop) # self.enter_shoper() self.safe_exec(self.enter_shoper) for i in range(10): if self.d.xpath('//*[@text="查看商家资质"]').exists: print(f"第{i}次有商家资质") break else: print(f"第{i}次没有商家资质") time.sleep(self.get_sleep_time()) #获取地址 # contact_address = self.get_shop_address() contact_address = self.safe_exec(self.get_shop_address) # time.sleep(50000) ### # self.scan_shoper_license() self.safe_exec(self.scan_shoper_license) # 获取资质编码 # qualification_number = self.get_qualification_number() qualification_number = self.safe_exec(self.get_qualification_number) #qualification_number 不为None继续下一步 if qualification_number: #营业执照公司名称 business_license_company = '' #营业执照地址 business_license_address = '' self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) cropped_screenshot_path = self.screenshot_the_business_license(qualification_number) print(f'cropped_screenshot_path:{cropped_screenshot_path}') # if qualification_number: # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png' # else: # cropped_screenshot_path = 'cropped_screenshot.png' # ocr_res = self.get_ocr_res('cropped_screenshot.png') ocr_res = self.get_ocr_res(cropped_screenshot_path) print(f'ocr_res:{ocr_res}') #获取ocr_res 中的地址、单位名称 if ocr_res: if '单位名称' in ocr_res.keys(): business_license_company = ocr_res['单位名称'] if '地址' in ocr_res.keys(): business_license_address = ocr_res['地址'] license_info_data = {'contact_address': contact_address, 'qualification_number': qualification_number, 'business_license_company': business_license_company, 'business_license_address': business_license_address} else: license_info_data = {'contact_address': contact_address, 'qualification_number': '', 'business_license_company': '', 'business_license_address': ''} return license_info_data """暂不用该功能 def get_license_info(self): self.enter_shop() self.enter_shoper() self.scan_shoper_license() # 获取资质编码 qualification_number = self.get_qualification_number() if qualification_number: table_license_info = self.get_table_license_info(qualification_number) if table_license_info: return { '单位名称': table_license_info[0], '地址': table_license_info[1], '社会信用代码': table_license_info[2] } else: # operate_no = random.randint(0, 1) self.d.click(0.603, 0.27) # if operate_no == 0: # self.d.xpath('//*[@text="营业执照"]').click() # else: # self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) self.screenshot_the_business_license() ocr_res = self.get_ocr_res('cropped_screenshot.png') return ocr_res # operate_no = random.randint(0, 1) self.d.click(0.603, 0.27) # if operate_no == 0: # self.d.xpath('//*[@text="营业执照"]').click() # else: # self.d.click(0.603, 0.27) time.sleep(self.get_sleep_time()) self.screenshot_the_business_license() ocr_res = self.get_ocr_res('cropped_screenshot.png') return ocr_res """ def distinct_target(self): is_position = self.d.xpath( '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]').exists return is_position def enter_target_page(self): self.d.xpath('//*[@content-desc="看病买药"]').click() time.sleep(self.get_sleep_time()) self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]').click() time.sleep(self.get_sleep_time()) self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]').click() time.sleep(self.get_sleep_time()) self.d.send_keys(self.search_key, clear=True) time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="搜索"]').click() time.sleep(self.get_sleep_time()) #增加点击快递送 self.click_express_send() time.sleep(self.get_sleep_time()) def click_express_send(self): max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' # print(f"xpath:{xpath}") # scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.HorizontalScrollView") if self.d.xpath(xpath).exists: self.d.xpath(xpath).click() # time.sleep(self.get_sleep_time()) print(f"第{idx}次点击xpath快递送成功") time.sleep(self.get_sleep_time()) break else: print(f"第{idx}次点击xpath快递送失败") xpath2= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' if self.d.xpath(xpath2).exists: self.d.xpath(xpath2).click() print(f"第{idx}次点击xpath2快递送成功") time.sleep(self.get_sleep_time()) break """暂不用该功能 def get_table_license_info(self, qualification_number): try: sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"' self.mysql_client.cur.execute(sql) res = self.mysql_client.cur.fetchone() return res except: return None """ # def get_clipboard(self): # """通过ADB获取Android手机剪贴板内容""" # try: # result = subprocess.run( # ["adb", "shell", "am", "broadcast", "-a", "clipper.get"], # capture_output=True, # text=True, # timeout=5 # ) # print(f"获取剪贴板结果: {result.stdout}") # # 解析返回信息中的剪贴板内容 # for line in result.stdout.splitlines(): # if "data=" in line: # return line.split("data=")[1].strip() # return "" # except Exception as e: # print("获取剪贴板失败:", e) # return "" # def get_clipboard(self): # """读取 Android 剪贴板(系统自带命令)""" # try: # text = subprocess.check_output( # ["adb", "shell", "cmd", "clipboard", "get"], # text=True, timeout=5, stderr=subprocess.STDOUT # ).strip() # print(f"获取剪贴板结果: {text}") # return text if text else "" # except Exception as e: # print("获取剪贴板失败:", e) # return "" def get_clipboard(self): return self.d.clipboard.strip() def clear_clipboard(self): self.d.set_clipboard("") # def clear_clipboard(self): # """清空手机剪贴板:写入空字符串(subprocess 版)""" # try: # subprocess.run( # ["adb", "shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", " "], # check=True, # capture_output=True, # text=True, # timeout=5 # ) # except subprocess.CalledProcessError as e: # print("ADB 清空失败:", e.stderr) # def clear_clipboard(): # """清空手机剪贴板:写入空字符串""" # try: # adb_shell(["shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", ""]) # except subprocess.CalledProcessError as e: # print("ADB 清空失败:", e.output) #获取一个商品的数据、商品对应的店铺的数据 def get_product_link(self): product_link = '' # 两种可能的“···”按钮 dots_xpaths = [ '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]', '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]' ] max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): if product_link: # 已经拿到则退出 break for xp in dots_xpaths: if self.d.xpath(xp).exists: print(f'{idx}-进入分享点点点') self.loggerMT.info(f'{idx}-进入分享点点点') self.d.xpath(xp).click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) product_link = self.get_clipboard() time.sleep(0.5) print(f'{idx}-商品链接:{product_link}') self.loggerMT.info(f'{idx}-商品链接:{product_link}') break # 找到并执行后跳出内层循环 if not product_link and idx < max_retry: time.sleep(0.5) # 最后一次不需要再等待 return product_link def integrate_data(self): #测试说明书详情: # instructions_info = self.safe_exec(self.get_instructions_data) # time.sleep(1000000) #测试店铺信息 # license_info = self.safe_exec(self.get_license_info_ex) # time.sleep(1000000) #获取链接开始 #self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text #1、点击页面的... 先判断元素是否存在 ''' if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点111') self.loggerMT.info('1-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #清空剪切板 # self.clear_clipboard() # if self.d.xpath('//*[@text="加载更多"]').click_exists(): # self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() # if self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').exists: # self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').click() # #获取剪切板的数据 # product_link = self.get_clipboard() # time.sleep(0.5) # print(f'商品链接:{product_link}') # #清空剪切板 # self.clear_clipboard() # else: # print('未找到分享按钮111') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('1-进入分享点点点222') self.loggerMT.info('1-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'1-商品链接:{product_link}') self.loggerMT.info(f'1-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点111') self.loggerMT.info('2-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('2-进入分享点点点222') self.loggerMT.info('2-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'2-商品链接:{product_link}') self.loggerMT.info(f'2-商品链接:{product_link}') #如果为获取到product_link 则等待0.5秒再获取 if not product_link: time.sleep(0.5) if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点111') self.loggerMT.info('3-进入分享点点点111') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() #点击分享商品 # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists: print('3-进入分享点点点222') self.loggerMT.info('3-进入分享点点点222') self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click() time.sleep(0.2) self.d.xpath('//*[@text="分享商品"]').click_exists() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) #获取剪切板的数据 product_link = self.get_clipboard() time.sleep(0.5) print(f'3-商品链接:{product_link}') self.loggerMT.info(f'3-商品链接:{product_link}') ''' #获取链接结束 """ 整合数据 :return: """ # title_info = self.get_title() # 药品,规格 title_info = self.safe_exec(self.get_title) # 药品,规格 if title_info: product, specifications = title_info #如果关键字包含999 则 product必须包含999 和 999后面的那段字符串 ps 999感冒灵颗粒必须包含:"999"和"感冒灵颗粒" if '999' in self.search_key: temp_search_key = self.search_key.replace('999', '') if '999' not in product or temp_search_key not in product: self.swipe_back(1) self.unrelated_data += 1 return else: if self.search_key not in product.replace(' ', ''): self.swipe_back(1) self.unrelated_data += 1 return # if self.search_key not in product.replace(' ', ''): # self.swipe_back(1) # self.unrelated_data += 1 # return else: self.swipe_back(1) return min_price = self.drug_price() # 最低价格 # 商品链接 product_link = self.get_product_link() #判断是否有自营的文本,有的话不需要获取店铺的信息 if self.d.xpath('//*[@text="自营"]').exists: shop = "美团自营大药房(快递电商)" # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return else: for i in range(8): if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名1') break self.d.swipe_ext('up', 0.3) time.sleep(1) # detail_info = self.d.xpath( # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info # bounds = detail_info['bounds'] # height = bounds['bottom'] - bounds['top'] # if self.d.xpath('//*[@text="进店"]').exists and height > 100: if self.d.xpath('//*[@text="进店"]').exists: print('开始获取店铺名2') break shop = self.get_shop_name() # 爬取日期 scrape_date = self.get_current_date() # scrape_date = "2025-07-18" dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '美团'} print(f'当前数据:{dup_data}') #获取店铺信息开始 #暂时不获取店铺信息 start is_has_enter_shop = self.has_shop() #需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品 shop_is_exists = self.shop_is_exists_database(shop) #存在进店 并且店铺的名称不包含美团官方的字样 print(f"已采集{self.shop_data_num}家店铺数据") if is_has_enter_shop and '美团官方' not in shop and not shop_is_exists and self.shop_data_num < 50: # license_info = self.get_license_info_ex() license_info = self.safe_exec(self.get_license_info_ex) contact_address = license_info['contact_address'] qualification_number = license_info['qualification_number'] business_license_company = license_info['business_license_company'] business_license_address = license_info['business_license_address'] save_shop_data = { 'shop': shop, 'contact_address': contact_address, 'qualification_number': qualification_number, 'scrape_date': scrape_date, 'business_license_company':business_license_company, 'business_license_address':business_license_address, 'platform': '美团' } self.save_shop_info_to_database(save_shop_data) self.shop_data_num += 1 # 店铺数据数量+1 self.swipe_back(2) else: print('不采集店铺信息') #获取店铺信息结束 #暂时不获取店铺信息 end if self.data_is_exists(dup_data): print('存在相同数据不入库') self.swipe_back(1) return if not shop: print('未获取到店铺名:开始回退') self.swipe_back(1) return if not shop or '自营' in shop: self.swipe_back(1) return time.sleep(self.get_sleep_time()) # 生产日期为空 manufacture_date = '' # 执政信息 # if is_has_enter_shop: # license_info = self.get_license_info() # business_license_company = license_info["单位名称"] # credit_code = license_info['社会信用代码'] # city_str = license_info['地址'] # # 先把省份啥的替换掉 # city_sub_str = re.sub(r'[u4e00-\u9fa5]+省', '', city_str) # try: # city = re.search(r'[\u4e00-\u9fa5]+?(市|区|县)', city_sub_str).group(0) # except: # city = city_sub_str # try: # province = self.city2province[city] # except: # province = '' # self.swipe_back(2) # else: # business_license_company = '' # credit_code = '' # city = '' # province = '' business_license_company = '' credit_code = '' city = '' province = '' expiry_date = '' manufacturer = '' approval_number = '' #暂时不获取说明书信息 start #是否存在说明书 # is_has_instructions = self.has_instructions() is_has_instructions = self.safe_exec(self.has_instructions) # 说明书等信息 if is_has_instructions: print('开始获取说明书信息') # instructions_info = self.get_instructions_data() instructions_info = self.safe_exec(self.get_instructions_data) expiry_date = instructions_info['有效期'].strip('。') manufacturer = instructions_info['生产单位'].strip('。') approval_number = instructions_info['批准文号'].strip('。') else: # 没有说明书不入库 print('没有获取到说明书信息') self.swipe_back(1) return #暂时不获取说明书信息 end self.unrelated_data = 0 # 爬取省份 scrape_province = '广东' # 这里先默认广东 # 是否有货 availability = '' save_data = { 'product': product, 'min_price': min_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'shop': shop, 'business_license_company': business_license_company, 'province': province, 'city': city, 'manufacturer': manufacturer, 'specification': specifications, 'approval_number': approval_number, 'product_link': product_link, 'scrape_date': scrape_date, 'scrape_province': scrape_province, 'availability': availability, 'credit_code': credit_code, 'platform': '美团' } self.save_to_database(save_data) # time.sleep(100000) if self.distinct_target(): print('已到达搜索列表页') else: for i in range(1): self.swipe_back(1) # 最外部有个定位按钮 if self.distinct_target(): break #主函数 def main(self, device_id, retry_count=0): MAX_RETRY = 3 # 最大重试次数 spider_no = 0 self.connect_devices(device_id) time.sleep(self.get_sleep_time()) self.d.toast.show("测试toast", 20) # 启动全局弹窗监控 self.monitor = SpiderMonitor(self) self.monitor.start() try: # 重新开启美团应用 self.restart_app() # 搜索关键字 self.enter_target_page() # print('开始滑动') # self.d.drag(300, 1400, 300, 400, 1) # time.sleep(100000) for idx in range(300): print(f'第{idx + 1}页') if spider_no > 30: time.sleep(120) spider_no = 0 print('目前无关数据量: ', self.unrelated_data) # 检查是否需要暂停(验证码过多) if self.monitor.verification_count >= self.monitor.MAX_VERIFICATION_RETRY: print("频繁遇到验证码,暂停程序") self.d.toast("请处理验证码后点击继续", 30) # 等待用户点击屏幕继续 self.d.click(0, 0) # 无效点击,等待用户操作 self.monitor.verification_count = 0 # if self.unrelated_data > 10: # # 连续超过5个不达标的数据则停止采集 # break # 线程安全获取商品列表 # drug_lis = self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all() # drug_lis = self.safe_list('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout', self.monitor) while True: if self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').exists: break time.sleep(1) drug_lis = self.safe_exec(self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all) lis_len = len(drug_lis) print(f'当前页面共有{lis_len}个商品') for idxx, drug_one in enumerate(drug_lis,start = 1): bounds = drug_one.info['bounds'] top = bounds['top'] bottom = bounds['bottom'] # height = bottom - top print(f'当前商品bottom:{bottom}') print(f'当前商品top:{top}') # if 304 <= top and bottom <= 1475: # 默认高度241的才行 if 304 <= top and bottom <= 1559: # 默认高度241的才行 # print('目标-->', drug_one.info) # drug_one.click() print(f"这页的第几个商品:{idxx}") product_title = '' price = '' shop_name = '' #商品名称的xpath product_tittle_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' product_tittle_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(product_tittle_xpath).exists: product_title = self.d.xpath(product_tittle_xpath).text print(f"product_tittle_xpath列表当前商品名称:{product_title}") if '999' in self.search_key: temp_search_key = self.search_key.replace('999', '') if self.search_key == '999抗病毒口服液': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '10ml*12' not in product_title or '10ml*18' not in product_title: print(f"当前商品名称:{product_title} 不包含10*12或10*18品规") continue elif self.search_key == '999曲安奈德益康唑乳膏': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '30' not in product_title: print(f"当前商品名称:{product_title} 不包含30品规") continue elif self.search_key == '999复方感冒灵颗粒': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '14g*15' not in product_title: print(f"当前商品名称:{product_title} 不包含14g*15品规") continue else: if '999' not in product_title or temp_search_key not in product_title: print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue else: if self.search_key not in product_title.replace(' ', ''): continue elif self.d.xpath(product_tittle_xpath2).exists: product_title = self.d.xpath(product_tittle_xpath2).text print(f"product_tittle_xpath2列表当前商品名称:{product_title}") if '999' in self.search_key: temp_search_key = self.search_key.replace('999', '') if self.search_key == '999抗病毒口服液': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '10ml*12' not in product_title or '10ml*18' not in product_title: print(f"当前商品名称:{product_title} 不包含10*12或10*18品规") continue elif self.search_key == '999曲安奈德益康唑乳膏': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '30' not in product_title: print(f"当前商品名称:{product_title} 不包含30品规") continue elif self.search_key == '999复方感冒灵颗粒': if '999' not in product_title or temp_search_key not in product_title : print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue elif '14g*15' not in product_title: print(f"当前商品名称:{product_title} 不包含14g*15品规") continue else: if '999' not in product_title or temp_search_key not in product_title: print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") continue # if '999' not in product_title or temp_search_key not in product_title: # print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}") # continue else: if self.search_key not in product_title.replace(' ', ''): continue else: print(f"列表当前商品名称不存在") #价格 price_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' price_xpath3 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath).exists: price_str = self.d.xpath(price_xpath).text print(f"price_xpath列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', price_str).group()) elif self.d.xpath(price_xpath3).exists: price_str = self.d.xpath(price_xpath3).text print(f"price_xpath3列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', price_str).group()) else: price_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView' if self.d.xpath(price_xpath2).exists: price_str = self.d.xpath(price_xpath2).text print(f"price_xpath2列表当前商品价格:{price_str}") if price_str: price = float(re.search('[\d\.]+', price_str).group()) else: print(f"列表当前商品价格不存在") # price_str = self.d.xpath(f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]//*[starts-with(@text,"¥")]').text print(f'列表获取到价格:{price}') #店铺名称的xpath shop_name_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' shop_name_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]' if self.d.xpath(shop_name_xpath).exists: shop_name = self.d.xpath(shop_name_xpath).text print(f"shop_name_xpath列表当前商品店铺名称:{shop_name}") elif self.d.xpath(shop_name_xpath2).exists: shop_name = self.d.xpath(shop_name_xpath2).text print(f"shop_name_xpath2列表当前商品店铺名称:{shop_name}") else: print(f"列表当前商品店铺名称不存在") scrape_date = self.get_current_date() if product_title and price and shop_name: #判断数据表中是否存在 dup_data = {'product': product_title, 'min_price': price, 'shop': shop_name, 'scrape_date': scrape_date,'platform': '美团'} if self.data_is_exists(dup_data): print('列表存在相同数据不入库') continue self.safe_exec(drug_one.click) print('点击目标药品完毕') time.sleep(2) # 采集药品信息 try: # self.integrate_data() self.safe_exec(self.integrate_data) # 检测下是否回退到列表页 if self.distinct_target(): print('回退到列表页', True) else: if self.d.xpath('//*[@text="搜索"]').exists: print("检测到搜索按钮,重新开始采集流程") if retry_count < MAX_RETRY: # 停止当前监控线程 self.monitor.stop() self.monitor.join() # 递归重启采集 return self.main(device_id, retry_count+1) else: print("超过最大重试次数,终止程序") return else: print("无法恢复页面,终止采集") return # print('回退到列表页失败,终止采集') # return time.sleep(self.get_sleep_time()) spider_no += 1 except Exception as e: print(f'采集药品详情数据出错:{e}') #增加阻塞的方法: if not self.distinct_target(): for i in range(1): self.swipe_back(1) # 最外部有个定位按钮 if self.distinct_target(): break if i == 0 and not self.distinct_target(): print('页面出错,退出采集') return else: continue if self.d.xpath('//*[@text="已经到底啦"]').exists: print('已经到达列表页最底部') return search_list = self.d.xpath('//android.support.v7.widget.RecyclerView').info bounds = search_list['bounds'] #print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom']) # 计算滑动距离 scroll_distance = bounds['bottom'] - bounds['top'] # 正数 start_y = 1600 end_y = start_y - scroll_distance # 向上滑动,y 坐标减小 # 确保 end_y 不小于 0 end_y = max(end_y, 304) # 留出一点边距,避免滑出屏幕 # print('滑动起点 y:', start_y, '终点 y:', end_y) # self.d.swipe(200, start_y, 200, end_y, 0.4) print('开始滑动') self.d.drag(300, 1400, 300, 400, 1) # self.safe_exec(self.d.drag, 300, 1400, 300, 400, 1) print('滑动结束') #print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom']) # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'], 0.4) time.sleep(self.get_sleep_time()) finally: # 确保监控线程被停止 self.monitor.stop() self.monitor.join() def unitest(self): """ 单元测试 :return: """ save_data = { 'product':"[昆中药]舒肝颗粒(低糖型)", 'min_price': 14.0, 'manufacture_date': '', 'expiry_date': '36个月', 'shop': '美团自营大药房(快递电商)', 'business_license_company': '', 'province': '', 'city': '', 'manufacturer': '昆明中药厂有限公司', 'specification': '3g*16袋/盒', 'approval_number': '国药准字Z53021161', 'product_link': '', 'scrape_date': '2025/07/09', 'scrape_province': '广东', 'availability': '', 'credit_code': '', 'platform': '美团' } self.save_to_database(save_data) time.sleep(100000) pass def main(): keys_list = [ # '三九胃泰颗粒', # '999小柴胡颗粒', # '999强力枇杷露', # '[999]感冒清热颗粒', # '999抗病毒口服液', # '999皮炎平', # '999盐酸特比萘芬乳膏', # '999盐酸特比萘芬', # '999藿香正气合剂', # '999必无忧盐酸特比萘芬乳膏', # '999复方感冒灵颗粒', # '999糠酸莫米松凝胶', # '999铝碳酸镁咀嚼片', # '999阿奇霉素片', # '999选平硝酸咪康唑乳膏', # 按需继续添加, #2025-08-01最新 其中 藿香正气合剂两种规格 10支和6支 抗病毒口服液 12支和18支 蒲地蓝 24片 36片和44片 枇杷露225ml 小柴胡颗粒9袋和15袋 养胃舒 6袋 复方感冒灵颗粒15袋, #曲安奈德益康唑乳膏 30g 葡萄糖酸锌口服溶液 12支 18支 24支和30支, # '999藿香正气合剂', # '999糠酸莫米松凝胶', # '999抗病毒口服液', # '999蒲地蓝消炎片', # '999强力枇杷露', # '999小柴胡颗粒', # '999养胃舒', '999复方感冒灵颗粒', # '999黄芪精', '999曲安奈德益康唑乳膏', '999葡萄糖酸锌口服溶液', # '999赐多康蛋白粉', ] #美团手机号: # device_id = '1462a51f' # 设备序列号 # device_id = 'e2899b34' # 设备序列号 # device_id = '97ae80e0' # 设备序列号 device_ids = [ 'e2899b34', # 设备序列号1 '97ae80e0', # 设备序列号2 '1462a51f', # 设备序列号3 # 按需继续添加更多设备序列号 ] cycle_no = 0 # 轮次计数 while True: cycle_no += 1 logging.info(f'========== 第 {cycle_no} 轮采集开始 ==========') # 使用线程池并发执行 with concurrent.futures.ThreadPoolExecutor(max_workers=len(device_ids)) as executor: futures = [] for idx, (key, device_id) in enumerate(zip(keys_list, device_ids), 1): logging.info(f'[{idx}/{len(keys_list)}] 开始采集关键字:{key},设备:{device_id}') futures.append(executor.submit(run_task, key, device_id)) for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logging.exception(f'任务执行异常:{e}') # for idx, key in enumerate(keys_list, 1): # logging.info(f'[{idx}/{len(keys_list)}] 开始采集关键字:{key}') # try: # mt = MT(key) # 用当前关键字实例化 # mt.main(device_id) # 执行一次完整采集 # logging.info(f'关键字 {key} 本轮采集完成') # except Exception as e: # # 发生异常直接跳过该关键字,继续下一轮 # logging.exception(f'关键字 {key} 采集异常:{e}') # finally: # # 关闭当前 MT 实例资源(如有需要) # if hasattr(mt, 'close'): # mt.close() # logging.info('本轮全部关键字采集完成,等待 2 小时后下一轮...') # time.sleep(1 * 3600) # 2 小时 = 7200 秒 # keys = '小柴胡颗粒' # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒 小柴胡颗粒 # mt = MT(keys) # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒 # # mt.main('95b2c764') # mt.main('fcb3c749') def run_task(key, device_id): try: mt = MT(key) # 用当前关键字实例化 mt.main(device_id) # 执行一次完整采集 logging.info(f'关键字 {key},设备 {device_id} 本轮采集完成') except Exception as e: logging.exception(f'关键字 {key},设备 {device_id} 采集异常:{e}') finally: # 关闭当前 MT 实例资源(如有需要) if hasattr(mt, 'close'): mt.close() if __name__ == '__main__': main() # scheduler = BlockingScheduler() # scheduler.add_job(main, 'cron', hour=21, minute=30, misfire_grace_time=120) # try: # scheduler.start() # except (KeyboardInterrupt, SystemExit): # pass