| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396 |
- import sys
- import sys
- import logging
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pymysql
- import requests
- import base64
- import cv2
- import uiautomator2 as u2
- import time
- import subprocess
- import re
- import random
- import datetime
- import json
- from aip import AipOcr
- import threading
- from collections import deque
- import numpy as np
- import secrets
- import os
- import oss2
- import urllib.parse
- from config import Config
- from logger import setup_logger
- import logging
- from PIL import Image
- from pathlib import Path
- from PIL import Image, ImageDraw, ImageFont
- import http.client
# Path of city.json in this module's directory; MT.__init__ passes it to
# city_name_to_id.build_city_name_to_id.
_DEFAULT_PATH = Path(__file__).with_name("city.json")
import city_name_to_id
setup_logger("mt_spider")  # initialize logging
def get_access_token(timeout=10):
    """Fetch a Baidu AIP OAuth access token via the client-credentials grant.

    Args:
        timeout: Seconds to wait for the token endpoint, so a stalled network
            call cannot block the caller indefinitely.

    Returns:
        The ``access_token`` string, or ``None`` when the request fails, the
        response is not JSON, or the JSON carries no ``access_token``.
    """
    # NOTE(review): these credentials are committed to source. They can be
    # overridden with BAIDU_TOKEN_APP_KEY / BAIDU_TOKEN_APP_SECRET; the literals
    # are kept only as a backward-compatible fallback — move them out of code.
    app_key = os.environ.get("BAIDU_TOKEN_APP_KEY", "tRK2RhyItCSh6BzyT4CNVXQa")
    app_secret = os.environ.get("BAIDU_TOKEN_APP_SECRET", "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh")
    token_url = 'https://aip.baidubce.com/oauth/2.0/token'
    params = {
        "grant_type": "client_credentials",
        "client_id": app_key,
        "client_secret": app_secret,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    try:
        response = requests.post(token_url, params=params, headers=headers, data="", timeout=timeout)
        return response.json()['access_token']
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network errors, non-JSON bodies and missing tokens are all reported
        # the same way: logged, then None.
        logging.warning(f"获取百度 access_token 失败: {e}")
        return None
def get_mysql():
    """Open a new connection to the ``drug_retrieve`` MySQL database.

    Each setting can be overridden with an environment variable
    (``MT_MYSQL_HOST``, ``MT_MYSQL_PORT``, ``MT_MYSQL_USER``,
    ``MT_MYSQL_PASSWORD``, ``MT_MYSQL_DB``); otherwise the built-in defaults
    are used. The caller owns the returned connection and must close it.
    """
    # NOTE(review): the fallback credentials are committed to source control;
    # move them into configuration and rotate the password.
    return pymysql.connect(
        host=os.environ.get("MT_MYSQL_HOST", '120.24.49.2'),
        port=int(os.environ.get("MT_MYSQL_PORT", "3306")),
        user=os.environ.get("MT_MYSQL_USER", 'drug_retrieve'),
        password=os.environ.get("MT_MYSQL_PASSWORD", 'ksCt3xm6chzdkafj'),
        db=os.environ.get("MT_MYSQL_DB", 'drug_retrieve'),
        charset='utf8mb4',
    )
# Delay, in seconds, that scheduled_dispatch_job uses when re-arming the timer.
SCHEDULER_INTERVAL_SECONDS = 600
# retrieve_collect_task_allocate.platform value selected for Meituan tasks.
PLATFORM_MT = 4
# Task status that fetch_pending_tasks treats as pending.
TASK_STATUS_PENDING = 1
# Equipment status that fetch_idle_device_by_equipment_id treats as idle.
DEVICE_STATUS_IDLE = 0
# Guards running_task_ids, running_device_ids and worker_threads.
dispatch_lock = threading.Lock()
# Task ids reserved or running in this process.
running_task_ids = set()
# Device ids reserved or running in this process.
running_device_ids = set()
# device_id -> worker thread started by dispatch_pending_tasks.
worker_threads = {}
# Once set, schedule_dispatch stops arming new timers.
scheduler_stop_event = threading.Event()
# Most recent timer armed by schedule_dispatch (None until the first one).
scheduler_timer = None
def parse_optional_int(value, default=None):
    """Coerce *value* to ``int``, falling back to *default*.

    ``None`` and the empty string count as "missing" and yield *default*, as
    does any value that ``int()`` rejects with ``TypeError``/``ValueError``.
    """
    missing = value is None or value == ""
    if missing:
        return default
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = default
    return parsed
# Separators between specs: ASCII and full-width commas, the Chinese
# enumeration comma, ASCII and full-width semicolons, slashes, any whitespace.
_SPEC_SEPARATOR_RE = re.compile(r"[,，、;；/\s]+")


def parse_spec_list(value):
    """Split a spec field into a list of non-empty, stripped spec strings.

    Args:
        value: ``None``, a list/tuple of specs (each item is ``str()``-ed), or
            a delimited string such as ``"10mg，20mg/30mg"``.

    Returns:
        The specs in order; an empty list for ``None`` or blank input.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        stripped = (str(item).strip() for item in value)
        return [item for item in stripped if item]
    text = str(value).strip()
    if not text:
        return []
    # The previous pattern listed "," twice; the second was presumably meant
    # to be the full-width "，" typed by Chinese IMEs, which is now accepted.
    return [part.strip() for part in _SPEC_SEPARATOR_RE.split(text) if part.strip()]
def fetch_pending_tasks():
    """Load every pending Meituan row from ``retrieve_collect_task_allocate``.

    Rows come back ordered by ascending ``id``. Any failure (connecting or
    querying) is logged with its traceback and reported as an empty list.
    """
    query = """
        SELECT *
        FROM retrieve_collect_task_allocate
        WHERE platform = %s AND status = %s
        ORDER BY id ASC
    """
    connection = None
    try:
        connection = get_mysql()
        with connection.cursor() as cur:
            cur.execute(query, (PLATFORM_MT, TASK_STATUS_PENDING))
            rows = cur.fetchall()
        return rows
    except Exception as e:
        logging.exception(f"读取美团待执行任务失败: {e}")
        return []
    finally:
        # The connection is opened per call, so always release it.
        if connection:
            connection.close()
def fetch_idle_device_by_equipment_id(equipment_id):
    """Return the idle Meituan equipment row with id *equipment_id*, or ``None``.

    Only rows of ``retrieve_collect_equipment`` whose ``name`` contains "mt"
    and whose ``status`` equals ``DEVICE_STATUS_IDLE`` qualify. Errors are
    logged with their traceback and reported as ``None``.
    """
    query = """
        SELECT *
        FROM retrieve_collect_equipment
        WHERE name LIKE %s AND id = %s AND status = %s
        LIMIT 1
    """
    connection = None
    try:
        connection = get_mysql()
        with connection.cursor() as cur:
            cur.execute(query, ('%mt%', equipment_id, DEVICE_STATUS_IDLE))
            row = cur.fetchone()
        return row
    except Exception as e:
        logging.exception(f"读取美团空闲设备失败 equipment_id={equipment_id}: {e}")
        return None
    finally:
        if connection:
            connection.close()
def build_mt_task_payload(task_row, device_row):
    """Turn a task row plus its device row into the job description for ``MT``.

    Columns read from *task_row*: 0 task id, 2 equipment id, 3 enterprise id,
    4 platform, 5 title keyword, 6 spec list, 7 brand, 9 start page and
    10 end page. ``device_row[2]`` becomes ``device_id``. Trailing columns may
    be absent: missing text columns read as ``""`` and missing page bounds as
    ``None``. ``page_range`` is only filled when both page bounds are truthy.
    """
    def column(index):
        # Short rows are tolerated: an absent trailing column reads as None.
        return task_row[index] if len(task_row) > index else None

    def text_column(index):
        raw = column(index)
        return ("" if raw is None else raw).strip()

    title_key = text_column(5)
    spec_list = parse_spec_list(column(6))
    brand = text_column(7)
    search_key = f"{brand}{title_key}".strip() or title_key
    start_page = parse_optional_int(column(9), None)
    end_page = parse_optional_int(column(10), None)
    page_range = [start_page, end_page] if start_page and end_page else []

    retry_limits = {
        "start_app": 3,
        "open_product_list_page": 3,
        "collect_single_product": 3,
    }
    error_actions = {
        "start_app": "start_app",
        "open_product_list_page": "start_app",
        "collect_single_product": "back_to_list_page",
    }
    return {
        "task_id": task_row[0],
        "equipment_id": task_row[2],
        "enterprise_id": column(3),
        "platform": task_row[4],
        "title_key": title_key,
        "spec_list": spec_list,
        "brand": brand,
        "search_key": search_key,
        "sort": "升序",
        "collect_range": [],
        "page_range": page_range,
        "start_page": start_page,
        "end_page": end_page,
        "device_id": device_row[2],
        "workflow_retry_limit": retry_limits,
        "workflow_error_action": error_actions,
        "task_row": task_row,
    }
def fetch_runnable_mt_task_payloads():
    """Select pending Meituan tasks that can start now and reserve them.

    A task is runnable when it is not already running in this process, no
    earlier task in this round claimed the same equipment, its equipment is
    idle in the database, and its device is not busy in this process. For each
    runnable task, the task id and device id are added to ``running_task_ids``
    / ``running_device_ids``; ``run_mt_task_worker`` (or the dispatcher on a
    failed start) releases them.

    A row that fails to process is logged and skipped. It does not abort the
    round, because that would strand the reservations already made for
    earlier rows.

    Returns:
        The payload dicts (from ``build_mt_task_payload``) of reserved tasks.
    """
    tasks = fetch_pending_tasks()
    if not tasks:
        logging.info("当前没有美团待执行任务")
        return []
    payloads = []
    reserved_equipment_ids = set()
    for task_row in tasks:
        try:
            payload = _reserve_mt_task(task_row, reserved_equipment_ids)
        except Exception as e:
            logging.exception(f"准备美团任务失败,跳过本轮 task_row={task_row!r}: {e}")
            continue
        if payload is not None:
            payloads.append(payload)
    return payloads


def _reserve_mt_task(task_row, reserved_equipment_ids):
    """Reserve one task row and return its payload, or ``None`` if it must wait.

    The payload is built before the running sets are touched. An exception
    therefore never leaves a stale task/device reservation behind.
    """
    task_id = task_row[0]
    equipment_id = task_row[2]
    with dispatch_lock:
        if task_id in running_task_ids:
            return None
    if equipment_id in reserved_equipment_ids:
        return None
    device_row = fetch_idle_device_by_equipment_id(equipment_id)
    if not device_row:
        logging.info(f"美团任务 {task_id} 对应设备 {equipment_id} 当前不空闲,跳过本轮")
        return None
    device_id = device_row[2]
    with dispatch_lock:
        if device_id in running_device_ids:
            logging.info(f"美团设备 {device_id} 已在本进程执行任务,跳过任务 {task_id}")
            return None
        # Build first: if this raises, nothing has been reserved yet.
        payload = build_mt_task_payload(task_row, device_row)
        running_task_ids.add(task_id)
        running_device_ids.add(device_id)
    reserved_equipment_ids.add(equipment_id)
    return payload
def cleanup_finished_workers():
    """Remove entries from ``worker_threads`` whose thread is no longer alive."""
    with dispatch_lock:
        finished = [dev for dev, worker in worker_threads.items() if not worker.is_alive()]
        for dev in finished:
            worker_threads.pop(dev, None)
def run_mt_task_worker(task_payload):
    """Thread target: run one Meituan collection task on its device.

    Builds an ``MT`` spider from *task_payload*, runs ``mt.main(device_id)``
    and logs the outcome; exceptions are logged, not raised. Afterwards the
    spider is closed (if it has a callable ``close``). The task/device
    reservation is always released so the device can be scheduled again.
    """
    task_id = task_payload["task_id"]
    device_id = task_payload["device_id"]
    mt = None
    try:
        logging.info(f"[美团任务 {task_id}] 开始执行,设备: {device_id}")
        mt = MT(
            task_payload["search_key"],
            task_payload["title_key"],
            task_payload["spec_list"],
            task_payload["brand"],
            task_payload.get("sort"),
            task_payload.get("collect_range"),
            task_payload.get("page_range"),
            task_payload.get("workflow_retry_limit"),
            task_payload.get("workflow_error_action"),
        )
        mt.main(device_id)
        logging.info(f"[美团任务 {task_id}] 执行完成,设备: {device_id}")
    except Exception as e:
        logging.exception(f"[美团任务 {task_id}] 执行异常,设备: {device_id},错误: {e}")
    finally:
        try:
            close = getattr(mt, "close", None)
            if callable(close):
                close()
        except Exception as e:
            # A failing close() must not skip the release below; otherwise
            # the device would stay reserved in this process forever.
            logging.exception(f"[美团任务 {task_id}] 关闭资源失败,设备: {device_id},错误: {e}")
        finally:
            with dispatch_lock:
                running_task_ids.discard(task_id)
                running_device_ids.discard(device_id)
                worker_threads.pop(device_id, None)
def dispatch_pending_tasks():
    """Run one dispatch round: start a daemon worker thread per runnable task.

    Finished workers are pruned first. If a worker cannot be started, its
    task/device reservation is released and the remaining payloads are still
    dispatched. The first such error is re-raised after the loop, so callers
    still see the failure without other payloads' reservations being stranded.
    """
    cleanup_finished_workers()
    task_payloads = fetch_runnable_mt_task_payloads()
    if not task_payloads:
        return
    first_error = None
    for task_payload in task_payloads:
        task_id = task_payload["task_id"]
        device_id = task_payload["device_id"]
        try:
            thread = threading.Thread(
                target=run_mt_task_worker,
                args=(task_payload,),
                daemon=True,
                name=f"mt-{device_id}",
            )
            with dispatch_lock:
                worker_threads[device_id] = thread
            thread.start()
        except Exception as e:
            # No worker exists to release this reservation, so do it here.
            with dispatch_lock:
                running_task_ids.discard(task_id)
                running_device_ids.discard(device_id)
                worker_threads.pop(device_id, None)
            logging.exception(f"[美团任务 {task_id}] 分发到设备 {device_id} 失败: {e}")
            if first_error is None:
                first_error = e
            continue
        # Outside the try: once the worker runs, only it may release its ids.
        logging.info(f"[美团任务 {task_id}] 已分发到设备 {device_id}")
    if first_error is not None:
        raise first_error
def schedule_dispatch(delay_seconds=SCHEDULER_INTERVAL_SECONDS):
    """Arm a one-shot, non-daemon timer that runs ``scheduled_dispatch_job``.

    Does nothing once ``scheduler_stop_event`` is set. The armed timer is
    stored in the module-level ``scheduler_timer``.
    """
    global scheduler_timer
    if not scheduler_stop_event.is_set():
        scheduler_timer = timer = threading.Timer(delay_seconds, scheduled_dispatch_job)
        timer.daemon = False  # keeps the process alive between rounds
        timer.name = "mt-scheduler"
        timer.start()
def scheduled_dispatch_job():
    """Timer callback: run one dispatch round, log failures, then re-arm the timer."""
    try:
        dispatch_pending_tasks()
    except Exception as exc:
        logging.exception(f"美团定时调度异常: {exc}")
    finally:
        # Re-arm even if the round failed, so scheduling never silently stops.
        schedule_dispatch(SCHEDULER_INTERVAL_SECONDS)
class SpiderMonitor(threading.Thread):
    """Global popup-monitor thread (enhanced version).

    Polls the device held by ``spider_instance.d`` and:

    * clicks away the simple popups listed in ``popup_rules["simple"]``;
    * on a verification/captcha screen, sets ``pausing`` so the worker thread
      (e.g. ``MTScreenshot.safe_exec``) blocks until a human clears the screen,
      or until ``verification_timeout`` expires, after which the app is
      restarted;
    * dismisses advert overlays by tapping near the top-right corner.
    """

    def __init__(self, spider_instance, verification_timeout=120):
        """
        Args:
            spider_instance: Owner exposing the uiautomator2 device as ``.d``.
                Its ``stop_all()`` / ``restart_app()`` are called when present.
            verification_timeout: Seconds to wait for a verification screen
                to disappear before restarting the app; ``None`` waits forever.
        """
        super().__init__(daemon=True)
        self.spider = spider_instance
        self.running = True
        self.pausing = threading.Event()  # set while the main thread must pause
        self.last_verification_time = 0
        self.verification_count = 0
        self.MAX_VERIFICATION_RETRY = 10
        self.verification_timeout = verification_timeout
        self.recent_clicks = deque(maxlen=10)  # guards against repeated clicks
        self.logger = logging.getLogger("SpiderMonitor")
        # Configurable popup rules
        self.popup_rules = {
            "simple": [
                ('//*[@text="确定"]', "点击确定"),
                ('//*[@text="允许"]', "点击允许"),
                ('//*[@text="关闭"]', "点击关闭"),
                ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"),
                ('//*[@resource-id="com.sankuai.meituan:id/address_center_location_close"]', "关闭按钮"),
                ('//*[@resource-id="com.sankuai.meituan:id/location_close"]', "关闭按钮"),
                ('//*[@resource-id="com.sankuai.meituan:id/btn_close"]', "关闭按钮"),
            ],
            "verification": [
                '//*[contains(@text, "验证")]',
                '//*[contains(@text, "滑块")]',
                '//*[contains(@text, "依次点击")]',
                '//*[contains(@text, "请点击")]',
                '//*[contains(@text, "拖动滑块刚")]',  # slider must be dragged fully right before the screenshot
                '//*[contains(@text, "请输入图片中的内容")]',
                '//*[contains(@text, "用最短线连接")]',
                '//*[contains(@text, "请按语序依次点击")]',
                '//*[contains(@text, "请向右滑动滑块")]',
                '//*[contains(@text, "请拖动下方滑块完成拼图")]',
                '//*[contains(@resource-id, "captcha")]'
            ]
        }

    def run(self):
        """Poll for popups until ``stop()`` is called; errors are logged, not raised."""
        while self.running:
            try:
                handled = self.check_and_handle_popup()
                time.sleep(2 if handled else 1)
            except Exception as e:
                self.logger.exception("监控线程异常: %s", e)
                time.sleep(1)

    def _is_recent_click(self, xpath):
        """Return True if *xpath* was already clicked during the current second."""
        key = f"{xpath}_{int(time.time())}"
        if key in self.recent_clicks:
            return True
        self.recent_clicks.append(key)
        return False

    def _call_spider(self, method_name):
        """Call ``self.spider.<method_name>()`` if it exists; return whether it ran."""
        method = getattr(self.spider, method_name, None)
        if not callable(method):
            # e.g. MTScreenshot registers itself as the spider and has neither
            # stop_all nor restart_app.
            self.logger.warning("spider 未实现 %s,跳过", method_name)
            return False
        method()
        return True

    def _wait_for_verification(self, d, xpath):
        """Block until *xpath* disappears.

        Returns True once it is gone, and False on timeout or after ``stop()``.
        """
        deadline = None
        if self.verification_timeout is not None:
            deadline = time.monotonic() + self.verification_timeout
        while self.running:
            if not d.xpath(xpath).exists:
                return True
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(2)
        return False

    def check_and_handle_popup(self):
        """Handle at most one popup; return True when something was handled."""
        d = self.spider.d
        # 1. Simple popups
        for xpath, desc in self.popup_rules["simple"]:
            if d.xpath(xpath).exists and not self._is_recent_click(xpath):
                self.logger.info("检测到弹窗: %s", desc)
                d.xpath(xpath).click()
                return True
        # 2. Verification / captcha popups
        for xpath in self.popup_rules["verification"]:
            if not d.xpath(xpath).exists:
                continue
            now = time.time()
            if now - self.last_verification_time < 30:
                return False  # do not re-trigger within 30 seconds
            self.last_verification_time = now
            self.verification_count += 1
            self.logger.warning("验证码弹窗触发,等待人工处理...")
            if self.verification_count > self.MAX_VERIFICATION_RETRY:
                self.logger.error("验证码重试超限,终止任务")
                self._call_spider("stop_all")
                return True
            self.pausing.set()  # tell the main thread to pause
            try:
                if self._wait_for_verification(d, xpath):
                    self.logger.info("验证码已处理")
                elif self.running:
                    # Timed out: restart while the main thread is still paused.
                    self.logger.warning("验证码超时,重启APP")
                    self._call_spider("restart_app")
            finally:
                # Always release the main thread (cleared, timed out, stopped
                # or device error), otherwise safe_exec would block forever.
                self.pausing.clear()
            return True
        # 3. Advert popups: tap near the top-right corner
        if d.xpath('//*[contains(@text, "广告")]').exists:
            w, h = d.info['displayWidth'], d.info['displayHeight']
            d.click(w - 50, 50)
            self.logger.info("关闭广告弹窗")
            return True
        return False

    def stop(self):
        """Ask ``run()`` and any pending verification wait to exit."""
        self.running = False
class MTScreenshot:
    """Capture a scrolling screenshot of the current product page and upload it to OSS.

    A ``SpiderMonitor`` is started on construction. It handles popups, and
    verification screens make ``safe_exec`` wait until they are cleared.
    """

    def __init__(self, d, oss_config, search_key, title_key, scroll_times=4, compress_quality=7, resize_ratio=0.8):
        """
        Args:
            d: An already-connected uiautomator2 device.
            oss_config: Mapping with ``access_key_id``, ``access_key_secret``,
                ``endpoint``, ``bucket_name`` and optionally ``oss_prefix``.
            search_key: Search keyword, stored as ``self.search_key``.
            title_key: Text the product title contains; used to locate it.
            scroll_times: Maximum number of extra screens captured after the first.
            compress_quality: JPEG ``quality`` passed to Pillow.
                NOTE(review): Pillow's useful range is about 1-95, so 7 is
                extremely lossy — confirm this is intentional.
            resize_ratio: Scale applied to the merged image when 0.1 < ratio < 1.0.
        """
        # Externally connected u2 device instance
        self.d = d
        self.search_key = search_key
        self.title_key = title_key
        # Start the global popup monitor
        self.monitor = SpiderMonitor(self)
        self.monitor.start()
        self.loggerMT = logging.getLogger()
        self.logger = self._init_logger()
        self.oss_config = oss_config
        self.oss_bucket = self._init_oss_bucket()
        self.scroll_times = scroll_times
        self.compress_quality = compress_quality
        self.resize_ratio = resize_ratio

    def close(self):
        """Ask the popup-monitor thread started by ``__init__`` to stop."""
        self.monitor.stop()

    def _init_logger(self):
        """Return the ``mt_screenshot`` logger with exactly one INFO stream handler."""
        logger = logging.getLogger("mt_screenshot")
        logger.setLevel(logging.INFO)
        logger.handlers.clear()  # do not stack handlers across instances
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        logger.addHandler(handler)
        return logger

    def _init_oss_bucket(self):
        """Validate the OSS config and return a connected ``oss2.Bucket``, or ``None``."""
        required = ("access_key_id", "access_key_secret", "endpoint", "bucket_name")
        if not all(self.oss_config.get(name) for name in required):
            self.logger.warning("OSS配置不完整,无法上传")
            return None
        try:
            auth = oss2.Auth(self.oss_config["access_key_id"], self.oss_config["access_key_secret"])
            bucket = oss2.Bucket(auth, self.oss_config["endpoint"], self.oss_config["bucket_name"])
            bucket.get_bucket_info()  # verify the connection
            self.logger.info("OSS Bucket连接成功")
            return bucket
        except Exception as e:
            self.logger.error(f"OSS Bucket连接失败: {e}")
            return None

    def _upload_to_oss(self, local_path):
        """Upload *local_path* to OSS and return its URL, or ``None`` on any failure.

        The object key is ``oss_prefix`` (default ``scrape_data/``) followed by
        the file name, with characters other than word characters, '.' and '-'
        replaced by '_'.
        """
        if not self.oss_bucket or not os.path.exists(local_path):
            return None
        file_name = os.path.basename(local_path)
        safe_name = re.sub(r'[^\w\.\-]', '_', file_name)
        oss_key = f"{self.oss_config.get('oss_prefix', 'scrape_data/')}{safe_name}"
        try:
            oss2.resumable_upload(self.oss_bucket, oss_key, local_path)
            oss_file_url = f"https://{self.oss_config['bucket_name']}.{self.oss_config['endpoint']}/{urllib.parse.quote(oss_key, safe='/')}"
            self.logger.info(f"OSS上传成功: {oss_file_url}")
            return oss_file_url
        except Exception as e:
            self.logger.error(f"OSS上传失败: {e}")
            return None

    def safe_exec(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` once the monitor is not pausing.

        Polls every second while ``self.monitor.pausing`` is set (i.e. while a
        verification screen awaits manual handling), then returns func's result.
        """
        while self.monitor.pausing.is_set():
            time.sleep(1)
        return func(*args, **kwargs)

    def _get_title(self):
        """Return the product title containing ``title_key`` ('' if never found).

        Polls up to 5999 times, one second apart, so there is time to solve a
        captcha by hand. A single leading '0' is dropped from the title.
        """
        def _inner():
            print(f'获取商品title时的搜索关键字:{self.title_key}')
            title_xpath = f'//*[contains(@text, "{self.title_key}")]'
            title = ''
            for m in range(1, 6000):
                if self.d.xpath(title_xpath).exists:
                    # `.text` may yield None; treat that like "not found".
                    title = self.safe_exec(lambda: self.d.xpath(title_xpath).text) or ''
                    self.loggerMT.info(f"第{m}次获取title成功")
                    print(f"第{m}次获取title成功")
                    break
                time.sleep(1)
            # NOTE(review): reason for dropping a leading '0' is not visible here.
            title = title[1:] if title.startswith('0') else title
            print(f'获取到药品标题:{title}')
            match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title)
            if match:
                print("药品名:", title)
                print("规格:", match.group(3).strip())
            return title

        # Wrap in safe_exec so the whole lookup waits while a captcha is pending.
        return self.safe_exec(_inner)

    def _merge_screenshots(self, screens):
        """Stack *screens* vertically into one RGB image (width of the first screen)."""
        if len(screens) == 1:
            return screens[0].convert('RGB')
        rgb_screens = [s.convert('RGB') for s in screens]
        total_width = rgb_screens[0].width
        total_height = sum(s.height for s in rgb_screens)
        merged_img = Image.new('RGB', (total_width, total_height))
        y_offset = 0
        for img in rgb_screens:
            merged_img.paste(img, (0, y_offset))
            y_offset += img.height
        return merged_img

    def get_oss_url(self):
        """Screenshot the page, upload it to OSS and return the OSS URL (or ``None``).

        Steps: read the title, scroll-capture up to ``scroll_times`` extra
        screens (stopping once "商家服务" is visible), merge, optionally resize,
        save as JPEG under ``../scrape_data``, upload, then delete the local
        file when the upload succeeded. Any exception is logged and yields
        ``None``.
        """
        try:
            # 1. Title
            title = self._get_title()
            self.logger.info(f"获取标题: {title[:20]}..." if title else "未获取到标题")
            # 2. Local file path; the title part is capped to keep file names short
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            safe_title = re.sub(r'[\\/*?:"<>|]', '_', title)[:60]
            local_dir = "../scrape_data"
            os.makedirs(local_dir, exist_ok=True)
            local_file_path = os.path.join(local_dir, f"{timestamp}_{safe_title}.jpg")
            # 3. Scroll and capture
            screen_list = [self.d.screenshot()]
            w, h = self.d.window_size()
            for _ in range(self.scroll_times):
                # A too-short swipe may miss the shop name (it was 0.8 originally); swipe 85% -> 15%.
                self.d.swipe(w // 2, h * 0.85, w // 2, h * 0.15,
                             duration=random.uniform(0.8, 1.5))
                time.sleep(random.uniform(2.0, 4.0))
                screen_list.append(self.d.screenshot())
                if self.d(textContains='商家服务').exists:
                    # Reached the merchant-service section; an extra swipe may be needed.
                    break
            # 4. Merge, resize, save
            merged_img = self._merge_screenshots(screen_list)
            if 0.1 < self.resize_ratio < 1.0:
                new_size = (int(merged_img.width * self.resize_ratio), int(merged_img.height * self.resize_ratio))
                resample_mode = Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS
                merged_img = merged_img.resize(new_size, resample_mode)
            merged_img.save(local_file_path, format='JPEG', quality=self.compress_quality)
            merged_img.close()  # release the long image
            self.logger.info(f"临时本地保存: {local_file_path}")
            # 5. Upload
            oss_url = self._upload_to_oss(local_file_path)
            # 6. Delete the local temp file only after a successful upload
            if oss_url is not None:
                try:
                    os.remove(local_file_path)
                    self.logger.info(f"✅ OSS上传成功,已删除本地临时文件: {local_file_path}")
                except OSError as e:
                    self.logger.warning(f"⚠️ OSS上传成功,但删除本地文件失败: {e}")
            return oss_url
        except Exception as e:
            self.logger.error(f"截图/上传失败: {e}")
            return None
- class MT:
def __init__(
    self,
    key,
    title_key,
    spec_list,
    brand,
    sort=None,
    collect_range=None,
    page_range=None,
    workflow_retry_limit=None,
    workflow_error_action=None,
):
    """Configure a Meituan collection run.

    Args:
        key: Search keyword, stored as ``search_key``.
        title_key: Substring a product title must contain (see ``is_link_useful``).
        spec_list: Specs; a title matching any one passes (empty = no filter).
        brand: Substring a product title must contain; ``""`` disables it.
        sort: ``"升序"`` / ``"降序"`` price order for ``li_or_lo``, or ``None``.
        collect_range: Normalized by ``normalize_collect_range``.
        page_range: Normalized by ``normalize_page_range``.
        workflow_retry_limit: Per-step retry counts, merged over the defaults.
        workflow_error_action: Per-step recovery actions, merged over the defaults.
    """
    self.package_name = Config.PACKAGE_NAME
    self.access_token = get_access_token()
    self.city2province = self.get_city_info()
    # NOTE(review): OCR credentials are committed to source; they can be
    # overridden through the environment — move them out of the code.
    self.APP_ID = os.environ.get("BAIDU_OCR_APP_ID", '116857964')
    self.API_KEY = os.environ.get("BAIDU_OCR_API_KEY", '1gAzACJOAr7BeILKqkqPOETh')
    self.SECRET_KEY = os.environ.get("BAIDU_OCR_SECRET_KEY", 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3')
    self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY)
    self.city_to_name = city_name_to_id.build_city_name_to_id(_DEFAULT_PATH)
    self.table_name = "retrieve_scrape_data"
    self.shop_table_name = "mt_shop_info_middle"
    self.loggerMT = logging.getLogger()
    self.page = 0
    self.search_key = key
    self.title_key = title_key
    self.spec_list = spec_list
    self.brand = brand
    self.sort = sort
    self.collect_range = self.normalize_collect_range(collect_range)
    self.page_range = self.normalize_page_range(page_range)
    self.sort_key = 0  # count of sort actions applied by li_or_lo
    self.unrelated_data = 0
    self.shop_data_num = 0
    self.max_unrelated_data = 15
    self.collection_cursor = {"page_no": 1, "item_index": 0}
    default_retry_limit = {
        "start_app": 3,
        "open_product_list_page": 3,
        "collect_single_product": 3,
    }
    default_error_action = {
        "start_app": "start_app",
        "open_product_list_page": "start_app",
        "collect_single_product": "back_to_list_page",
    }
    # Overlay caller values on fresh copies of the defaults, so steps the
    # caller omits still have a value and the caller's dict is not aliased.
    self.workflow_retry_limit = {**default_retry_limit, **(workflow_retry_limit or {})}
    self.workflow_error_action = {**default_error_action, **(workflow_error_action or {})}
- @staticmethod
- def normalize_collect_range(collect_range):
- if not collect_range:
- return None
- start = None
- end = None
- if isinstance(collect_range, dict):
- start = collect_range.get("start")
- end = collect_range.get("end")
- elif isinstance(collect_range, (list, tuple)) and len(collect_range) >= 2:
- start, end = collect_range[0], collect_range[1]
- elif isinstance(collect_range, str):
- matched = re.match(r"^\s*(\d+(?:\.\d+)?)\s*[-,~]\s*(\d+(?:\.\d+)?)\s*$", collect_range)
- if matched:
- start, end = matched.group(1), matched.group(2)
- try:
- start = float(start)
- end = float(end)
- except (TypeError, ValueError):
- return None
- if start < 0 or end < 0:
- return None
- if start > end:
- start, end = end, start
- return {"start": start, "end": end}
- @staticmethod
- def normalize_page_range(page_range):
- if not page_range:
- return None
- start = None
- end = None
- if isinstance(page_range, dict):
- start = page_range.get("start")
- end = page_range.get("end")
- elif isinstance(page_range, (list, tuple)) and len(page_range) >= 2:
- start, end = page_range[0], page_range[1]
- elif isinstance(page_range, str):
- matched = re.match(r"^\s*[\[\(]?\s*(\d+)\s*[,,\-~]\s*(\d+)\s*[\]\)]?\s*$", page_range)
- if matched:
- start, end = matched.group(1), matched.group(2)
- try:
- start = int(float(start))
- end = int(float(end))
- except (TypeError, ValueError):
- return None
- if start <= 0 or end <= 0:
- return None
- if start > end:
- start, end = end, start
- return {"start": start, "end": end}
def stop_app(self):
    """Stop ``self.package_name`` on the device, then pause one second."""
    package = self.package_name
    self.d.app_stop(package)
    time.sleep(1)

def start_app(self):
    """Launch ``self.package_name`` on the device, then pause one second."""
    package = self.package_name
    self.d.app_start(package)
    time.sleep(1)

def restart_app(self):
    """Stop and then relaunch the target app."""
    self.stop_app()
    self.start_app()
- def li_or_lo(self, key):
- if key == "升序":
- self.sort_key += 1
- self.d.xpath('//*[@text="价格"]').click()
- n = self.d.xpath('//*[@text="总价低到高"]')
- if n.exists:
- n.click()
- time.sleep(1)
- if key == "降序":
- self.sort_key += 1
- self.d.xpath('//*[@text="价格"]').click()
- time.sleep(2)
- self.d.xpath('//*[@text="价格"]').click()
- def wr_re(self, mod, device_id, sort=None, page=None):
- file_path = f'./ycwj/{device_id}_{self.title_key}.txt'
- if mod == "写":
- try:
- data = {
- "page": page if page else "",
- "sort": sort if sort else "",
- }
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
- with open(file_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f"进度保存成功:{sort},{page}页")
- except Exception as e:
- print("保存进度失败")
- elif mod == "读":
- try:
- if not os.path.exists(file_path):
- return None
- with open(file_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- print(self.sort)
- if self.sort and self.sort_key == 0:
- self.li_or_lo(self.sort)
- i = 0
- while True:
- if i == data['page']:
- self.page = data['page']
- print("当前页", self.page)
- break
- else:
- i += 1
- self.d.drag(300, 1400, 300, 400, 1)
- return data
- except Exception as e:
- print(f"读取进度失败")
- return None
- return None
- # 任何一个spec满足都算有效
- def is_link_spec_useful(self, product_title):
- if len(self.spec_list) == 0:
- return True
- for spec in self.spec_list:
- if spec in product_title:
- return True
- return False
- # TODO 继续优化这里的判断逻辑,可以考虑搭配config的修改
- def is_link_useful(self, product_title):
- if self.title_key != "" and self.title_key not in product_title:
- print(f"当前商品名称:{product_title} 不包含{self.title_key}关键字")
- return False
- if self.brand != "" and self.brand not in product_title:
- print(f"当前商品名称:{product_title} 不包含{self.brand}品牌")
- return False
- if not self.is_link_spec_useful(product_title):
- print(f"当前商品名称:{product_title} 不包含{self.spec_list}品规")
- return False
- return True
- @staticmethod
- def get_sleep_time():
- # return random.randint(5, 8)
- # return random.randint(1, 2)
- return 1
- @staticmethod
- def get_current_date():
- return datetime.datetime.now().strftime('%Y/%m/%d')
- @staticmethod
- def get_city_info():
- """
- 获取所有的省市数据
- :return:
- """
- file_path = '../kailin_city.json'
- with open(file_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- province = {province_one["id"]: province_one for province_one in data['province']}
- city2province = dict()
- city = data['city']
- for city_one in city:
- name = city_one['name']
- pid = city_one['pid']
- if len(str(pid)) > 2:
- pid = int(re.match('^\d{2}', str(pid)).group())
- city2province[name] = province[pid]['name']
- return city2province
def get_shop_name_from_current_page(self):
    """
    Read the shop name from the product-detail page currently on screen.

    No navigation happens. Two candidate XPaths (the last and the
    second-to-last section of the scroll view) are passed, in that order,
    to ``self.get_first_text_by_xpaths``.

    :return: whatever ``get_first_text_by_xpaths`` returned (the shop name,
        or a falsy value when nothing matched).
    """
    candidate_xpaths = [
        '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView',
        '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView',
    ]
    found_name = self.get_first_text_by_xpaths(candidate_xpaths)
    if found_name:
        print(f'获取到店铺名:{found_name}')
    return found_name
def get_shop_name(self):
    """
    Return the shop name for the current product.

    The product-detail page is tried first. If it yields nothing, the shop
    page is opened with ``self.enter_shop``, the name is read from its
    header, and the method goes back one page. Going back now happens on
    the "not found" path too; previously that path stayed inside the shop
    page, which broke later navigation.

    :return: the shop name, or '' when it could not be read or an error
        occurred.
    """
    shop_name = self.get_shop_name_from_current_page()
    if shop_name:
        return shop_name
    try:
        # Open the shop page and read the name from there.
        print("点击店铺进入后获取店铺名称")
        self.enter_shop()
    except Exception as e:
        print(f'获取店铺名出错:{e}')
        return ''

    shop_xpath = '//*[@resource-id="com.sankuai.meituan:id/layout_header_view"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]//android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.TextView'
    shop_name = ''
    try:
        selector = self.d.xpath(shop_xpath)
        if selector.exists:
            shop_name = selector.text
    except Exception as e:
        print(f'获取店铺名出错:{e}')
        shop_name = ''

    # Always leave the shop page we opened, whether or not the name was found.
    try:
        self.swipe_back(1)
    except Exception as e:
        print(f'获取店铺名出错:{e}')
        return ''
    return shop_name
def get_qualification_number(self):
    """
    Read the qualification number from the merchant-qualification page.

    The element text should look like ``资质编号:XXXX``, with an ASCII or
    full-width colon. The ``资质编号`` label and the colon are removed as a
    prefix. The old ``str.strip('资质编号:')`` treated its argument as a
    character set, and it left a full-width colon in the result.

    :return: the number as a string, or None when the element could not be
        read.
    """
    label = '资质编号'
    try:
        raw = self.d.xpath(
            '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text
    except Exception as e:
        print(f'获取资质编号出错:{e}')
        return None
    if raw is None:
        return None
    text = raw.strip()
    if text.startswith(label):
        text = text[len(label):].lstrip()
    # Drop the separator after the label (ASCII or full-width colon).
    return text.lstrip(':：').strip()
def get_shop_address(self):
    """
    Read the shop's contact address from the shop-info page.

    Two candidate XPaths are tried in order through
    ``self.get_first_text_by_xpaths``. If the first hit is the "发货时间"
    (shipping time) line instead of the address, the second XPath is read
    directly with ``self._read_xpath_text``.

    :return: the address text (possibly ''); None when nothing could be read
        or an error occurred.
    """
    shop_address_xpaths = [
        '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView',
        '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.TextView',
    ]
    try:
        shop_address = self.get_first_text_by_xpaths(shop_address_xpaths)
        print(f'111-获取到店铺地址:{shop_address}')
        if shop_address is None:
            # Previously this raised TypeError on the `in` test below and the
            # bare except swallowed it; the result (None) is unchanged.
            return None
        if '发货时间' in shop_address:
            print('店铺地址包含发货时间,再次获取店铺地址')
            shop_address = self._read_xpath_text(shop_address_xpaths[1])
            if shop_address:
                print(f'222-获取到店铺地址:{shop_address}')
            else:
                print('222-xpath2获取店铺地址失败')
        print(f'333-获取到店铺地址:{shop_address}')
        return shop_address
    except Exception as e:
        print(f'获取店铺地址出错-get_shop_address:{e}')
        return None
def execute_db_write(self, sql, params, action_desc, max_retries=5, retry_delay=2):
    """
    Execute one write statement and commit it, retrying on failure.

    Each attempt opens a fresh connection with ``get_mysql()`` and always
    closes it. Failures in ``rollback()``/``close()`` on a broken connection
    are logged or ignored, so they no longer abort the retry loop. The
    connection is closed before waiting for the next attempt.

    :param sql: parameterized SQL statement.
    :param params: parameters passed to ``cursor.execute``.
    :param action_desc: human-readable description used in log output.
    :param max_retries: maximum number of attempts.
    :param retry_delay: seconds to wait between failed attempts.
    :return: True on success, False once every attempt has failed.
    """
    for attempt in range(max_retries):
        conn = None
        try:
            conn = get_mysql()
            with conn.cursor() as cur:
                cur.execute(sql, params)
            conn.commit()
            print(f"{action_desc}成功")
            return True
        except Exception as e:
            print(f'{action_desc}异常 (尝试 {attempt + 1}/{max_retries}): {e}')
            if conn:
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    # A dead connection cannot roll back; keep retrying anyway.
                    print(f'{action_desc}回滚失败: {rollback_error}')
        finally:
            if conn:
                try:
                    conn.close()
                except Exception:
                    # Best effort: a broken connection may fail to close.
                    pass
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    print(f"{action_desc}失败,达到最大重试次数")
    return False
def query_exists(self, sql, params, error_desc):
    """
    Run a SELECT and report whether it produced a (truthy) first row.

    :param sql: parameterized SELECT statement.
    :param params: parameters for ``cursor.execute``.
    :param error_desc: label used in the error message.
    :return: True/False for row/no row, or None if the query failed.
    """
    connection = None
    try:
        connection = get_mysql()
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            first_row = cursor.fetchone()
        return bool(first_row)
    except Exception as exc:
        print(f"{error_desc}错误: {str(exc)}")
        return None
    finally:
        if connection:
            connection.close()
def save_to_database(self, data):
    """
    Insert one scraped product record into ``self.table_name``.

    Each column name is also the key read from ``data``. The column list,
    the placeholders and the parameter tuple all come from the single
    ``columns`` tuple below. Previously the list was written out three
    times, and the copies could silently drift apart.

    :param data: dict with one value for every column in ``columns``.
    :return: result of ``self.execute_db_write`` (True on success).
    :raises KeyError: if ``data`` lacks one of the columns.
    """
    print(f'保存数据到数据库:{data}')
    columns = (
        'enterprise_id', 'platform_id', 'platform_item_id', 'province_id', 'city_id',
        'province_name', 'city_name', 'area_info', 'product_name', 'product_specs',
        'one_box_price', 'manufacture_date', 'expiry_date', 'manufacturer', 'approval_number',
        'is_sold_out', 'online_posting_count', 'continuous_listing_count', 'link_url',
        'store_name', 'store_url', 'shipment_province_id', 'shipment_province_name',
        'shipment_city_id', 'shipment_city_name', 'company_name', 'qualification_number',
        'scrape_date', 'min_price', 'number', 'sales', 'inventory', 'snapshot_url', 'insert_time',
    )
    column_sql = ', '.join(columns)
    placeholder_sql = ', '.join(['%s'] * len(columns))
    add_sql = f"INSERT INTO {self.table_name} ({column_sql}) VALUES ({placeholder_sql})"
    params = tuple(data[column] for column in columns)
    return self.execute_db_write(add_sql, params, "保存商品数据到数据库")
def save_shop_info_to_database(self, data):
    """
    Insert one shop record into ``self.shop_table_name``.

    :param data: dict with the keys shop, contact_address,
        qualification_number, business_license_company,
        business_license_address, scrape_date and platform.
    :return: result of ``self.execute_db_write`` (True on success).
    """
    print(f'保存店铺数据到数据库:{data}')
    field_order = (
        'shop',
        'contact_address',
        'qualification_number',
        'business_license_company',
        'business_license_address',
        'scrape_date',
        'platform',
    )
    add_sql = f"""
    INSERT INTO {self.shop_table_name}
    (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    values = tuple(data[field] for field in field_order)
    return self.execute_db_write(add_sql, values, "保存店铺数据到数据库")
def swipe_back(self, no):
    """
    Press the Android back key ``no`` times, pausing after every press.

    :param no: number of back presses.
    """
    for _ in range(no):
        self.d.press('back')
        pause = self.get_sleep_time()
        time.sleep(pause)
def drug_price(self):
    """
    Read the first on-screen price (an element whose text starts with "¥").

    :return: the first number in that text as a float, or None when the
        element is missing or its text has no parsable number.
    """
    try:
        raw_price = self.d.xpath('//*[starts-with(@text,"¥")]').text
        digits = re.search(r'[\d\.]+', raw_price).group()
        price = float(digits)
    except Exception as e:
        print(f'提取价格出错-->{e}')
        return None
    print(f'获取到价格:{price}')
    return price
def drug_sale_num(self):
    """
    Read the "已售" (sold) counter shown on the product page.

    :return: the text after "已售", stripped (e.g. "1000+"); None when the
        element is absent or reading it fails.
    """
    sold_xpath = '//*[starts-with(@text,"已售")]'
    try:
        if not self.d.xpath(sold_xpath).exists:
            return None
        sold_text = self.d.xpath(sold_xpath).text.replace("已售", "").strip()
        print(f'获取到已售数量:{sold_text}')
        return sold_text
    except Exception as e:
        print(f'提取已售数量出错-->{e}')
        return None
def restart_uiautomator_services(self, device_id):
    """
    Restart the atx-agent service on the device over adb: stop, then start.

    Each command runs through the shell with output captured, and is
    followed by a pause of ``self.get_sleep_time()`` seconds.

    :param device_id: adb serial of the target device.
    """
    start_command = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d'
    stop_command = start_command + ' --stop'
    for command in (stop_command, start_command):
        subprocess.run(command, capture_output=True, text=True, shell=True)
        time.sleep(self.get_sleep_time())
def reconnect_device(self):
    """
    Restart atx-agent and reconnect uiautomator2 to ``self.device_id``.

    :return: True on success, False if any step raised (the error is logged).
    """
    try:
        agent_command = ["adb", "-s", self.device_id, "shell",
                         "/data/local/tmp/atx-agent", "server", "-d"]
        # Stop the agent, then start it again.
        subprocess.run(agent_command + ["--stop"], capture_output=True, timeout=5)
        time.sleep(1)
        subprocess.run(agent_command, capture_output=True, timeout=5)
        time.sleep(2)
        # Open a new uiautomator2 connection.
        self.d = u2.connect_usb(self.device_id)
        self.restart_uiautomator_services(self.device_id)
    except Exception as e:
        self.loggerMT.error(f"设备重连失败: {e}")
        return False
    self.loggerMT.info("设备重连成功")
    return True
def connect_devices(self, device_id):
    """
    Connect to an Android device over USB and set up per-device state.

    Sets ``self.device_id`` and ``self.d`` (the uiautomator2 device),
    restarts the on-device atx-agent, and fills ``self.oss_config`` for
    screenshot uploads.

    :param device_id: adb serial number of the device.
    :raises Exception: whatever the connection step raised, re-raised with
        its original type and traceback. Previously it was wrapped in a bare
        ``Exception``.
    """
    try:
        self.device_id = device_id
        self.d = u2.connect_usb(device_id)
        self.restart_uiautomator_services(device_id)
        # SECURITY NOTE(review): access keys must not live in source control.
        # Environment variables now take precedence. The literal fallbacks
        # keep existing deployments working and should be rotated and removed.
        self.oss_config = {
            "access_key_id": os.environ.get('OSS_ACCESS_KEY_ID', 'LTAI5tDwjfteBvivYN41r8sJ'),
            "access_key_secret": os.environ.get('OSS_ACCESS_KEY_SECRET', 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'),
            "endpoint": "oss-cn-shenzhen.aliyuncs.com",  # e.g. oss-cn-beijing.aliyuncs.com
            "bucket_name": "zhijiayun-jiansuo",
            "oss_prefix": "scrape_data/"  # OSS key prefix ("virtual folder") for screenshots
        }
        print(f'连接到设备:{device_id}')
        self.loggerMT.info(f'连接到设备:{device_id}')
    except Exception as e:
        print(f'{device_id} 连接错误: {e}')
        self.loggerMT.error(f'{device_id} 连接错误: {e}')
        raise
def get_ocr_res(self, img, timeout=30):
    """
    Send an image to Baidu's business-license OCR endpoint.

    :param img: path of the image file.
    :param timeout: HTTP timeout in seconds. Previously there was none, so a
        stalled request could block forever.
    :return: dict mapping each recognised field name to its text, or None
        when the file cannot be read, the request fails, or the response has
        no ``words_result``.
    """
    try:
        print(f'开始识别图片:{img}')
        request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
        # Close the file handle; it was previously leaked.
        with open(img, 'rb') as f:
            img_b64 = base64.b64encode(f.read())
        params = {"image": img_b64}
        request_url = request_url + "?access_token=" + self.access_token
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        response = requests.post(request_url, data=params, headers=headers, timeout=timeout)
        if not response:
            return None
        res = response.json()
        new_dic = {field: value['words'] for field, value in res['words_result'].items()}
        print('资质数据信息', new_dic)
        return new_dic
    except Exception as e:
        print(f'营业执照OCR识别出错:{e}')
        return None
def remove_watermark(self, img_path, gain=1.4057577998008846, offset=38.33089999653017):
    """
    Lighten a watermarked image and re-encode it in its original format.

    Every pixel value v becomes ``clip(gain * v - offset, 0, 255)``, which
    pushes light-grey watermark pixels towards white. ``np.fromfile`` with
    ``cv2.imdecode`` is used instead of ``cv2.imread`` so that paths with
    non-ASCII characters can be read.

    :param img_path: image file path; its extension selects the encoder.
    :param gain: contrast multiplier (default is the previously hard-coded value).
    :param offset: value subtracted after scaling (default is the previous constant).
    :return: the encoded image buffer as returned by ``cv2.imencode``.
    :raises ValueError: if the image cannot be decoded or re-encoded.
    """
    # -1 == cv2.IMREAD_UNCHANGED: keep the original channels.
    img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
    if img is None:
        raise ValueError(f'无法解码图片: {img_path}')
    extension = os.path.splitext(img_path)[1]
    adjusted = np.clip(gain * img - offset, 0, 255).astype(np.uint8)
    ok, img_binary = cv2.imencode(extension, adjusted)
    if not ok:
        raise ValueError(f'无法编码图片: {img_path}')
    return img_binary
def get_ocr_res_image(self, img):
    """
    Run Baidu general OCR on a screenshot after lightening its watermark.

    :param img: path of the screenshot.
    :return: the ``words_result`` value of the OCR response ('' if that key
        is absent), or None when preprocessing or the OCR call failed. The
        failure is now logged instead of being silently swallowed.
    """
    try:
        image = self.remove_watermark(img)
        # self.client.basicAccurate(image) is the high-accuracy alternative.
        res_image = self.client.basicGeneral(image)
        data = res_image.get('words_result', '')
        print(f'百度api返回结果:{data}')
        return data
    except Exception as e:
        print(f'百度OCR识别出错:{e}')
        return None
def screenshot_the_business_license(self, qualification_number, crop_box=(0, 480, 720, 1420)):
    """
    Take a screenshot and save the business-license region as a PNG.

    :param qualification_number: used as the file name inside the
        ``screenshot`` directory. Characters that are unsafe in file names
        are replaced with '_'. When falsy, ``cropped_screenshot.png`` in the
        working directory is used.
    :param crop_box: (left, top, right, bottom) pixel box to keep. The
        default is the previously hard-coded region.
    :return: path of the cropped image (a ``Path`` or a ``str``, as before).
    :raises ValueError: if the screenshot cannot be read back or encoded.
    """
    screenshot_path = 'screenshot1.png'
    self.d.screenshot(screenshot_path)
    img = cv2.imread(screenshot_path)
    if img is None:
        raise ValueError(f'无法读取截图: {screenshot_path}')
    left, top, right, bottom = crop_box
    cropped_img = img[top:bottom, left:right]
    # Created next to the working directory (same level as the script).
    screenshot_dir = Path('screenshot')
    screenshot_dir.mkdir(parents=True, exist_ok=True)
    if qualification_number:
        safe_name = re.sub(r'[\\/:*?"<>|\s]+', '_', str(qualification_number))
        cropped_screenshot_path = screenshot_dir / f'{safe_name}.png'
    else:
        cropped_screenshot_path = 'cropped_screenshot.png'
    # cv2.imwrite silently fails on non-ASCII paths (e.g. numbers containing
    # Chinese characters) on Windows; encode in memory and write via numpy.
    ok, encoded = cv2.imencode('.png', cropped_img)
    if not ok:
        raise ValueError(f'无法编码截图: {cropped_screenshot_path}')
    encoded.tofile(str(cropped_screenshot_path))
    return cropped_screenshot_path
def screenshot_instruction(self):
    """
    Save a screenshot of the instruction page under a unique file name.

    The name is ``instructionscreenshot1-<HH-MM-SS>-<8 hex chars>.png``, made
    from the current local time and 4 random bytes.

    :return: the file name the screenshot was written to.
    """
    time_str = datetime.datetime.now().strftime("%H-%M-%S")
    random_str = secrets.token_hex(4)  # 4 random bytes -> 8 hex characters
    print(time_str)
    screenshot_path = f'instructionscreenshot1-{time_str}-{random_str}.png'
    self.d.screenshot(screenshot_path)
    return screenshot_path
def extract_specification(self, text):
    """
    Cut the drug specification off before any '【...】' note (e.g. expiry).

    :return: the text before the first '【', stripped of surrounding
        whitespace; ``text`` unchanged when it is empty or starts with '【'.
    """
    head = re.match(r'[^【]+', text)
    return head.group(0).strip() if head else text
# Fetch the product title
def get_title(self):
    """
    Read the product title and split off its trailing specification.

    Polls for an element whose text contains ``self.title_key`` (up to 5999
    checks, sleeping 3 s after each miss). The long wait leaves time for a
    human verification challenge to be cleared. A leading '0' is dropped
    from the title. The whole routine runs inside ``self.safe_exec``.

    :return: ``(drugs_name, specifications)``. ``drugs_name`` is the full
        title. ``specifications`` is the trailing whitespace-free token that
        starts with a digit, passed through ``self.extract_specification``
        when it contains '到期'. It is '' when the title has no such token.
    """
    title_pattern = re.compile(r'^(?:0?)?(?:\[([^\]]+)\])?\s*(.*?)\s*(\d+[^\s]+)$')

    def _read_title():
        print(f'获取商品title时的搜索关键字:{self.title_key}')
        title_xpath = f'//*[contains(@text, "{self.title_key}")]'
        title = ''
        # Long polling loop so a verification popup can be handled meanwhile.
        for attempt in range(1, 6000):
            if not self.d.xpath(title_xpath).exists:
                time.sleep(3)
                continue
            title = self.safe_exec(lambda: self.d.xpath(title_xpath).text)
            print(f"第{attempt}次获取title成功")
            break
        if title.startswith('0'):
            title = title[1:]
        print(f'获取到药品标题:{title}')
        matched = title_pattern.match(title)
        if not matched:
            print("没有匹配到预期格式")
            return title, ''
        spec = matched.group(3).strip()
        print("药品名:", title)
        print("规格:", spec)
        # Specs carrying an expiry note need a second clean-up pass.
        if '到期' in spec:
            spec = self.extract_specification(spec)
        return title, spec

    # Wrapping in safe_exec makes the whole read wait for verification.
    return self.safe_exec(_read_title)
def enter_shop(self):
    """
    Open the shop page (so its qualification info can be read) by tapping
    the "店铺" entry, then pause.
    """
    shop_entry = self.d.xpath('//*[@text="店铺"]')
    shop_entry.click()
    time.sleep(self.get_sleep_time())
def enter_shoper(self):
    """
    Open the "商家" (merchant) tab of the shop page.

    Checks for the tab up to 10 times, pausing between checks.

    :return: True if the tab was found and clicked, otherwise False.
    """
    merchant_xpath = '//*[@text="商家"]'
    for i in range(10):
        if self.d.xpath(merchant_xpath).exists:
            print(f'第{i}次商家存在')
            break
        print(f'第{i}次商家不存在')
        time.sleep(self.get_sleep_time())
    else:
        return False
    self.d.xpath(merchant_xpath).click()
    time.sleep(self.get_sleep_time())
    return True
# Open the merchant-qualification page
def scan_shoper_license(self):
    """
    Tap "查看商家资质" once it appears (checked up to 10 times).

    If it never appears, go back one page instead.
    """
    license_xpath = '//*[@text="查看商家资质"]'
    found = False
    for i in range(10):
        found = self.d.xpath(license_xpath).exists
        if found:
            print(f'第{i}次查看商家资质存在')
            break
        print(f'第{i}次查看商家资质不存在')
        time.sleep(self.get_sleep_time())
    if not found:
        self.swipe_back(1)
        return
    self.d.xpath(license_xpath).click()
    time.sleep(self.get_sleep_time())
# Check whether this product record is already in the database
def data_is_exists(self, data):
    """
    Check whether a matching product row already exists in ``self.table_name``.

    Rows match on product name, min price, store name, scrape date and
    platform id.

    :param data: dict with the keys product, min_price, shop, scrape_date and
        platform.
    :return: True if a row exists, False if not, None when required keys are
        missing or the query failed.
    """
    required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform']
    missing = [key for key in required_keys if key not in data]
    if missing:
        logging.error(f"缺少必要字段: {', '.join(missing)}")
        return None
    query_sql = f"""
    SELECT 1 FROM {self.table_name}
    WHERE product_name = %s
    AND min_price = %s
    AND store_name = %s
    AND scrape_date = %s
    AND platform_id = %s
    LIMIT 1
    """
    # The parameter order mirrors required_keys and the WHERE clause above.
    params = tuple(data[key] for key in required_keys)
    return self.query_exists(query_sql, params, "商品查重")
# Check whether this shop is already in the database
def shop_is_exists_database(self, shop):
    """
    Check whether ``shop`` already has a row in ``self.shop_table_name``.

    :return: True/False, or None when the query failed.
    """
    lookup_sql = f"""
    SELECT 1 FROM {self.shop_table_name}
    WHERE shop = %s
    LIMIT 1
    """
    lookup_params = (shop,)
    return self.query_exists(lookup_sql, lookup_params, "店铺查重")
def wait_if_verifying(self, monitor, timeout=120):
    """
    Block the caller while a verification challenge is being handled.

    Polls once per second. Returns as soon as ``monitor.pausing`` is cleared
    or ``timeout`` seconds have passed.
    """
    started_at = time.time()
    while True:
        if not monitor.pausing.is_set():
            return
        if time.time() - started_at >= timeout:
            return
        time.sleep(1)
def wait_for_ready(self, monitor, timeout=86400):
    """
    Wait for any pending verification challenge before entering a page.

    Polls ``monitor.pausing`` once per second, for at most ``timeout``
    seconds. Afterwards it always calls ``monitor.check_and_handle_popup()``
    once, in case a challenge popped up at the last moment.
    """
    started_at = time.time()
    while True:
        if not monitor.pausing.is_set():
            break
        if time.time() - started_at >= timeout:
            break
        time.sleep(1)
    # Extra safety scan for a popup that appeared just now.
    monitor.check_and_handle_popup()
def safe_list(self, xpath, monitor):
    """
    Return all elements matching ``xpath``, first waiting for any pending
    verification challenge to clear (``self.wait_for_ready``).
    """
    self.wait_for_ready(monitor)
    matches = self.d.xpath(xpath).all()
    return matches
def safe_exec(self, func, *args, **kwargs):
    """
    Run ``func(*args, **kwargs)`` once no verification challenge is pending.

    Blocks while ``self.monitor.pausing`` is set. The check now repeats
    before every retry, because a popup may appear while the device is
    reconnecting. If the device connection drops
    (``http.client.RemoteDisconnected``), the device is reconnected and the
    call retried, up to 3 attempts in total. Any other exception propagates
    unchanged.

    The old trailing ``return func(...)`` after the retry loop was
    unreachable and has been removed.

    :return: whatever ``func`` returns.
    :raises http.client.RemoteDisconnected: if the last attempt also
        disconnects, or if reconnecting fails.
    """
    max_retries = 3
    for attempt in range(max_retries):
        while self.monitor.pausing.is_set():
            time.sleep(1)
        try:
            return func(*args, **kwargs)
        except http.client.RemoteDisconnected as e:
            self.loggerMT.error(f"连接断开 (尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise  # last attempt failed: propagate
            if not self.reconnect_device():
                self.loggerMT.error("重连失败,无法继续")
                raise
            self.loggerMT.info("重连成功,准备重试...")
            time.sleep(2)  # let the device settle
def get_next_data(self, data, target):
    """
    Return the ``words`` of the OCR entry right after the first entry whose
    ``words`` equals ``target`` and which has a successor.

    :param data: sequence of dicts with a ``'words'`` key (OCR output).
    :param target: label text to look for.
    :return: the following entry's ``words``, or None if there is none.
    """
    for index, entry in enumerate(data):
        if entry['words'] == target and index + 1 < len(data):
            return data[index + 1]['words']
    return None
def delete_instruction_screenshot(self, screenshot_path):
    """
    Delete a screenshot file. Failures are printed, never raised.
    """
    try:
        os.remove(screenshot_path)
    except FileNotFoundError:
        print(f"文件未找到,无法删除:{screenshot_path}")
    except Exception as e:
        print(f"删除文件时出错:{e}")
    else:
        print(f"截图文件已删除:{screenshot_path}")
def get_instructions_data(self):
    """
    Extract the expiry, manufacturer and approval number from the
    instructions of the current product.

    Call this only after the "说明" entry is known to exist. The full
    instructions are opened via "查看详细说明", or via "查看全部" (found by
    scrolling). "加载更多" is expanded if it shows up, and the view is
    scrolled until both "生产单位" and "批准文号" are on screen. A screenshot is
    then OCR'd, and for each label the OCR text that directly follows it is
    taken.

    :return: dict with the keys "有效期", "生产单位", "批准文号". All values are
        '' when the full instructions could not be opened or OCR returned
        nothing. A single value is None when OCR ran but that label had no
        following entry.
    """
    labels = ("有效期", "生产单位", "批准文号")
    self.d.xpath('//*[@text="说明"]').click()
    time.sleep(0.5)

    detail_xpath = '//*[@text="查看详细说明"]'
    if self.d.xpath(detail_xpath).exists:
        self.d.xpath(detail_xpath).click()
    else:
        view_all_xpath = self.find_xpath_with_swipes(
            ['//*[@text="查看全部"]'],
            swipe_direction='down',
            swipe_scale=0.3,
            max_swipes=8,
            found_desc='查看全部'
        )
        if not view_all_xpath:
            self.loggerMT.info('获取到的说明书信息为空。')
            return dict.fromkeys(labels, '')
        self.d.xpath(view_all_xpath).click()
    time.sleep(0.5)

    # Expand collapsed text: tap "加载更多" once found, swiping up to 8 times.
    load_more_xpath = '//*[@text="加载更多"]'
    for _ in range(8):
        if self.d.xpath(load_more_xpath).exists:
            self.d.xpath(load_more_xpath).click()
            time.sleep(0.2)
            break
        self.d.swipe(200, 1000, 200, 300, 0.3)

    # Scroll (at most 10 swipes) until both target labels are on screen.
    for _ in range(10):
        if self.d.xpath('//*[@text="生产单位"]').exists and self.d.xpath('//*[@text="批准文号"]').exists:
            break
        self.d.swipe(200, 1300, 200, 300, 0.3)

    instruction_path = self.screenshot_instruction()
    print(f"instruction_path= {instruction_path}")
    time.sleep(2)
    ocr_res = self.get_ocr_res_image(instruction_path)
    if ocr_res:
        res_data = {label: self.get_next_data(ocr_res, label) for label in labels}
    else:
        res_data = dict.fromkeys(labels, '')
    print(f"res_data={res_data}")
    time.sleep(1)
    self.delete_instruction_screenshot(instruction_path)
    return res_data
def has_instructions(self):
    """
    Report whether the product page offers a "说明" (instructions) entry.

    Without instructions the detailed data cannot be collected. After a
    short pause, the page is scrolled down up to 8 times to look for it.

    :return: True if the entry was found, otherwise False.
    """
    time.sleep(self.get_sleep_time())
    found_xpath = self.find_xpath_with_swipes(
        ['//*[@text="说明"]'],
        swipe_direction='down',
        swipe_scale=0.3,
        max_swipes=8,
        found_desc='说明',
    )
    return True if found_xpath else False
def has_shop(self):
    """
    After a short pause, report whether an "进店" (enter shop) button is
    visible.
    """
    time.sleep(self.get_sleep_time())
    enter_shop_button = self.d.xpath('//*[@text="进店"]')
    return enter_shop_button.exists
# Collect the shop information for the current product
def get_license_info_ex(self):
    """
    Collect the current product's shop qualification information.

    Steps: open the shop, open its "商家" tab, read the contact address, open
    the qualification page and read the qualification number. When a number
    is found, the license image is opened (tap at relative position 0.603,
    0.27), cropped from a screenshot and sent to OCR for the company name
    ("单位名称") and address ("地址").

    :return: dict with the keys contact_address, qualification_number,
        business_license_company and business_license_address. Values that
        could not be obtained are ''.
    """
    info = dict.fromkeys(
        ('contact_address', 'qualification_number',
         'business_license_company', 'business_license_address'),
        '',
    )
    self.safe_exec(self.enter_shop)
    if not self.safe_exec(self.enter_shoper):
        return info

    # Give the merchant tab up to 10 checks to show its qualification link.
    license_link_xpath = '//*[@text="查看商家资质"]'
    for i in range(10):
        if self.d.xpath(license_link_xpath).exists:
            print(f"第{i}次有商家资质")
            break
        print(f"第{i}次没有商家资质")
        time.sleep(self.get_sleep_time())

    info['contact_address'] = self.safe_exec(self.get_shop_address)
    self.safe_exec(self.scan_shoper_license)
    qualification_number = self.safe_exec(self.get_qualification_number)
    if not qualification_number:
        return info

    info['qualification_number'] = qualification_number
    self.d.click(0.603, 0.27)
    time.sleep(self.get_sleep_time())
    cropped_screenshot_path = self.screenshot_the_business_license(qualification_number)
    print(f'cropped_screenshot_path:{cropped_screenshot_path}')
    ocr_res = self.get_ocr_res(cropped_screenshot_path)
    print(f'ocr_res:{ocr_res}')
    if ocr_res:
        info['business_license_company'] = ocr_res.get('单位名称', '')
        info['business_license_address'] = ocr_res.get('地址', '')
    return info
def distinct_target(self):
    """
    Detect whether the search-result list page is currently displayed.

    Six known layouts of the list page are probed. All six are queried on
    every call, in a fixed order, and the page counts as present when any of
    them exists.

    :return: True if back on the list page, otherwise False.
    """
    candidate_xpaths = (
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
    )
    # Query every layout (no short-circuit), then combine the results.
    presence = [self.d.xpath(candidate).exists for candidate in candidate_xpaths]
    result = any(presence)
    print("---检测回到了列表页---" if result else "---检测没有回到列表页---")
    return result
def enter_target_page(self):
    """
    Navigate from the Meituan home screen to the search results for
    ``self.search_key``.

    Opens "看病买药", then the search bar. Types the key into the search box
    and searches, applies the express-delivery filter
    (``self.click_express_send``), and finally calls
    ``self.wr_re("读", self.device_id)``.
    """
    # The first two taps each wait get_sleep_time() seconds.
    for entry_xpath in ('//*[@content-desc="看病买药"]',
                        '//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]'):
        self.d.xpath(entry_xpath).click()
        time.sleep(self.get_sleep_time())

    search_box_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]'
    self.d.xpath(search_box_xpath).click()
    time.sleep(1)
    self.d.send_keys(self.search_key, clear=True)
    time.sleep(1)
    self.d.xpath('//*[@text="搜索"]').click()
    time.sleep(1)
    self.click_express_send()
    time.sleep(1)
    self.wr_re("读", self.device_id)
def click_express_send(self):
    """
    Switch the search results to the "快递送" (express delivery) filter.

    The filter bar is a horizontal scroll view whose layout differs between
    app versions, so several XPaths are tried. Up to two attempts are made
    to find the bar and swipe it left at its vertical centre. The last tab
    is then clicked through ``self.click_candidate_xpaths``.
    """
    slide_xpaths = [
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]',
    ]
    for attempt in range(1, 3):
        bar_xpath = self.get_first_existing_xpath(slide_xpaths)
        if not bar_xpath:
            time.sleep(self.get_sleep_time())
            continue
        bounds = self.d.xpath(bar_xpath).info['bounds']
        top, bottom = bounds['top'], bounds['bottom']
        print(f'top={top}')
        print(f'bottom={bottom}')
        y = (top + bottom) // 2
        print(f'y={y}')
        self.loggerMT.info(f'开始滑动{attempt}')
        # Swipe the bar right-to-left along its vertical centre.
        self.d.swipe(500, y, 100, y, 0.5)
        time.sleep(self.get_sleep_time())
        break

    express_send_xpaths = [
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/androidx.recyclerview.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]',
    ]
    self.click_candidate_xpaths(
        express_send_xpaths,
        action_desc="点击快递送",
        max_retries=5,
        sleep_after=self.get_sleep_time(),
    )
def get_clipboard(self, delay=1):
    """Return the device clipboard text, stripped of surrounding whitespace.

    Waits ``delay`` seconds first so that a preceding "copy" action has time to
    reach the clipboard. The clipboard is read exactly once, so the logged
    value and the returned value are the same snapshot (and only one device
    round-trip is made).

    :param delay: seconds to wait before reading (default 1, as before).
    :return: the stripped clipboard text, or '' when the clipboard is None.
    """
    time.sleep(delay)
    clipboard_content = self.d.clipboard
    self.loggerMT.info(f"Clipboard content:{clipboard_content}")  # debug output
    if clipboard_content is None:
        return ''
    return clipboard_content.strip()
def clear_clipboard(self):
    """Overwrite the device clipboard with an empty string (label "text/plain")."""
    device = self.d
    device.set_clipboard("", "text/plain")
def get_product_link(self):
    """Copy the current product's share link through the share sheet and return it.

    Makes up to 5 attempts. Each attempt taps the "more" (dots) button, taps
    "分享商品", then, if "复制链接" is shown, clears the clipboard, taps it and
    reads the clipboard. Clearing first means a copy that silently fails can't
    return a link left over from a previous product. An attempt only succeeds
    when a non-empty value is read; otherwise the next attempt starts.

    :return: the copied link text, or '' when every attempt failed.
    """
    dots_xpaths = [
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]'
    ]
    link_xpath = '//*[@text="复制链接"]'
    max_retry = 5  # maximum number of attempts
    for idx in range(1, max_retry + 1):
        clicked_xpath = self.click_candidate_xpaths(
            dots_xpaths,
            action_desc=f'{idx}-进入分享点点点',
            max_retries=1,
            sleep_after=0.2,
        )
        if not clicked_xpath:
            print(f'{idx}-分享入口点击失败')
            time.sleep(self.get_sleep_time())
            continue
        self.loggerMT.info(f'{idx}-click_exists')
        self.d.xpath('//*[@text="分享商品"]').click_exists()
        time.sleep(0.2)
        product_link = ''
        if self.d.xpath(link_xpath).exists:
            self.loggerMT.info(f'{idx}-link_xpath click')
            # Clear first so a copy that silently fails cannot hand back the
            # previous product's link. Best effort: a failure here must not
            # prevent the copy attempt itself.
            try:
                self.clear_clipboard()
            except Exception as e:
                self.loggerMT.info(f'{idx}-清空剪贴板失败: {e}')
            self.d.xpath(link_xpath).click()
            time.sleep(1)
            product_link = self.get_clipboard()
            time.sleep(0.5)
        print(f'{idx}-商品链接:{product_link}')
        self.loggerMT.info(f'{idx}-商品链接:{product_link}')
        if product_link:
            return product_link
        if idx < max_retry:
            time.sleep(0.5)  # no need to wait after the last attempt
    return ''
def run_parallel_tasks(self, task_map):
    """
    Run independent, read-only callables concurrently and collect their results.

    Each callable is submitted through ``self.safe_exec`` on its own worker
    thread. Callables must not change page state (no click, swipe, back...).

    :param task_map: mapping of task name -> zero-argument callable.
    :return: mapping of task name -> result; a task whose future raised maps
        to None. An empty/falsy ``task_map`` yields {}.
    """
    if not task_map:
        return {}
    outcome = {}
    with ThreadPoolExecutor(max_workers=len(task_map)) as pool:
        pending = [
            (name, pool.submit(self.safe_exec, job))
            for name, job in task_map.items()
        ]
        for name, fut in pending:
            try:
                outcome[name] = fut.result()
            except Exception as exc:
                print(f'并行采集任务 {name} 执行失败: {exc}')
                outcome[name] = None
    return outcome
def get_available_xpaths(self, xpaths):
    """Return the xpaths that currently match on screen, keeping input order.

    All existence checks run concurrently via ``run_parallel_tasks``; an xpath
    whose check returned a falsy value (or failed) is omitted.
    """
    probes = {}
    for position, candidate in enumerate(xpaths):
        probes[f'xpath_{position}'] = (lambda xp=candidate: self.d.xpath(xp).exists)
    presence = self.run_parallel_tasks(probes)
    return [
        candidate
        for position, candidate in enumerate(xpaths)
        if presence.get(f'xpath_{position}')
    ]
def get_first_existing_xpath(self, xpaths):
    """Return the first xpath (in input order) that currently matches, or None."""
    matches = self.get_available_xpaths(xpaths)
    return matches[0] if matches else None
def get_first_text_by_xpaths(self, xpaths):
    """Read all *xpaths* concurrently and return the first non-empty text.

    Priority follows input order, not completion order. Returns '' when no
    xpath yields a truthy text.
    """
    readers = {
        f'xpath_{position}': (lambda xp=candidate: self._read_xpath_text(xp))
        for position, candidate in enumerate(xpaths)
    }
    texts = self.run_parallel_tasks(readers)
    ordered = (texts.get(f'xpath_{position}') for position, _ in enumerate(xpaths))
    return next((text for text in ordered if text), '')
def get_first_texts_by_xpath_groups(self, xpath_groups):
    """Resolve several fields at once, each from an ordered list of fallback xpaths.

    Every xpath of every group is read in a single ``run_parallel_tasks`` batch.
    For each group, the first candidate (in list order) with a truthy text wins.

    :param xpath_groups: mapping of field name -> list of xpaths.
    :return: mapping of field name -> text ('' when no candidate matched).
    """
    jobs = {}
    order = {}
    for group, candidates in xpath_groups.items():
        names = [f'{group}_{position}' for position, _ in enumerate(candidates)]
        order[group] = names
        for name, candidate in zip(names, candidates):
            jobs[name] = (lambda xp=candidate: self._read_xpath_text(xp))
    texts = self.run_parallel_tasks(jobs)
    return {
        group: next((texts.get(name) for name in names if texts.get(name)), '')
        for group, names in order.items()
    }
def _read_xpath_text(self, xpath):
    """Return the text of the element matched by *xpath*.

    Returns '' when nothing matches or reading the text raises. A string
    result is stripped; a non-string result is returned unchanged.
    """
    node = self.d.xpath(xpath)
    if not node.exists:
        return ''
    try:
        raw = node.text
        if isinstance(raw, str):
            return raw.strip()
        return raw
    except Exception:
        return ''
def click_candidate_xpaths(self, xpaths, action_desc, max_retries=1, sleep_after=0):
    """Click the first usable xpath among *xpaths*, retrying up to *max_retries* times.

    On each attempt the currently-matching xpaths are looked up. The starting
    candidate rotates with the attempt number, so a retry prefers a different
    xpath than the previous attempt did. Candidates are tried in that rotated
    order until one click call completes without raising.

    :param xpaths: candidate xpaths, in preference order.
    :param action_desc: label used in progress messages.
    :param max_retries: number of attempts.
    :param sleep_after: seconds to sleep after a successful click (0 = none).
    :return: the xpath that was clicked, or None when every attempt failed.
    """
    for attempt in range(1, max_retries + 1):
        matched = self.get_available_xpaths(xpaths)
        if not matched:
            print(f'{action_desc}失败,第{attempt}次没有匹配到可点击的xpath')
            time.sleep(self.get_sleep_time())
            continue
        shift = (attempt - 1) % len(matched)
        ordered = matched[shift:] + matched[:shift]
        for candidate in ordered:
            try:
                self.safe_exec(lambda xp=candidate: self.d.xpath(xp).click())
                print(f'{action_desc}成功')
                if sleep_after:
                    time.sleep(sleep_after)
                return candidate
            except Exception as err:
                print(f'{action_desc}点击异常: {err}')
        time.sleep(self.get_sleep_time())
    return None
def find_xpath_with_swipes(self, xpaths, swipe_direction='down', swipe_scale=0.3, max_swipes=8, found_desc=''):
    """Swipe until one of *xpaths* appears, checking before each swipe and once after the last.

    :param swipe_direction: direction passed to ``swipe_ext``.
    :param swipe_scale: swipe distance passed to ``swipe_ext``.
    :param max_swipes: maximum number of swipes.
    :param found_desc: when non-empty, a "found" message naming it is printed.
    :return: the first matching xpath, or None.
    """
    for swipe_no in range(max_swipes):
        hit = self.get_first_existing_xpath(xpaths)
        if hit:
            if found_desc:
                print(f'第{swipe_no}次找到{found_desc}')
            return hit
        self.d.swipe_ext(swipe_direction, swipe_scale)
        time.sleep(1)
    # One more look after the final swipe.
    hit = self.get_first_existing_xpath(xpaths)
    if hit and found_desc:
        print(f'第{max_swipes}次找到{found_desc}')
    return hit
def build_dup_data(self, product, min_price, shop, scrape_date):
    """Build the record used for duplicate checks (platform fixed to '4')."""
    return dict(
        product=product,
        min_price=min_price,
        shop=shop,
        scrape_date=scrape_date,
        platform='4',
    )
def integrate_data(self):
    """
    Scrape the currently opened product detail page and save it.

    Steps, in order:
    1. Read the product name/specification. If no name is found, swipe back
       once and stop.
    2. Read the minimum price, the sales count and the "自营" badge in parallel.
    3. Self-operated products use a fixed shop name and are skipped when an
       identical record already exists. Otherwise, scroll to "进店" and resolve
       the shop name; stop (returning to the list page) when the name is
       missing, contains "自营", or the record already exists. If the shop can
       be entered, is not a Meituan official/self-operated shop, is not yet
       stored, and fewer than 500 shops were collected this run, its licence
       info is scraped and saved.
    4. Copy the product share link and read instruction-sheet fields if present.
    5. Build the product row and hand it to ``save_to_database``.

    :return: None in every case.
    """
    product, specifications = self.safe_exec(self.get_title)  # product name, specification
    if not product:
        self.swipe_back(1)
        return
    page_data = self.run_parallel_tasks({
        "min_price": self.drug_price,
        "sales_num": self.drug_sale_num,
        "is_self_operated": lambda: self.d.xpath('//*[@text="自营"]').exists,
    })
    min_price = page_data.get("min_price")  # lowest price
    sales_num = page_data.get("sales_num")  # number sold
    if page_data.get("is_self_operated"):
        shop = "美团自营大药房(快递电商)"
        scrape_date = self.get_current_date()  # scrape date
        dup_data = self.build_dup_data(product, min_price, shop, scrape_date)
        print(f'当前数据:{dup_data}')
        if self.data_is_exists(dup_data):
            print('存在相同数据不入库')
            self.back_to_list_page()
            return
    else:
        self.find_xpath_with_swipes(
            ['//*[@text="进店"]'],
            swipe_direction='up',
            swipe_scale=0.3,
            max_swipes=8,
            found_desc='进店'
        )
        shop_page_data = self.run_parallel_tasks({
            "shop": self.get_shop_name_from_current_page,
            "is_has_enter_shop": self.has_shop,
        })
        shop = shop_page_data.get("shop")
        if not shop:
            shop = self.get_shop_name()
        scrape_date = self.get_current_date()  # scrape date
        dup_data = self.build_dup_data(product, min_price, shop, scrape_date)
        print(f'当前数据:{dup_data}')
        if not shop:
            print('未获取到店铺名:开始回退')
            self.back_to_list_page()
            return
        if '自营' in shop:
            self.back_to_list_page()
            return
        db_check_results = self.run_parallel_tasks({
            "dup_exists": lambda: self.data_is_exists(dup_data),
            "shop_exists": lambda: self.shop_is_exists_database(shop),
        })
        if db_check_results.get("dup_exists"):
            print('存在相同数据不入库')
            self.back_to_list_page()
            return
        # --- shop licence collection ---
        is_has_enter_shop = bool(shop_page_data.get("is_has_enter_shop"))
        # Shops already in the database are not entered again.
        shop_is_exists = bool(db_check_results.get("shop_exists"))
        print(f"已采集{self.shop_data_num}家店铺数据")
        # Enter the shop only if possible, not Meituan official/self-operated,
        # not stored yet, and under the per-run cap of 500 shops.
        if is_has_enter_shop and '美团官方' not in shop and '美团自营' not in shop and not shop_is_exists and self.shop_data_num < 500:
            license_info = self.safe_exec(self.get_license_info_ex)
            save_shop_data = {
                'shop': shop,
                'contact_address': license_info['contact_address'],
                'qualification_number': license_info['qualification_number'],
                'scrape_date': scrape_date,
                'business_license_company': license_info['business_license_company'],
                'business_license_address': license_info['business_license_address'],
                'platform': '4'
            }
            self.save_shop_info_to_database(save_shop_data)
            self.shop_data_num += 1
            self.swipe_back(2)
        else:
            print('不采集店铺信息')
    # --- product share link ---
    product_link = self.get_product_link()
    print(f'获取到product_link: {product_link}')
    time.sleep(self.get_sleep_time())
    manufacture_date = ''  # production date is never scraped
    expiry_date = ''
    manufacturer = ''
    approval_number = ''
    # --- instruction sheet fields ---
    is_has_instructions = self.safe_exec(self.has_instructions)
    if is_has_instructions:
        print('开始获取说明书信息')
        # NOTE(review): safe_exec may return a falsy value when the fetch fails
        # (TODO confirm). Treat that, or a missing key, as "field unknown" so the
        # product row is still saved instead of aborting on TypeError/KeyError.
        instructions_info = self.safe_exec(self.get_instructions_data) or {}
        if instructions_info.get('有效期') is not None:
            expiry_date = instructions_info['有效期'].strip('。')
        if instructions_info.get('生产单位') is not None:
            manufacturer = instructions_info['生产单位'].strip('。')
        if instructions_info.get('批准文号') is not None:
            approval_number = instructions_info['批准文号'].strip('。')
    else:
        expiry_date = "未知"
        manufacturer = None
        approval_number = None
    # Location is not scraped: province/city stay empty, so the ids remain 0
    # unless city_to_name happens to contain an entry for ''.
    province_id = 0
    city_id = 0
    city = ''
    province = ''
    if province in self.city_to_name:
        province_id = self.city_to_name[province]
    if city in self.city_to_name:
        city_id = self.city_to_name[city]
    # One timestamp for both columns so insert_time == update_time on insert.
    now_str = time.strftime('%Y-%m-%d %H:%M:%S')
    save_data = {
        'enterprise_id': 3,
        'platform_id': 4,
        'platform_item_id': '',
        'province_id': province_id,
        'city_id': city_id,
        'province_name': '',
        'city_name': '',
        'area_info': "",
        'product_name': product,
        'product_specs': specifications,
        'one_box_price': 0.00,
        'manufacture_date': manufacture_date,
        'expiry_date': expiry_date,
        'manufacturer': manufacturer,
        'approval_number': approval_number,
        'is_sold_out': 0,
        'online_posting_count': 1,
        'continuous_listing_count': 1,
        'link_url': product_link,
        'store_name': shop,
        'store_url': '',
        'shipment_province_id': 0,
        'shipment_province_name': "",
        'shipment_city_id': 0,
        'shipment_city_name': "",
        'company_name': "",
        'qualification_number': "",
        'scrape_date': scrape_date,
        'min_price': min_price,
        'number': 0,
        'sales': sales_num,
        'inventory': "",
        'snapshot_url': "",
        'insert_time': now_str,
        'update_time': now_str,
    }
    self.save_to_database(save_data)
def back_to_list_page(self, max_attempts=5):
    """Swipe back until the list page is recognised.

    The page is checked before every swipe and once more after the final
    swipe. Previously the result of the last swipe was never checked, so
    success on the final attempt was reported as a failure.

    :param max_attempts: maximum number of back swipes (default 5, as before).
    :return: True once ``distinct_target()`` reports the list page, else False.
    """
    for i in range(max_attempts):
        if self.distinct_target():
            return True
        print(f'第{i}次尝试退回到列表页')
        self.swipe_back(1)
        time.sleep(self.get_sleep_time())
    # Verify the outcome of the last swipe before giving up.
    if self.distinct_target():
        return True
    print('页面出错,没有退回到列表页')
    return False
def reset_collection_cursor(self):
    """Rewind the list cursor to page 1, item index 0."""
    cursor = self.collection_cursor
    cursor["page_no"], cursor["item_index"] = 1, 0
def get_current_page_no(self):
    """Return the absolute page number: the ``self.page`` offset plus the cursor's page_no."""
    base = self.page
    return base + self.collection_cursor["page_no"]
def jump_to_page(self, target_page):
    """Drag the list forward, one page per drag, until *target_page* is reached.

    Stops early (with a message) when the "已经到底啦" end marker is visible.
    Does nothing when already at or past *target_page*. Each drag advances the
    cursor's page_no and resets item_index.
    """
    page_now = self.get_current_page_no()
    while page_now < target_page:
        if self.d.xpath('//*[@text="已经到底啦"]').exists:
            print(f"列表实际页数不足,当前停留在第{page_now}页,无法跳转到第{target_page}页")
            return
        print(f"跳过第{page_now}页,前往第{page_now + 1}页")
        self.d.drag(300, 1400, 300, 400, 1)
        time.sleep(1)
        self.collection_cursor["page_no"] += 1
        self.collection_cursor["item_index"] = 0
        page_now = self.get_current_page_no()
def move_to_page_range_start(self):
    """If a page range is configured and the cursor is before its start, jump there."""
    if self.page_range:
        first_page = self.page_range["start"]
        if self.get_current_page_no() < first_page:
            self.jump_to_page(first_page)
def start_collection_app(self):
    """Reset ``sort_key`` to 0, then restart the app via ``restart_app``."""
    self.sort_key = 0
    self.restart_app()
def open_product_list_page(self):
    """Open the product list, reset the cursor, apply sorting once, and seek to the range start.

    Sorting is applied only when ``self.sort`` is set and ``sort_key`` is still 0.
    """
    self.safe_exec(self.enter_target_page)
    self.reset_collection_cursor()
    needs_sort = self.sort and self.sort_key == 0
    if needs_sort:
        self.li_or_lo(self.sort)
    self.move_to_page_range_start()
def handle_workflow_error(self, step_name):
    """Map a failed workflow step to the step that should run next.

    The action comes from ``self.workflow_error_action[step_name]``:
    "back_to_list_page" returns to the list and resumes collecting;
    "open_product_list_page" and "start_app" are returned as-is.

    :raises RuntimeError: when returning to the list fails, or no action is
        configured for *step_name*.
    """
    action = self.workflow_error_action.get(step_name)
    if action == "back_to_list_page":
        if self.back_to_list_page():
            return "collect_single_product"
        raise RuntimeError("退回列表页失败")
    if action in ("open_product_list_page", "start_app"):
        return action
    raise RuntimeError(f"未配置步骤 {step_name} 的错误处理动作")
def get_list_items(self):
    """Return the list-page product cards, polling up to 10 times (1 s apart).

    :raises RuntimeError: when no cards are found after all polls.
    """
    cards_xpath = '//android.support.v7.widget.RecyclerView/android.widget.FrameLayout'
    for _ in range(10):
        found = self.safe_exec(self.d.xpath(cards_xpath).all)
        if found:
            return found
        time.sleep(1)
    raise RuntimeError("列表页商品加载失败")
def move_to_next_list_page(self):
    """Record progress and drag to the next list page.

    :return: False when the configured end page has been reached (progress is
        recorded) or the "已经到底啦" end marker is visible (nothing recorded);
        True after dragging forward and advancing the cursor.
    """
    page_now = self.get_current_page_no()
    reached_end = bool(self.page_range) and page_now >= self.page_range["end"]
    if reached_end:
        self.wr_re("写", self.device_id, self.sort, page_now)
        print(f'已完成第{page_now}页采集,达到结束页{self.page_range["end"]},停止采集')
        return False
    if self.d.xpath('//*[@text="已经到底啦"]').exists:
        return False
    self.wr_re("写", self.device_id, self.sort, page_now)
    print(f'当前第{page_now}页采集完成,开始滑动到下一页')
    self.d.drag(300, 1400, 300, 400, 1)
    time.sleep(1)
    self.collection_cursor["page_no"] += 1
    self.collection_cursor["item_index"] = 0
    return True
def _collect_list_item(self, drug_idx, drug_one):
    """Screen one list-page card and, if it qualifies, open and scrape it.

    :param drug_idx: 1-based position of the card in the RecyclerView; used to
        build positional xpaths.
    :param drug_one: the card element (``info['bounds']`` and ``click`` are used).
    :return: "skip" when the card is not fully inside the visible band;
        "continue" when it is rejected (missing or unrelated title, price out
        of ``collect_range``, missing price or shop, already stored);
        "collected" after the detail page was opened and ``integrate_data``
        ran.
    """
    bounds = drug_one.info['bounds']
    top = bounds['top']
    bottom = bounds['bottom']
    print(f'当前商品bottom:{bottom}')
    print(f'当前商品top:{top}')
    # Only handle cards fully inside this vertical pixel band (hard-coded;
    # presumably tuned to the target device's screen layout).
    if not (304 <= top and bottom <= 1475):
        return "skip"
    print(f"这页的第几个商品:{drug_idx}")
    item_text_data = self.get_first_texts_by_xpath_groups({
        "product_title": [
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
        ],
        "price_str": [
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView',
        ],
        "shop_name": [
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]',
            f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{drug_idx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]',
        ],
    })
    product_title = item_text_data.get("product_title", "")
    if not product_title:
        print("列表当前商品名称不存在")
        self.unrelated_data += 1
        return "continue"
    # A leading '0' is dropped from the title text.
    product_title = product_title[1:] if product_title.startswith('0') else product_title
    print(f"列表当前商品名称:{product_title}")
    if not self.is_link_useful(product_title):
        print(f"is_link_useful 没通过:{product_title}")
        self.unrelated_data += 1
        return "continue"
    self.unrelated_data = 0
    price = ''
    price_str = item_text_data.get("price_str", "")
    print(f"列表当前商品价格:{price_str}")
    if price_str:
        # Take the first well-formed number. A label without digits leaves the
        # price empty (the card is then skipped below) instead of raising
        # AttributeError on a None match or ValueError on a lone '.'.
        price_match = re.search(r'\d+(?:\.\d+)?', price_str)
        if price_match:
            price = float(price_match.group())
    if price != '' and self.collect_range:
        range_start = self.collect_range["start"]
        range_end = self.collect_range["end"]
        if not (range_start <= price <= range_end):
            print(f"price {price} not in range {range_start}-{range_end}, skip")
            return "continue"
    shop_name = item_text_data.get("shop_name", "")
    print(f"列表当前商品店铺名称:{shop_name}")
    if price == '' or shop_name == '':
        print("列表当前商品价格或店铺名称不存在")
        return "continue"
    scrape_date = self.get_current_date()
    dup_data = self.build_dup_data(product_title, price, shop_name, scrape_date)
    if self.data_is_exists(dup_data):
        print('列表存在相同数据不入库')
        return "continue"
    self.safe_exec(drug_one.click)
    print('点击目标药品完毕')
    time.sleep(1)
    self.safe_exec(self.integrate_data)
    print('integrate_data结束')
    time.sleep(1)
    return "collected"
def collect_single_product(self):
    """Process list cards from the cursor position until one card is handled.

    Cards reported as "skip" are passed over. The first "continue" or
    "collected" result triggers a return to the list page, and the function
    returns True. When the page is exhausted, it moves to the next page.

    :return: True to keep collecting; False when past the configured end page
        or no next page exists.
    :raises RuntimeError: when too many verification prompts were seen.
    """
    monitor = self.monitor
    if monitor.verification_count >= monitor.MAX_VERIFICATION_RETRY:
        raise RuntimeError("验证码触发过多,暂停程序")
    if self.page_range:
        self.move_to_page_range_start()
        page_now = self.get_current_page_no()
        last_page = self.page_range["end"]
        if page_now > last_page:
            print(f"当前已在第{page_now}页,超过结束页{last_page},停止采集")
            return False
    items = self.get_list_items()
    print(f'当前第{self.get_current_page_no()}页,共有{len(items)}个商品')
    while self.collection_cursor["item_index"] < len(items):
        position = self.collection_cursor["item_index"]
        self.collection_cursor["item_index"] = position + 1
        outcome = self._collect_list_item(position + 1, items[position])
        if outcome in ("continue", "collected"):
            self.back_to_list_page()
            return True
    if self.move_to_next_list_page():
        return True
    print('已经到达列表页最底部')
    return False
def execute_workflow_step(self, step_name):
    """Run one workflow step and return the name of the next step (None = finished).

    start_app -> open_product_list_page -> collect_single_product (repeats
    until it reports no more work or too many consecutive unrelated items).

    :raises RuntimeError: for an unknown step name.
    """
    simple_steps = {
        "start_app": ("start_collection_app", "open_product_list_page"),
        "open_product_list_page": ("open_product_list_page", "collect_single_product"),
    }
    if step_name in simple_steps:
        method_name, follow_up = simple_steps[step_name]
        self.safe_exec(getattr(self, method_name))
        return follow_up
    if step_name != "collect_single_product":
        raise RuntimeError(f"未知流程步骤: {step_name}")
    if not self.safe_exec(self.collect_single_product):
        return None
    print('目前连续无关数据量: ', self.unrelated_data)
    if self.unrelated_data > self.max_unrelated_data:
        print(f"连续超过{self.max_unrelated_data}个不达标的数据则停止采集")
        return None
    return "collect_single_product"
def main(self, device_id):
    """Connect to *device_id* and drive the workflow state machine to completion.

    A SpiderMonitor is started before the loop and always stopped/joined
    afterwards. A step that raises is retried via ``handle_workflow_error``
    until its consecutive failure count exceeds ``workflow_retry_limit[step]``;
    then the exception is re-raised. A successful step resets its counter.
    """
    self.device_id = device_id
    self.connect_devices(device_id)
    time.sleep(self.get_sleep_time())
    self.monitor = SpiderMonitor(self)
    self.monitor.start()
    failures = dict.fromkeys(self.workflow_retry_limit, 0)
    step = "start_app"
    try:
        while step:
            try:
                upcoming = self.execute_workflow_step(step)
            except Exception as exc:
                print(f'{step} 执行异常: {exc}')
                time.sleep(5)
                failures[step] += 1
                if failures[step] > self.workflow_retry_limit[step]:
                    raise
                step = self.handle_workflow_error(step)
            else:
                failures[step] = 0
                step = upcoming
    finally:
        self.monitor.stop()
        self.monitor.join()
# Per-device task configuration: device serial -> list of tasks run in order
# by run_device(). Each task's fields are passed positionally to MT(...).
device_list = {
    "U8ONIJJJS4CELVD6": [
        {
            # Search keyword; also used in progress log messages.
            "search_key": "喇叭牌正露丸 100粒",
            "title_key": "正露丸",
            "spec_list": [""],
            "brand": "喇叭牌",
            # Sort option; passed to li_or_lo() and wr_re() via self.sort (assumed
            # to be stored by MT.__init__ — TODO confirm).
            "sort": "升序",
            # NOTE(review): the collector reads collect_range["start"/"end"] and
            # page_range["start"/"end"] as mappings. An empty list is falsy and
            # disables the filter; confirm how MT converts non-empty values.
            "collect_range": [],
            "page_range": [],
            # Max consecutive failures per workflow step before MT.main re-raises.
            "workflow_retry_limit": {
                "start_app": 3,
                "open_product_list_page": 3,
                "collect_single_product": 3,
            },
            # Recovery action per failed step, interpreted by handle_workflow_error.
            "workflow_error_action": {
                "start_app": "start_app",
                "open_product_list_page": "start_app",
                "collect_single_product": "back_to_list_page",
            },
        },
    ],
}
def run_device(device_id, retry_delay_seconds=10):
    """Run every configured task for one device (intended to run in its own thread).

    Each task is repeated in a fresh ``MT`` instance until one cycle finishes
    without raising. Between failed cycles the thread now waits
    ``retry_delay_seconds``, so a persistently failing device (e.g.
    disconnected) no longer retries in a tight loop flooding the logs.

    :param device_id: device serial; must be a key of ``device_list``.
    :param retry_delay_seconds: pause before retrying after a failed cycle.
    """
    if device_id not in device_list:
        logging.error(f"设备id没有配置: {device_id}")
        return
    tasks = device_list[device_id]
    logging.info(f"[设备 {device_id}] 开始执行,共 {len(tasks)} 个任务")
    for task in tasks:
        cycle_no = 0
        while True:
            cycle_no += 1
            mt = None
            logging.info(f'[设备 {device_id}] ========== {task["search_key"]} 第 {cycle_no} 轮采集开始 ==========')
            try:
                # MT is expected to be defined/imported elsewhere in this module.
                mt = MT(
                    task["search_key"],
                    task["title_key"],
                    task["spec_list"],
                    task["brand"],
                    task.get("sort"),
                    task.get("collect_range"),
                    task.get("page_range"),
                    task.get("workflow_retry_limit"),
                    task.get("workflow_error_action"),
                )
                mt.main(device_id)
                logging.info(f'[设备 {device_id}] 关键字 {task["search_key"]} 本轮采集完成')
                break  # success: stop retrying this task
            except Exception as e:
                logging.exception(f'[设备 {device_id}] 关键字 {task["search_key"]} 采集异常:{e}')
            finally:
                if mt and hasattr(mt, 'close'):
                    mt.close()
            # Only reached after a failed cycle (success breaks above); back off
            # once cleanup has run, then retry.
            time.sleep(retry_delay_seconds)
    logging.info(f"[设备 {device_id}] 所有任务执行完毕")
def main():
    """Program entry point.

    Without arguments: start the database-driven scheduler (dispatch pending
    tasks once, then poll every SCHEDULER_INTERVAL_SECONDS).
    With device IDs as arguments: run each configured device in its own thread
    and wait for all of them. Repeated IDs are collapsed (first-seen order
    kept) so that two threads never drive the same phone at once.
    """
    # Log format includes the thread name so per-device output is distinguishable.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(threadName)s] %(levelname)s: %(message)s'
    )
    if len(sys.argv) < 2:
        logging.info(f"美团数据库调度器启动,轮询间隔 {SCHEDULER_INTERVAL_SECONDS} 秒")
        dispatch_pending_tasks()
        schedule_dispatch(SCHEDULER_INTERVAL_SECONDS)
        return
    # At least one ID is present here (len(sys.argv) >= 2), so the list is non-empty.
    device_ids = list(dict.fromkeys(sys.argv[1:]))
    invalid_ids = [did for did in device_ids if did not in device_list]
    if invalid_ids:
        logging.error(f"以下设备ID未配置: {invalid_ids}")
        return
    logging.info(f"将运行指定的设备: {device_ids}")
    with ThreadPoolExecutor(max_workers=len(device_ids)) as executor:
        futures = [executor.submit(run_device, did) for did in device_ids]
        for future in futures:
            future.result()
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|