| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755 |
- import requests
- import base64
- import cv2
- import uiautomator2 as u2
- import time
- import subprocess
- import re
- import random
- import datetime
- import json
- from aip import AipOcr
- from apscheduler.schedulers.blocking import BlockingScheduler
- # from db_mysql import mysqlClient
- import threading
- from collections import deque
- import numpy as np
- import secrets
- import os
- import oss2
- import urllib.parse
- # import pyperclip
- from config import Config
- from logger import setup_logger
- import logging
- # from database import MySQLClient
- from PIL import Image
- from pathlib import Path
- from PIL import Image, ImageDraw, ImageFont
- # 配置日志
- # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- setup_logger("mt_spider") # 初始化日志
- class SpiderMonitor(threading.Thread):
- """全局弹窗监控线程(增强版)"""
- def __init__(self, spider_instance):
- super().__init__(daemon=True)
- self.spider = spider_instance
- self.running = True
- self.pausing = threading.Event() # 主线程同步事件
- self.last_verification_time = 0
- self.verification_count = 0
- self.MAX_VERIFICATION_RETRY = 10
- self.recent_clicks = deque(maxlen=10) # 防重复点击
- self.logger = logging.getLogger("SpiderMonitor")
- # 可配置化弹窗规则
- self.popup_rules = {
- "simple": [
- ('//*[@text="确定"]', "点击确定"),
- ('//*[@text="允许"]', "点击允许"),
- ('//*[@text="关闭"]', "点击关闭"),
- ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/address_center_location_close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/location_close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/btn_close"]', "关闭按钮"),
- ],
- "verification": [
- '//*[contains(@text, "验证")]',
- '//*[contains(@text, "滑块")]',
- '//*[contains(@text, "依次点击")]',
- '//*[contains(@text, "请点击")]',
- '//*[contains(@text, "拖动滑块刚")]', # 这个需要拖动滑块至最右边,然后再截图
- '//*[contains(@text, "请输入图片中的内容")]',
- '//*[contains(@text, "用最短线连接")]',
- '//*[contains(@text, "请按语序依次点击")]',
- '//*[contains(@text, "请向右滑动滑块")]',
- '//*[contains(@text, "请拖动下方滑块完成拼图")]',
- '//*[contains(@resource-id, "captcha")]'
- ]
- }
- def run(self):
- while self.running:
- try:
- handled = self.check_and_handle_popup()
- time.sleep(2 if handled else 1)
- except Exception as e:
- self.logger.exception("监控线程异常: %s", e)
- time.sleep(3)
- def _is_recent_click(self, xpath):
- """防止重复点击同一个弹窗"""
- key = f"{xpath}_{int(time.time())}"
- if key in self.recent_clicks:
- return True
- self.recent_clicks.append(key)
- return False
- def check_and_handle_popup(self):
- d = self.spider.d
- # 1. 处理简单弹窗
- for xpath, desc in self.popup_rules["simple"]:
- if d.xpath(xpath).exists and not self._is_recent_click(xpath):
- self.logger.info("检测到弹窗: %s", desc)
- d.xpath(xpath).click()
- return True
- # 2. 处理验证码弹窗
- for xpath in self.popup_rules["verification"]:
- if d.xpath(xpath).exists:
- now = time.time()
- if now - self.last_verification_time < 30:
- return False # 30秒内不重复触发
- self.last_verification_time = now
- self.verification_count += 1
- self.logger.warning("验证码弹窗触发,等待人工处理...")
- if self.verification_count > self.MAX_VERIFICATION_RETRY:
- self.logger.error("验证码重试超限,终止任务")
- self.spider.stop_all()
- return True
- self.pausing.set() # 通知主线程暂停
- # d.toast.show("需要人工处理验证码", 120)
- # 等待人工处理
- start = time.time()
- # while time.time() - start < 120*60:
- # if not d.xpath(xpath).exists:
- # self.logger.info("验证码已处理")
- # d.toast.show("验证完成", 2)
- # self.pausing.clear() # 放行主线程
- # return True
- # time.sleep(5)
- while True:
- if not d.xpath(xpath).exists:
- self.logger.info("验证码已处理")
- # d.toast.show("验证完成", 2)
- self.pausing.clear() # 放行主线程
- return True
- time.sleep(5)
- self.logger.warning("验证码超时,重启APP")
- self.spider.restart_app()
- return True
- # 3. 处理广告弹窗(点击右上角)
- if d.xpath('//*[contains(@text, "广告")]').exists:
- w, h = d.info['displayWidth'], d.info['displayHeight']
- d.click(w - 50, 50)
- self.logger.info("关闭广告弹窗")
- return True
- return False
- def stop(self):
- self.running = False
- class MTScreenshot:
- def __init__(self, d, oss_config, search_key, scroll_times=4, compress_quality=7, resize_ratio=0.8):
- # 接收外部已连接好的u2设备实例
- self.d = d
- self.search_key = search_key # 添加这行
- # 启动全局弹窗监控
- self.monitor = SpiderMonitor(self)
- self.monitor.start()
- self.loggerMT = logging.getLogger()
- # 日志初始化
- self.logger = self._init_logger()
- # OSS配置与初始化(核心配置,无冗余)
- self.oss_config = oss_config
- self.oss_bucket = self._init_oss_bucket()
- # 截图核心参数
- self.scroll_times = scroll_times
- self.compress_quality = compress_quality
- self.resize_ratio = resize_ratio
- # self.title_xpaths = [
- # '//*[@resource-id="com.jd.lib.productdetail.feature:id/db"]',
- # '//*[@resource-id="com.jd.lib.productdetail.feature:id/cx"]',
- # '//*[@resource-id="com.jd.lib.productdetail.feature:id/cj"]'
- # ]
- def _init_logger(self):
- # 极简日志配置,仅保留必要输出
- logger = logging.getLogger("mt_screenshot")
- logger.setLevel(logging.INFO)
- logger.handlers.clear()
- handler = logging.StreamHandler()
- handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
- logger.addHandler(handler)
- return logger
- def _init_oss_bucket(self):
- # 仅做OSS配置校验和Bucket连接,无额外功能
- if not all([self.oss_config.get("access_key_id"),
- self.oss_config.get("access_key_secret"),
- self.oss_config.get("endpoint"),
- self.oss_config.get("bucket_name")]):
- self.logger.warning("OSS配置不完整,无法上传")
- return None
- try:
- auth = oss2.Auth(self.oss_config["access_key_id"], self.oss_config["access_key_secret"])
- bucket = oss2.Bucket(auth, self.oss_config["endpoint"], self.oss_config["bucket_name"])
- bucket.get_bucket_info() # 验证连接
- self.logger.info("OSS Bucket连接成功")
- return bucket
- except Exception as e:
- self.logger.error(f"OSS Bucket连接失败: {e}")
- return None
- def _upload_to_oss(self, local_path):
- # 极简上传逻辑,仅返回OSS URL或None
- if not self.oss_bucket or not os.path.exists(local_path):
- return None
- file_name = os.path.basename(local_path)
- safe_name = re.sub(r'[^\w\.\-]', '_', file_name)
- oss_key = f"{self.oss_config.get('oss_prefix', 'scrape_data/')}{safe_name}"
- try:
- oss2.resumable_upload(self.oss_bucket, oss_key, local_path)
- # 生成并返回完整OSS URL
- oss_file_url = f"https://{self.oss_config['bucket_name']}.{self.oss_config['endpoint']}/{urllib.parse.quote(oss_key, safe='/')}"
- self.logger.info(f"OSS上传成功: {oss_file_url}")
- return oss_file_url
- except Exception as e:
- self.logger.error(f"OSS上传失败: {e}")
- return None
- # def _get_title(self):
- # # 仅提取标题,无冗余逻辑
- # for xpath in self.title_xpaths:
- # elem = self.d.xpath(xpath)
- # if elem.exists:
- # info = elem.info
- # title = (info.get("contentDescription") or info.get("content-desc") or info.get("text") or "").strip()
- # return title[:50] # 限制标题长度,避免文件名过长
- # return ""
- def safe_exec(self, func, *args, **kwargs):
- """
- 万能安全壳:执行 func 前检查验证码,
- 若监控线程已置位 pausing,则一直阻塞直到放行。
- """
- while self.monitor.pausing.is_set():
- time.sleep(1)
- # 执行真正逻辑
- return func(*args, **kwargs)
- def _get_title(self):
- # try:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- # except:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text
- # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text
- def _inner():
- temp_search_key = self.search_key
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- temp_search_key = '养血清脑丸2.5g*9袋'
- # elif self.search_key == '三九胃泰颗粒':
- # self.search_key = '三九胃泰' #兼容三九胃泰 温胃舒颗粒
- print(f'获取商品title时的搜索关键字:{temp_search_key}')
- # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text
- # 初始化
- drugs_name = ''
- specifications = ''
- title = ''
- # 循环的获取title为了有时间来处理人机验证
- for m in range(1, 6000):
- if self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').exists:
- title = self.safe_exec(
- lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text
- )
- self.loggerMT.info(f"第{m}次获取title成功")
- print(f"第{m}次获取title成功")
- break
- else:
- time.sleep(3)
- # return drugs_name, specifications
- # drugs_name = ''
- # specifications = ''
- # try:
- # title_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- # title_xpath_2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- # if self.d.xpath(title_xpath).exists:
- # title = self.d.xpath(title_xpath).text
- # print(f"title_xpath获取的title={title}")
- # if temp_search_key not in title:
- # return drugs_name, specifications
- # elif self.d.xpath(title_xpath_2).exists:
- # title = self.d.xpath(title_xpath_2).text
- # print(f"title_xpath_2获取的title={title}")
- # if temp_search_key not in title:
- # return drugs_name, specifications
- # else:
- # print('title_xpath不存在,请确认')
- # return drugs_name, specifications
- # # title = self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text
- # except Exception as e:
- # print(f"发生异常: {e}")
- # return drugs_name, specifications
- # 奇怪:有的时候title取出来的记过第一位会多一个0
- # title = self.safe_exec(self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text)
- # title = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- title = title[1:] if title.startswith('0') else title
- print(f'获取到药品标题:{title}')
- # 从里面匹配出药品名和规格
- # drugs_name
- # specifications
- # match = re.search(r'([^\d]+)([\d\D]+)', title)
- if self.search_key == '999赐多康大豆':
- return title, '1罐'
- if self.search_key == "999感冒清热颗粒":
- match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title)
- else:
- match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title)
- if match:
- # drugs_name = match.group(1).strip() + match.group(2).strip()
- drugs_name = title
- specifications = match.group(3).strip()
- print("药品名:", drugs_name)
- print("规格:", specifications)
- # print('完整药名:', drugs_name + specifications)
- return drugs_name # , specifications
- else:
- if title == '999抗病毒口服液10ml*12' or title == '999抗病毒口服液':
- drugs_name = title
- specifications = '10ml*12支/盒'
- return drugs_name # , specifications
- elif title == '999抗病毒口服液10ml*10':
- drugs_name = title
- specifications = '10ml*10支/盒'
- return drugs_name # , specifications
- elif title == '999小柴胡颗粒':
- drugs_name = title
- specifications = '10g*9袋/盒'
- return drugs_name # , specifications
- elif title == '999养胃舒颗粒':
- drugs_name = title
- specifications = '10g*10袋/盒'
- return drugs_name # , specifications
- elif title == '三九胃泰胶囊':
- drugs_name = title
- specifications = '0.5g*24粒/盒'
- return drugs_name # , specifications
- elif title == '999补脾益肠丸':
- drugs_name = title
- specifications = '6g*9袋/盒'
- return drugs_name # , specifications
- elif title == '999感冒灵颗粒':
- drugs_name = title
- specifications = '10g*9袋/盒'
- return drugs_name # , specifications
- elif title == '999感冒灵胶囊':
- drugs_name = title
- specifications = '0.5g*12粒/盒'
- return drugs_name # , specifications
- else:
- print("没有匹配到预期格式")
- drugs_name = title
- specifications = ''
- return drugs_name # , specifications
- # 用 safe_exec 包装内部逻辑,确保验证码阻塞
- return self.safe_exec(_inner)
- def _merge_screenshots(self, screens):
- # 仅拼接截图,无额外功能
- if len(screens) == 1:
- return screens[0].convert('RGB')
- rgb_screens = [s.convert('RGB') for s in screens]
- total_width = rgb_screens[0].width
- total_height = sum(s.height for s in rgb_screens)
- merged_img = Image.new('RGB', (total_width, total_height))
- y_offset = 0
- for img in rgb_screens:
- merged_img.paste(img, (0, y_offset))
- y_offset += img.height
- return merged_img
- def get_oss_url(self):
- """核心方法:截图+临时本地保存+上传OSS+上传成功删本地文件+返回OSS URL,可直接赋值给oss_file"""
- local_file_path = None
- try:
- # 1. 提取标题
- title = self._get_title()
- self.logger.info(f"获取标题: {title[:20]}..." if title else "未获取到标题")
- # 2. 生成本地文件路径
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- safe_title = re.sub(r'[\\/*?:"<>|]', '_', title)
- local_dir = "../scrape_data"
- os.makedirs(local_dir, exist_ok=True)
- local_file_path = os.path.join(local_dir, f"{timestamp}_{safe_title}.jpg")
- # 3. 滚动截图
- screen_list = [self.d.screenshot()]
- w, h = self.d.window_size()
- for i in range(self.scroll_times):
- # 可能滑动距离太短,截不到店名。原本是0.8
- # self.d.swipe(w // 2, h * 0.9, w // 2, h * 0.1, duration=random.uniform(0.6, 1.2))
- self.d.swipe(w // 2, h * 0.85, w // 2, h * 0.15, # 滑动到15%
- duration=random.uniform(0.8, 1.5))
- time.sleep(random.uniform(2.0, 4.0))
- screen_list.append(self.d.screenshot())
- if self.d(textContains='商家服务').exists:
- # 看情况是否需要补滑
- break
- # # ========== 自动处理“是否存储图像”弹窗 ==========
- # # 检测弹窗是否存在(根据弹窗的文本/控件ID定位)
- # # 通过“是否存储图像”文本定位弹窗
- # # if self.d(text="是否存储图像").exists(timeout=2):
- # # # 点击“取消”(不需要系统存储截图)
- # # self.d(text="取消").click(timeout=2)
- # # self.logger.info("已自动关闭“是否存储图像”弹窗")
- #
- # #出现标题 break
- # ========== 滑动截图完成后,滑回初始位置 ==========
- # self.logger.info("开始滑回初始位置")
- #
- # for i in range(self.scroll_times):
- # # 反向滑动(与正向滑动方向相反)
- # self.d.swipe_ext('down', 0.8)
- # time.sleep(random.uniform(1.0, 2.0))
- # print(f"第{i+1}次反向滑动,已滑回部分距离")
- # self.logger.info("✅ 已滑回初始页面位置")
- # 4. 拼接+压缩+保存
- merged_img = self._merge_screenshots(screen_list)
- if 0.1 < self.resize_ratio < 1.0:
- new_size = (int(merged_img.width * self.resize_ratio), int(merged_img.height * self.resize_ratio))
- resample_mode = Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS
- merged_img = merged_img.resize(new_size, resample_mode)
- # 临时保存到本地
- merged_img.save(local_file_path, format='JPEG', quality=self.compress_quality)
- merged_img.close() # 释放长图句柄
- self.logger.info(f"临时本地保存: {local_file_path}")
- # 5. 上传OSS
- oss_url = self._upload_to_oss(local_file_path)
- # 6. 核心:OSS上传成功后,删除本地临时文件
- if oss_url is not None:
- try:
- # 先不删除,检查还有没有问题
- # os.remove(local_file_path)
- self.logger.info(f"✅ OSS上传成功,已删除本地临时文件: {local_file_path}")
- # 若本地目录为空,可删除目录(按需开启)
- # if not os.listdir(local_dir):
- # os.rmdir(local_dir)
- # self.logger.info(f"本地目录{local_dir}为空,已删除")
- except Exception as e:
- self.logger.warning(f"⚠️ OSS上传成功,但删除本地文件失败: {e}")
- return oss_url
- except Exception as e:
- self.logger.error(f"截图/上传失败: {e}")
- return None
- def get_access_token():
- AppKey = "tRK2RhyItCSh6BzyT4CNVXQa"
- AppSrcret = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
- token_url = 'https://aip.baidubce.com/oauth/2.0/token'
- url = f"{token_url}?grant_type=client_credentials&client_id={AppKey}&client_secret={AppSrcret}"
- payload = ""
- headers = {
- 'Content-Type': 'application/json',
- 'Accept': 'application/json'
- }
- response = requests.request("POST", url, headers=headers, data=payload)
- try:
- return response.json()['access_token']
- except:
- return None
- def get_mysql():
- """
- 建立并返回一个到数据库的连接对象
- """
- import pymysql
- return pymysql.connect(
- host=Config.DB_HOST, # "localhost", # 修改后的主机
- port=Config.DB_PORT, # 3306, # 添加端口号
- user=Config.DB_USER, # 'root', # 修改后的用户名
- password=Config.DB_PASSWORD, # 修改后的密码
- db=Config.DB_NAME, # "drug_data", # 修改后的数据库名
- charset='utf8mb4'
- )
- class MT:
- def __init__(self, key):
- # self.package_name = 'com.sankuai.meituan'
- self.package_name = Config.PACKAGE_NAME
- self.access_token = get_access_token()
- self.city2province = self.get_city_info()
- self.APP_ID = '116857964'
- self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh'
- self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3'
- self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY)
- # host = Config.DB_HOST #"localhost"
- # user = Config.DB_USER #"root"
- # password = Config.DB_PASSWORD #"dfwy2025"
- # database = Config.DB_NAME #"drug_data"
- # port = Config.DB_PORT#3306
- # print(f'数据库配置:host:{host},user:{user},password:{password},database:{database},port:{port}')
- self.table_name = Config.DB_TABLE # "mt_drug_middle"
- self.shop_table_name = Config.DB_SHOP_TABLE
- # print(f'数据库表名:table_name:{self.table_name},shop_table_name:{self.shop_table_name}')
- # self.mysql_client = mysqlClient(host, user, password, database, port)
- self.loggerMT = logging.getLogger()
- self.search_key = key # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒
- self.unrelated_data = 0 # 无关数据数量
- self.shop_data_num = 0 # 店铺数据数量
- def stop_app(self):
- self.d.app_stop(self.package_name)
- time.sleep(5)
- def start_app(self):
- self.d.app_start(self.package_name)
- time.sleep(5)
- def restart_app(self):
- """
- 重启app
- :return:
- """
- self.stop_app()
- self.start_app()
- @staticmethod
- def get_sleep_time():
- # return random.randint(5, 8)
- return random.randint(1, 3)
- @staticmethod
- def get_current_date():
- return datetime.datetime.now().strftime('%Y/%m/%d')
- @staticmethod
- def get_city_info():
- """
- 获取所有的省市数据
- :return:
- """
- file_path = '../kailin_city.json'
- with open(file_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- province = {province_one["id"]: province_one for province_one in data['province']}
- city2province = dict()
- city = data['city']
- for city_one in city:
- name = city_one['name']
- pid = city_one['pid']
- if len(str(pid)) > 2:
- pid = int(re.match('^\d{2}', str(pid)).group())
- city2province[name] = province[pid]['name']
- return city2province
- def get_shop_name(self):
- """
- 获取店铺名
- :return:
- """
- try:
- shop_name = self.d.xpath(
- '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text
- print(f'获取到店铺名:{shop_name}')
- return shop_name
- except:
- try:
- shop_name = self.d.xpath(
- '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text
- print(f'获取到店铺名2:{shop_name}')
- return shop_name
- except Exception as e:
- # 点击店铺曲获取店铺名称
- print("点击店铺进入后获取店铺名称")
- self.enter_shop()
- shop_xpath = '//*[@resource-id="com.sankuai.meituan:id/layout_header_view"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]//android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.TextView'
- if self.d.xpath(shop_xpath).exists:
- shop_name = self.d.xpath(shop_xpath).text
- self.swipe_back(1)
- return shop_name
- else:
- print(f'获取店铺名出错:{e}')
- shop_name = ''
- return shop_name
- def get_qualification_number(self):
- """
- 获取资质编号
- :return:
- """
- try:
- qualification_number_str = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text
- qualification_number = qualification_number_str.strip('资质编号:').strip()
- return qualification_number
- except:
- return None
- def get_shop_address(self):
- try:
- xpath = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView'
- if self.d.xpath(xpath).exists:
- shop_address = self.d.xpath(xpath).text
- print(f'111-获取到店铺地址:{shop_address}')
- if '发货时间' in shop_address:
- print(f'店铺地址包含发货时间,再次获取店铺地址')
- xpath2 = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.TextView'
- if self.d.xpath(xpath2).exists:
- shop_address = self.d.xpath(xpath2).text
- print(f'222-获取到店铺地址:{shop_address}')
- else:
- print(f'222-xpath2获取店铺地址失败')
- else:
- shop_address = ''
- print(f'333-获取到店铺地址:{shop_address}')
- return shop_address
- except:
- print(f'获取店铺地址出错-get_shop_address')
- return None
- def enter_detail(self):
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/recycler"]/android.widget.FrameLayout[1]').click()
- time.sleep(self.get_sleep_time())
- def save_to_database(self, data):
- print(f'保存数据到数据库:{data}')
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- # add_sql = "insert into delete_friend_table(delete_user_name,delete_user_id,delete_content,delete_time) value(%s,%s,%s,%s)"
- add_sql = f"""
- INSERT INTO {self.table_name}
- (product, min_price, manufacture_date, expiry_date, shop, business_license_company, province, city, manufacturer, specification, approval_number, product_link, scrape_date, scrape_province, availability, credit_code, platform, search_key, sales, inventory, snapshot_url)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """
- # cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'],data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], self.get_current_date(), data['scrape_province'], data['availability'], data['credit_code'], data['platform']))
- cur.execute(add_sql,
- (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'],
- data['business_license_company'], data['province'], data['city'], data['manufacturer'],
- data['specification'], data['approval_number'], data['product_link'], data['scrape_date'],
- data['scrape_province'], data['availability'], data['credit_code'], data['platform'],
- data['search_key'], data['sales'], data['inventory'], data['snapshot_url']))
- conn.commit() # 提交数据
- # self.mysql_client.insert(self.table_name, data)
- print(f"存入数据库成功")
- def save_shop_info_to_database(self, data):
- print(f'保存店铺数据到数据库:{data}')
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- add_sql = f"""
- INSERT INTO {self.shop_table_name}
- (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- """
- cur.execute(add_sql, (data['shop'], data['contact_address'], data['qualification_number'],
- data['business_license_company'], data['business_license_address'], data['scrape_date'],
- data['platform']))
- conn.commit() # 提交数据
- # self.mysql_client.insert(self.shop_table_name, data)
- print(f'存入店铺信息到数据库成功')
- def swipe_up(self):
- """
- 上滑
- :return:
- """
- screen_width = self.d.info['displayWidth']
- screen_height = self.d.info['displayHeight']
- duration_rate = random.uniform(0, 0.3)
- self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate)
- no = random.uniform(0, 1)
- if no > 0.85:
- # 有的时候卡着 再稍微往上滑一点点
- self.d.swipe_ext("up", 0.1)
- time.sleep(self.get_sleep_time())
- def swipe_back(self, no):
- """
- 返回
- :param no: 回退次数
- :return:
- """
- for idx in range(no):
- self.d.press('back')
- time.sleep(self.get_sleep_time())
- def drug_price(self):
- """
- 获取药品价格
- :return:
- """
- try:
- price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text
- price = float(re.search(r'[\d\.]+', price_str).group())
- print(f'获取到价格:{price}')
- return price
- except Exception as e:
- print(f'提取价格出错-->{e}')
- return None
- def drug_sale_num(self):
- """
- 获取药品销量
- :return:
- """
- try:
- sales_element = self.d.xpath('//*[starts-with(@text,"已售")]')
- if sales_element.exists:
- sales_num_str = self.d.xpath('//*[starts-with(@text,"已售")]').text
- sales_num_str = sales_num_str.replace("已售", "").strip()
- # price = float(re.search(r'[\d\.]+', price_str).group())
- print(f'获取到已售数量:{sales_num_str}')
- return sales_num_str
- return None
- except Exception as e:
- print(f'提取已售数量出错-->{e}')
- return None
- def restart_uiautomator_services(self, device_id):
- """
- 重启atx的uiautomator 服务
- :param device_id:
- :return:
- """
- stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop'
- start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d'
- # result = subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True)
- # print(result.stdout)
- subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True)
- time.sleep(self.get_sleep_time())
- subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True)
- time.sleep(self.get_sleep_time())
- # def connect_devices(self, device_id):
- # """
- # 连接设备
- # :return:
- # """
- # try:
- # self.d = u2.connect_usb(device_id)
- # # 设置隐形等待时间
- # # self.d.implicitly_wait(5)
- # self.restart_uiautomator_services(device_id)
- # print(f'连接到设备:{device_id}')
- # except Exception as e:
- # print(f'{device_id} 连接错误: {e}')
- # raise Exception(e)
- def connect_devices(self, device_id):
- """
- 连接设备
- :return:
- """
- try:
- self.d = u2.connect_usb(device_id)
- self.restart_uiautomator_services(device_id)
- self.oss_config = {
- "access_key_id": 'LTAI5tDwjfteBvivYN41r8sJ',
- "access_key_secret": 'yowuOGi2nYYnrqGpO3qcz94C4brcPp',
- "endpoint": "oss-cn-shenzhen.aliyuncs.com", # 例:oss-cn-beijing.aliyuncs.com
- "bucket_name": "zhijiayun-jiansuo",
- "oss_prefix": "scrape_data/" # OSS中存放截图的前缀(虚拟文件夹)
- }
- # jd_screenshot_ins = JDScreenshot(
- # d=self.d, # 传入你已连接好的设备实例
- # oss_config=self.oss_config,
- # scroll_times=2, # 可选,自定义滚动次数
- # compress_quality=8, # 可选,自定义压缩质量
- # resize_ratio=0.9 # 可选,自定义缩放比例
- # )
- print(f'连接到设备:{device_id}')
- self.loggerMT.info(f'连接到设备:{device_id}')
- except Exception as e:
- print(f'{device_id} 连接错误: {e}')
- self.loggerMT.info(f'{device_id} 连接错误: {e}')
- raise Exception(e)
- def get_ocr_res(self, img):
- try:
- # img地址
- print(f'开始识别图片:{img}')
- request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
- # 二进制方式打开图片文件
- f = open(img, 'rb')
- img = base64.b64encode(f.read())
- params = {"image": img}
- # access_token = get_access_token()
- request_url = request_url + "?access_token=" + self.access_token
- headers = {'content-type': 'application/x-www-form-urlencoded'}
- response = requests.post(request_url, data=params, headers=headers)
- if response:
- res = response.json()
- new_dic = dict()
- for ite in res['words_result'].keys():
- new_dic[ite] = res['words_result'][ite]['words']
- print('资质数据信息', new_dic)
- return new_dic
- else:
- return None
- except:
- return None
- def remove_watermark(self, img_path):
- """
- 图片去水印(将水印部分变成白色背景)并将数据转化为二进制数据
- :param img_path: 图片路径
- :return: 二进制图片数据
- """
- img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
- endswith = os.path.splitext(img_path)[1]
- new = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8)
- _, img_binary = cv2.imencode(endswith, new)
- return img_binary
- def get_ocr_res_image(self, img):
- try:
- image = self.remove_watermark(img)
- # image_file = open(img,'wb')
- # image_file.write(image)
- # res_image = self.client.basicAccurate(image) # 高精度
- res_image = self.client.basicGeneral(image)
- # print(f'百度api返回结果:{res_image}')
- # print(res_image.get('words_result', ''))
- # new_dic = dict()
- data = res_image.get('words_result', '')
- print(f'百度api返回结果:{data}')
- # full_text = ';'.join(item['words'] for item in data)
- # address = ''
- # for item in data:
- # if '企业注册号' in item['words']:
- # print('come in 111')
- # reg_number = item['words'].split(':', 1)[1].strip()
- # elif '企业名称' in item['words']:
- # print('come in 222')
- # company_name = item['words'].split(':', 1)[1].strip()
- # elif '所:' in item['words']:
- # print('come in 333')
- # address = item['words'].split(':', 1)[1].strip()
- # # 输出结果
- # print("企业注册号:", reg_number)
- # print("企业名称:", company_name)
- # print("住所:", address)
- return data
- except:
- return None
- def screenshot_the_business_license(self, qualification_number):
- screenshot_path = 'screenshot1.png'
- self.d.screenshot(screenshot_path)
- img = cv2.imread(screenshot_path)
- # 指定裁剪区域 (left, top, right, bottom)
- left = 0
- top = 480
- right = 720
- bottom = 1420
- cropped_img = img[top:bottom, left:right]
- # 创建目录
- SCREENSHOT_DIR = Path('screenshot') # 注意这里的变化和py文件同一级目录即可
- SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
- if qualification_number:
- # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png'
- cropped_screenshot_path = SCREENSHOT_DIR / f'{qualification_number}.png'
- else:
- cropped_screenshot_path = 'cropped_screenshot.png'
- cv2.imwrite(cropped_screenshot_path, cropped_img)
- return cropped_screenshot_path
- def screenshot_instruction(self):
- # 获取当前时间
- current_time = datetime.datetime.now()
- # 格式化为时分秒
- time_str = current_time.strftime("%H-%M-%S")
- # 生成随机的 8 位字符串
- random_str = secrets.token_hex(4) # 生成 4 个字节的随机字符串,转换为 8 位十六进制字符串
- print(time_str)
- screenshot_path = 'instructionscreenshot1-' + time_str + '-' + random_str + '.png'
- self.d.screenshot(screenshot_path)
- return screenshot_path
- def extract_specification(self, text):
- """提取药品规格信息"""
- # 方法1:简单去除到期信息
- pattern = r'^[^【]+'
- match = re.search(pattern, text)
- if match:
- return match.group(0).strip()
- return text
- # 获取商品title
- def get_title(self):
- # try:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- # except:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text
- # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text
- def _inner():
- temp_search_key = self.search_key
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- temp_search_key = '养血清脑丸2.5g*9袋'
- print(f'获取商品title时的搜索关键字:{temp_search_key}')
- # title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text
- # 初始化
- drugs_name = ''
- specifications = ''
- title = ''
- # 循环的获取title为了有时间来处理人机验证
- for m in range(1, 6000):
- if self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').exists:
- title = self.safe_exec(
- lambda: self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text
- )
- print(f"第{m}次获取title成功")
- break
- else:
- time.sleep(3)
- # return drugs_name, specifications
- # drugs_name = ''
- # specifications = ''
- # try:
- # title_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- # title_xpath_2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- # if self.d.xpath(title_xpath).exists:
- # title = self.d.xpath(title_xpath).text
- # print(f"title_xpath获取的title={title}")
- # if temp_search_key not in title:
- # return drugs_name, specifications
- # elif self.d.xpath(title_xpath_2).exists:
- # title = self.d.xpath(title_xpath_2).text
- # print(f"title_xpath_2获取的title={title}")
- # if temp_search_key not in title:
- # return drugs_name, specifications
- # else:
- # print('title_xpath不存在,请确认')
- # return drugs_name, specifications
- # # title = self.d.xpath(f'//*[contains(@text, "{temp_search_key}")]').text
- # except Exception as e:
- # print(f"发生异常: {e}")
- # return drugs_name, specifications
- # 奇怪:有的时候title取出来的记过第一位会多一个0
- # title = self.safe_exec(self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text)
- # title = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- title = title[1:] if title.startswith('0') else title
- print(f'获取到药品标题:{title}')
- # 从里面匹配出药品名和规格
- # drugs_name
- # specifications
- # match = re.search(r'([^\d]+)([\d\D]+)', title)
- if self.search_key == '999赐多康大豆':
- return title, '1罐'
- if self.search_key == "999感冒清热颗粒":
- match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title)
- else:
- match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title)
- if match:
- # drugs_name = match.group(1).strip() + match.group(2).strip()
- drugs_name = title
- specifications = match.group(3).strip()
- print("药品名:", drugs_name)
- print("规格:", specifications)
- # 如果品规中包含到期则需要再次的正则处理
- if '到期' in specifications:
- specifications = self.extract_specification(specifications)
- # print('完整药名:', drugs_name + specifications)
- return drugs_name, specifications
- else:
- if title == '999抗病毒口服液10ml*12' or title == '999抗病毒口服液':
- drugs_name = title
- specifications = '10ml*12支/盒'
- return drugs_name, specifications
- elif title == '999抗病毒口服液10ml*10':
- drugs_name = title
- specifications = '10ml*10支/盒'
- return drugs_name, specifications
- elif title == '999小柴胡颗粒':
- drugs_name = title
- specifications = '10g*9袋/盒'
- return drugs_name, specifications
- elif title == '999养胃舒颗粒':
- drugs_name = title
- specifications = '10g*10袋/盒'
- return drugs_name, specifications
- elif title == '三九胃泰胶囊':
- drugs_name = title
- specifications = '0.5g*24粒/盒'
- return drugs_name, specifications
- elif title == '999补脾益肠丸':
- drugs_name = title
- specifications = '6g*9袋/盒'
- return drugs_name, specifications
- elif title == '999复方感冒灵颗粒':
- drugs_name = title
- specifications = '12.5g*15袋/盒'
- return drugs_name, specifications
- else:
- print("没有匹配到预期格式")
- drugs_name = title
- specifications = ''
- return drugs_name, specifications
- # 用 safe_exec 包装内部逻辑,确保验证码阻塞
- return self.safe_exec(_inner)
- def enter_shop(self):
- """
- 进店,方便提取资质环境
- :return:
- """
- # self.d.xpath('//*[@text="进店"]').click()
- self.d.xpath('//*[@text="店铺"]').click()
- time.sleep(self.get_sleep_time())
- def enter_shoper(self):
- """
- 进入商家
- :return:
- """
- is_shoper_exists = 0
- for i in range(10):
- if self.d.xpath('//*[@text="商家"]').exists:
- print(f'第{i}次商家存在')
- is_shoper_exists = 1
- break
- else:
- print(f'第{i}次商家不存在')
- time.sleep(self.get_sleep_time())
- if is_shoper_exists == 1:
- self.d.xpath('//*[@text="商家"]').click()
- time.sleep(self.get_sleep_time())
- return True
- else:
- return False
- # 点击查看商家资质
- def scan_shoper_license(self):
- exist_shoper = 0
- for i in range(10):
- if self.d.xpath('//*[@text="查看商家资质"]').exists:
- print(f'第{i}次查看商家资质存在')
- exist_shoper = 1
- break
- else:
- print(f'第{i}次查看商家资质不存在')
- time.sleep(self.get_sleep_time())
- if exist_shoper == 1:
- self.d.xpath('//*[@text="查看商家资质"]').click()
- time.sleep(self.get_sleep_time())
- else:
- self.swipe_back(1)
- # 验证商品的信息是否在数据库中已存在
- def data_is_exists(self, data):
- required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform']
- if not all(key in data for key in required_keys):
- missing = [key for key in required_keys if key not in data]
- logging.error(f"缺少必要字段: {', '.join(missing)}")
- return None
- try:
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- # query_sql = f"SELECT * FROM {self.table_name} WHERE product = '{data['product']}' AND min_price = '{data['min_price']}' AND shop = '{data['shop']}' AND scrape_date = '{data['scrape_date']}' AND platform = '{data['platform']}'"
- # cur.execute(query_sql)
- query_sql = """
- SELECT * FROM {}
- WHERE product = %s
- AND min_price = %s
- AND shop = %s
- AND scrape_date = %s
- AND platform = %s
- """.format(self.table_name)
- cur.execute(query_sql, (
- data['product'],
- data['min_price'],
- data['shop'],
- data['scrape_date'],
- data['platform']
- ))
- result = cur.fetchone()
- return bool(result) # 如果存在返回True,否则False
- except Exception as e:
- print(f"MySQL 错误: {str(e)}")
- # 验证店铺信息是否在数据库中已存在
- def shop_is_exists_database(self, shop):
- try:
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- query_sql = """
- SELECT * FROM {}
- WHERE shop = %s
- """.format(self.shop_table_name)
- cur.execute(query_sql, (
- shop
- ))
- result = cur.fetchone()
- return bool(result) # 如果存在返回True,否则False
- except Exception as e:
- print(f"MySQL 错误: {str(e)}")
- def wait_if_verifying(self, monitor, timeout=120):
- """验证码处理期间阻塞主线程"""
- start = time.time()
- while monitor.pausing.is_set() and time.time() - start < timeout:
- time.sleep(1)
- # def safe_xpath(self, xpath, timeout=10):
- # """线程安全 xpath 查找"""
- # self.wait_if_verifying(self.monitor)
- # return self.d.xpath(xpath).wait(timeout=timeout)
- def wait_for_ready(self, monitor, timeout=86400):
- """进入每一页前都先等验证码"""
- start = time.time()
- while monitor.pausing.is_set() and time.time() - start < timeout:
- time.sleep(1)
- # 额外保险:如果验证码突然在这一秒才弹,再主动扫一次
- monitor.check_and_handle_popup()
- def safe_list(self, xpath, monitor):
- """线程安全地拿商品列表"""
- self.wait_for_ready(monitor)
- return self.d.xpath(xpath).all()
- def safe_exec(self, func, *args, **kwargs):
- """
- 万能安全壳:执行 func 前检查验证码,
- 若监控线程已置位 pausing,则一直阻塞直到放行。
- """
- while self.monitor.pausing.is_set():
- time.sleep(1)
- # 执行真正逻辑
- return func(*args, **kwargs)
- def get_next_data(self, data, target):
- for i, item in enumerate(data):
- if item['words'] == target:
- if i + 1 < len(data):
- return data[i + 1]['words']
- return None
- def delete_instruction_screenshot(self, screenshot_path):
- # 删除截图文件
- try:
- os.remove(screenshot_path)
- print(f"截图文件已删除:{screenshot_path}")
- except FileNotFoundError:
- print(f"文件未找到,无法删除:{screenshot_path}")
- except Exception as e:
- print(f"删除文件时出错:{e}")
- '''
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="查看详细说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- loop_page = 5
- # new_list = list()
- new_list = []
- for i in range(loop_page):
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- time.sleep(0.2)
- if i == 0:
- self.d.swipe(200, 1000, 200, 300, 0.4)
- else:
- self.d.swipe(200, 1000, 200, 62)
- time.sleep(0.2)
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- time.sleep(0.2)
- all_tt = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all()
- for idx in range(1, len(all_tt) + 1):
- all_tt1 = self.d.xpath(
- f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all()
- # print(f'当前说明书列表数据:{all_tt1}')
- for tt in all_tt1:
- if tt.text and tt.text != '展开全文':
- new_list.append(tt.text)
- if i == 0:
- height = 938
- else:
- drug_box = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info
- bounds = drug_box['bounds']
- height = bounds['bottom'] - bounds['top']
- if height < 938:
- # print('说明书翻页到底部')
- break
- # 展开全文
- new_list = [item for item in new_list if item != '展开全文']
- print(f'当前说明书列表数据:{new_list}')
- # expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期')
- # manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位')
- # approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号')
- # res_data = {
- # "有效期": new_list[expiry_date_index + 1],
- # "生产单位": new_list[manufacturer_index + 1],
- # "批准文号": new_list[approval_number_index + 1]
- # }
- res_data = {
- "有效期": (new_list[new_list.index("有效期") + 1]) if "有效期" in new_list and new_list.index("有效期") + 1 < len(new_list) else "",
- "生产单位": (new_list[new_list.index("生产单位") + 1]) if "生产单位" in new_list and new_list.index("生产单位") + 1 < len(new_list) else "",
- "批准文号": (new_list[new_list.index("批准文号") + 1]) if "批准文号" in new_list and new_list.index("批准文号") + 1 < len(new_list) else ""
- }
- print(f'当前说明书字典数据:{res_data}')
- return res_data
- '''
- '''
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="查看详细说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- # 1) 先向上滑动一次,触发“加载更多”出现
- self.d.swipe(200, 1000, 200, 300, 0.4)
- time.sleep(0.3)
- # 2) 再进入“出现就点”的循环
- while self.d.xpath('//*[@text="加载更多"]').click_exists(timeout=1):
- time.sleep(0.2)
- self.d.swipe(200, 1000, 200, 300, 0.4)
- # self.d.swipe(200, 1000, 200, 62)
- time.sleep(0.2)
- # 一次性获取所有文本
- texts = [
- node.text.strip()
- # for node in self.d.xpath('//android.widget.TextView').all()
- for node in self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.TextView').all()
- if node.text and node.text.strip() and node.text != '加载更多'
- ]
- print(f'当前说明书列表数据:{texts}')
- # 提取关键字段
- def safe_get(key):
- # try:
- # idx = texts.index(key)
- # return texts[idx + 1] if idx + 1 < len(texts) else ""
- # except ValueError:
- # return ""
- try:
- idx = next(i for i, text in enumerate(texts) if text == key)
- return texts[idx + 1] if idx + 1 < len(texts) else ""
- except StopIteration:
- return ""
- res_data = {
- "有效期": safe_get("有效期"),
- "生产单位": safe_get("生产单位"),
- "批准文号": safe_get("批准文号")
- }
- print(f'当前说明书字典数据:{res_data}')
- return res_data
- '''
- '''
- def get_instructions_data(self):
- """
- 说明书键值对采集:连续两个 TextView 为一对,精确提取
- """
- # 1. 进入说明书
- self.d(text="说明").click()
- time.sleep(0.5)
- self.d(text="查看详细说明").click()
- time.sleep(0.5)
- # self.d(text="加载更多").click_exists(timeout=0.5)
- # 2. 找到说明书最外层 ScrollView(页面主体)
- scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.ScrollView")
- count = scroll_view.count
- print(f"找到的 ScrollView 数量: {count}")
- if not scroll_view.exists:
- return {"有效期": "", "生产单位": "", "批准文号": ""}
- # 3. 在 ScrollView 内再定位真正包含键值对的容器
- # 绝大多数美团说明书页面对应的是 ScrollView > ViewGroup > 若干 TextView
- kv_container = scroll_view.child(className="android.view.ViewGroup")
- if not kv_container.exists:
- kv_container = scroll_view # 降级:直接对 ScrollView 取子孙 TextView
- # 4. 滑动到底并收集所有 TextView(保留顺序)
- all_texts = []
- max_swipe = 5
- last_length = 0
- for _ in range(max_swipe):
- texts = kv_container.child(className="android.widget.TextView")
- #获取texts中的文本
- print(f'当前说明书列表数据:{texts}')
- current_texts = []
- self.loggerMT.info(f'说明书111')
- for tv in texts:
- try:
- txt = tv.get_text().strip()
- # txt = tv.info['text'].strip()
- except Exception:
- continue
- if txt and txt != "展开全文":
- current_texts.append(txt)
- self.loggerMT.info(f'说明书222')
- print(f'当前说明书列表数据:{current_texts}')
- # 去重
- if current_texts:
- current_texts = [t for t in current_texts if t not in all_texts]
- all_texts.extend(current_texts)
- # 判断是否到底
- # if not scroll_view.info.get("scrollable"):
- # break
- # 判断是否到底
- if len(all_texts) == last_length:
- break
- last_length = len(all_texts)
- # self.d.swipe_ext("up", scale=0.7)
- #向上滑动一次
- self.d.swipe(200, 1000, 200, 300, 0.2)
- time.sleep(0.2)
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- # 5. 成对解析
- res_data = {"有效期": "", "生产单位": "", "批准文号": ""}
- for i in range(len(all_texts) - 1):
- key = all_texts[i]
- val = all_texts[i + 1]
- if key in res_data:
- res_data[key] = val
- print(f'说明书文本共 {len(all_texts)} 条,提取结果: {res_data}')
- # time.sleep(1000000)
- return res_data
- '''
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- if self.d.xpath('//*[@text="查看详细说明"]').exists:
- self.d.xpath('//*[@text="查看详细说明"]').click()
- else:
- for i in range(8):
- if self.d.xpath('//*[@text="查看全部"]').exists:
- print('开始点击查看全部')
- break
- self.d.swipe_ext('down', 0.3)
- time.sleep(1)
- if self.d.xpath('//*[@text="查看全部"]').exists:
- print('开始点击查看全部2')
- break
- if self.d.xpath('//*[@text="查看全部"]').exists:
- self.d.xpath('//*[@text="查看全部"]').click()
- else:
- res_data = {
- "有效期": '',
- "生产单位": '',
- "批准文号": ''
- }
- self.loggerMT.info('获取到的说明书信息为空。')
- return res_data
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- # self.d.xpath('//*[@text="加载更多"]').click_exists()
- # loop_page = 5
- # new_list = list()
- # new_list = []
- for ii in range(8):
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- time.sleep(0.2)
- break
- else:
- self.d.swipe(200, 1000, 200, 300, 0.3)
- # self.d.swipe_ext("up", scale=0.3)
- for iii in range(10):
- if self.d.xpath('//*[@text="生产单位"]').exists and self.d.xpath('//*[@text="批准文号"]').exists:
- break
- else:
- self.d.swipe(200, 1300, 200, 300, 0.3)
- # self.d.swipe_ext("up", scale=0.3)
- instruction_path = self.screenshot_instruction()
- print(f"instruction_path= {instruction_path}")
- time.sleep(2)
- ocr_res = self.get_ocr_res_image(instruction_path)
- # print(f'ocr_res:{ocr_res}')
- if ocr_res:
- # 获取有效期的下一个数据
- validity = self.get_next_data(ocr_res, '有效期')
- # 获取批准文号的下一个数据
- approval_number = self.get_next_data(ocr_res, '批准文号')
- # 获取生产单位的下一个数据
- manufacturer = self.get_next_data(ocr_res, '生产单位')
- else:
- validity = ''
- approval_number = ''
- manufacturer = ''
- # print("有效期:", validity)
- # print("批准文号:", approval_number)
- # print("生产单位:", manufacturer)
- res_data = {
- "有效期": validity,
- "生产单位": manufacturer,
- "批准文号": approval_number
- }
- print(f"res_data={res_data}")
- time.sleep(1)
- self.delete_instruction_screenshot(instruction_path)
- return res_data
- def has_instructions(self):
- """
- 是否有说明书
- :return:
- """
- # 没有说明书的无法采集具体数据
- time.sleep(self.get_sleep_time())
- is_has_instructions = False
- for i in range(8):
- if self.d.xpath('//*[@text="说明"]').exists:
- print(f"第{i}次有说明书1")
- is_has_instructions = True
- break
- self.d.swipe_ext('down', 0.3)
- time.sleep(1)
- # detail_info = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info
- # bounds = detail_info['bounds']
- # height = bounds['bottom'] - bounds['top']
- # if self.d.xpath('//*[@text="进店"]').exists and height > 100:
- if self.d.xpath('//*[@text="说明"]').exists:
- is_has_instructions = True
- print(f"第{i}次有说明书2")
- break
- # is_has_instructions = self.d.xpath('//*[@text="说明"]').exists
- return is_has_instructions
- def has_shop(self):
- """
- 是否有进店按钮
- :return:
- """
- # self.d.swipe_ext('up', 0.1)
- time.sleep(self.get_sleep_time())
- is_has_enter_shop = self.d.xpath('//*[@text="进店"]').exists
- return is_has_enter_shop
- # 获取商品对应的店铺信息
- def get_license_info_ex(self):
- # self.enter_shop()
- self.safe_exec(self.enter_shop)
- # self.enter_shoper()
- result = self.safe_exec(self.enter_shoper)
- if result == False:
- license_info_data = {'contact_address': '', 'qualification_number': '', 'business_license_company': '',
- 'business_license_address': ''}
- return license_info_data
- for i in range(10):
- if self.d.xpath('//*[@text="查看商家资质"]').exists:
- print(f"第{i}次有商家资质")
- break
- else:
- print(f"第{i}次没有商家资质")
- time.sleep(self.get_sleep_time())
- # 获取地址
- # contact_address = self.get_shop_address()
- contact_address = self.safe_exec(self.get_shop_address)
- # time.sleep(50000)
- ###
- # self.scan_shoper_license()
- self.safe_exec(self.scan_shoper_license)
- # 获取资质编码
- # qualification_number = self.get_qualification_number()
- qualification_number = self.safe_exec(self.get_qualification_number)
- # qualification_number 不为None继续下一步
- if qualification_number:
- # 营业执照公司名称
- business_license_company = ''
- # 营业执照地址
- business_license_address = ''
- self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- cropped_screenshot_path = self.screenshot_the_business_license(qualification_number)
- print(f'cropped_screenshot_path:{cropped_screenshot_path}')
- # if qualification_number:
- # cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png'
- # else:
- # cropped_screenshot_path = 'cropped_screenshot.png'
- # ocr_res = self.get_ocr_res('cropped_screenshot.png')
- ocr_res = self.get_ocr_res(cropped_screenshot_path)
- print(f'ocr_res:{ocr_res}')
- # 获取ocr_res 中的地址、单位名称
- if ocr_res:
- if '单位名称' in ocr_res.keys():
- business_license_company = ocr_res['单位名称']
- if '地址' in ocr_res.keys():
- business_license_address = ocr_res['地址']
- license_info_data = {'contact_address': contact_address, 'qualification_number': qualification_number,
- 'business_license_company': business_license_company,
- 'business_license_address': business_license_address}
- else:
- license_info_data = {'contact_address': contact_address, 'qualification_number': '',
- 'business_license_company': '', 'business_license_address': ''}
- return license_info_data
- """暂不用该功能
- def get_license_info(self):
- self.enter_shop()
- self.enter_shoper()
- self.scan_shoper_license()
- # 获取资质编码
- qualification_number = self.get_qualification_number()
- if qualification_number:
- table_license_info = self.get_table_license_info(qualification_number)
- if table_license_info:
- return {
- '单位名称': table_license_info[0],
- '地址': table_license_info[1],
- '社会信用代码': table_license_info[2]
- }
- else:
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- """
- def distinct_target(self):
- result = False
- position_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]'
- position_xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]'
- is_position = self.d.xpath(position_xpath).exists
- is_position2 = self.d.xpath(position_xpath2).exists
- xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- is_position5 = self.d.xpath(xpath).exists
- is_position6 = self.d.xpath(xpath2).exists
- is_position7 = self.d.xpath(xpath3).exists
- is_position8 = self.d.xpath(xpath4).exists
- xpath5 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath6 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- is_position9 = self.d.xpath(xpath5).exists
- is_position10 = self.d.xpath(xpath6).exists
- # print(f"is_position = {is_position}")
- # print(f"is_position2 = {is_position2}")
- if is_position or is_position2 or is_position5 or is_position6 or is_position7 or is_position8:
- result = True
- if result == False:
- print("---检测没有回到列表页---")
- else:
- print("---检测回到了列表页---")
- return result
- # return is_position
- def enter_target_page(self):
- self.d.xpath('//*[@content-desc="看病买药"]').click()
- time.sleep(self.get_sleep_time())
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]').click()
- time.sleep(self.get_sleep_time())
- self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]').click()
- time.sleep(self.get_sleep_time())
- self.d.send_keys(self.search_key, clear=True)
- time.sleep(self.get_sleep_time())
- self.d.xpath('//*[@text="搜索"]').click()
- time.sleep(self.get_sleep_time())
- # 增加点击快递送
- self.click_express_send()
- time.sleep(self.get_sleep_time())
- def click_express_send(self):
- # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.HorizontalScrollView[last()]'
- slide_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- for i in range(1, 3):
- if self.d.xpath(slide_xpath).exists:
- bounds = self.d.xpath(slide_xpath).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动1')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath2).exists:
- bounds = self.d.xpath(slide_xpath2).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动2')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath3).exists:
- bounds = self.d.xpath(slide_xpath3).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动3')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath4).exists:
- bounds = self.d.xpath(slide_xpath4).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动4')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- max_retry = 5 # 最多尝试次数
- for idx in range(1, max_retry + 1):
- # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]'
- xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- # print(f"xpath:{xpath}")
- # scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.HorizontalScrollView")
- if self.d.xpath(xpath).exists:
- self.d.xpath(xpath).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath2).exists:
- self.d.xpath(xpath2).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath2快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath3).exists:
- self.d.xpath(xpath3).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath3快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath4).exists:
- self.d.xpath(xpath4).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath4快递送成功")
- time.sleep(self.get_sleep_time())
- break
- else:
- print(f"第{idx}次点击xpath或xpath2或xpath3快递送都失败")
- time.sleep(self.get_sleep_time())
- # xpath2= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- # if self.d.xpath(xpath2).exists:
- # self.d.xpath(xpath2).click()
- # print(f"第{idx}次点击xpath2快递送成功")
- # time.sleep(self.get_sleep_time())
- # break
- """暂不用该功能
- def get_table_license_info(self, qualification_number):
- try:
- sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"'
- self.mysql_client.cur.execute(sql)
- res = self.mysql_client.cur.fetchone()
- return res
- except:
- return None
- """
- # def get_clipboard(self):
- # """通过ADB获取Android手机剪贴板内容"""
- # try:
- # result = subprocess.run(
- # ["adb", "shell", "am", "broadcast", "-a", "clipper.get"],
- # capture_output=True,
- # text=True,
- # timeout=5
- # )
- # print(f"获取剪贴板结果: {result.stdout}")
- # # 解析返回信息中的剪贴板内容
- # for line in result.stdout.splitlines():
- # if "data=" in line:
- # return line.split("data=")[1].strip()
- # return ""
- # except Exception as e:
- # print("获取剪贴板失败:", e)
- # return ""
- # def get_clipboard(self):
- # """读取 Android 剪贴板(系统自带命令)"""
- # try:
- # text = subprocess.check_output(
- # ["adb", "shell", "cmd", "clipboard", "get"],
- # text=True, timeout=5, stderr=subprocess.STDOUT
- # ).strip()
- # print(f"获取剪贴板结果: {text}")
- # return text if text else ""
- # except Exception as e:
- # print("获取剪贴板失败:", e)
- # return ""
- def get_clipboard(self):
- time.sleep(1)
- self.loggerMT.info(f"Clipboard content:{self.d.clipboard}") # 打印调试信息
- clipboard_content = self.d.clipboard
- if clipboard_content is None:
- return ''
- return clipboard_content.strip()
- # return self.d.clipboard.strip()
- def clear_clipboard(self):
- self.d.set_clipboard("", "text/plain")
- # def clear_clipboard(self):
- # """清空手机剪贴板:写入空字符串(subprocess 版)"""
- # try:
- # subprocess.run(
- # ["adb", "shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", " "],
- # check=True,
- # capture_output=True,
- # text=True,
- # timeout=5
- # )
- # except subprocess.CalledProcessError as e:
- # print("ADB 清空失败:", e.stderr)
- # def clear_clipboard():
- # """清空手机剪贴板:写入空字符串"""
- # try:
- # adb_shell(["shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", ""])
- # except subprocess.CalledProcessError as e:
- # print("ADB 清空失败:", e.output)
- # 获取一个商品的数据、商品对应的店铺的数据
- def get_product_link(self):
- product_link = ''
- # 两种可能的“···”按钮
- dots_xpaths = [
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]'
- ]
- max_retry = 5 # 最多尝试次数
- for idx in range(1, max_retry + 1):
- if product_link: # 已经拿到则退出
- break
- for xp in dots_xpaths:
- if self.d.xpath(xp).exists:
- print(f'{idx}-进入分享点点点')
- self.loggerMT.info(f'{idx}-进入分享点点点')
- # #先清空剪贴板的内容
- # self.clear_clipboard()
- # print("清空剪贴板内容成功。")
- self.d.xpath(xp).click()
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- link_xpath = '//*[@text="复制链接"]'
- if self.d.xpath(link_xpath).exists:
- self.d.xpath(link_xpath).click()
- time.sleep(1)
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'{idx}-商品链接:{product_link}')
- self.loggerMT.info(f'{idx}-商品链接:{product_link}')
- break # 找到并执行后跳出内层循环
- else:
- print(f'{idx}-商品链接:{product_link}')
- self.loggerMT.info(f'{idx}-商品链接:{product_link}')
- product_link = ''
- # self.d.xpath('//*[@text="复制链接"]').click_exists()
- # time.sleep(1)
- # product_link = self.get_clipboard()
- # time.sleep(0.5)
- # print(f'{idx}-商品链接:{product_link}')
- # self.loggerMT.info(f'{idx}-商品链接:{product_link}')
- # break # 找到并执行后跳出内层循环
- if not product_link and idx < max_retry:
- time.sleep(0.5) # 最后一次不需要再等待
- return product_link
- def integrate_data(self):
- product_link = ''
- product, specifications = self.safe_exec(self.get_title) # 药品,规格
- if product:
- # product, specifications = title_info
- temp_search_key = self.search_key
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- temp_search_key = '[天士力]养血清脑丸2.5g*9袋'
- else:
- self.swipe_back(1)
- return
- min_price = self.drug_price() # 最低价格
- sales_num = self.drug_sale_num() # 销售数量
- snapshot_url = '' # 网页快照
- # 在这里截图存放到OSS;#采集图片存放的oss_url;
- mt_screenshot = MTScreenshot(
- d=self.d,
- oss_config=self.oss_config,
- search_key=self.search_key # 添加这行
- )
- if self.d.xpath('//*[@text="自营"]').exists:
- shop = "美团自营大药房(快递电商)"
- # 爬取日期
- scrape_date = self.get_current_date()
- # scrape_date = "2025-07-18"
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
- else:
- for i in range(8):
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名1')
- break
- self.d.swipe_ext('up', 0.3)
- time.sleep(1)
- # detail_info = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info
- # bounds = detail_info['bounds']
- # height = bounds['bottom'] - bounds['top']
- # if self.d.xpath('//*[@text="进店"]').exists and height > 100:
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名2')
- break
- shop = self.get_shop_name()
- # 爬取日期
- scrape_date = self.get_current_date()
- # scrape_date = "2025-07-18"
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- # 获取店铺信息开始
- # 暂时不获取店铺信息 start
- is_has_enter_shop = self.has_shop()
- # 需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品
- shop_is_exists = self.shop_is_exists_database(shop)
- # 存在进店 并且店铺的名称不包含美团官方的字样
- print(f"已采集{self.shop_data_num}家店铺数据")
- if is_has_enter_shop and '美团官方' not in shop and '美团自营' not in shop and not shop_is_exists and self.shop_data_num < 500:
- # license_info = self.get_license_info_ex()
- license_info = self.safe_exec(self.get_license_info_ex)
- contact_address = license_info['contact_address']
- qualification_number = license_info['qualification_number']
- business_license_company = license_info['business_license_company']
- business_license_address = license_info['business_license_address']
- save_shop_data = {
- 'shop': shop,
- 'contact_address': contact_address,
- 'qualification_number': qualification_number,
- 'scrape_date': scrape_date,
- 'business_license_company': business_license_company,
- 'business_license_address': business_license_address,
- 'platform': '美团'
- }
- self.save_shop_info_to_database(save_shop_data)
- self.shop_data_num += 1 # 店铺数据数量+1
- self.swipe_back(2)
- else:
- print('不采集店铺信息')
- # 获取店铺信息结束
- # 暂时不获取店铺信息 end
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
- # 商品链接
- product_link = self.get_product_link()
- if not shop:
- print('未获取到店铺名:开始回退')
- self.swipe_back(1)
- return
- if not shop or '自营' in shop:
- self.swipe_back(1)
- return
- time.sleep(self.get_sleep_time())
- # 生产日期为空
- manufacture_date = ''
- business_license_company = ''
- credit_code = ''
- city = ''
- province = ''
- expiry_date = ''
- manufacturer = ''
- approval_number = ''
- # 暂时不获取说明书信息 start
- # 是否存在说明书
- # is_has_instructions = self.has_instructions()
- # 有的药品没有说明书,直接默认
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- expiry_date = ''
- manufacturer = ''
- approval_number = ''
- if self.search_key == '今维多赐多康牌蛋白粉':
- expiry_date = '18个月'
- manufacturer = '华润圣海健康科技有限公司'
- approval_number = '食健备G202437001992'
- elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4':
- expiry_date = '24个月'
- manufacturer = '浙江华润三九众益制药有限公司'
- approval_number = '国药准字H20090152'
- elif self.search_key == '999可调式生理性海水鼻腔喷雾50':
- expiry_date = '3年'
- manufacturer = '江苏萨瑞斯医疗科技有限公司'
- approval_number = '苏械注准20212140025'
- elif self.search_key == '999蒲地蓝消炎片':
- expiry_date = '24个月'
- manufacturer = '特一药业集团股份有限公司'
- approval_number = '国药准字Z20063596'
- elif self.search_key == '999养胃舒颗粒':
- expiry_date = '36个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z34020289'
- elif self.search_key == '999糠酸莫米松凝胶15':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20080010'
- elif self.search_key == '999黄芪精':
- expiry_date = '36个月'
- manufacturer = '台州南峰药业有限公司'
- approval_number = '国药准字Z33020783'
- elif self.search_key == '999复方感冒灵颗粒':
- expiry_date = '24个月'
- manufacturer = '华润三九(郴州)制药有限公司'
- approval_number = '国药准字Z43020334'
- elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074155'
- elif self.search_key == '史达功右美沙芬愈创甘油醚糖浆120':
- expiry_date = '暂定24个月,具体有效期以实物说明书为准'
- manufacturer = '史达德药业(北京)有限公司'
- approval_number = '国药准字H11021837'
- elif self.search_key == '999速复康布洛芬缓释胶囊':
- expiry_date = '24个月'
- manufacturer = '北京红林制药有限公司'
- approval_number = '国药准字H20074172'
- elif self.search_key == '999复方板蓝根颗粒15g*9袋/盒':
- expiry_date = '24个月'
- manufacturer = '重庆科瑞东和制药有限责任公司'
- approval_number = '国药准字Z50020420'
- elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g':
- expiry_date = '24个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20073954'
- elif self.search_key == '999维生素C咀嚼片':
- expiry_date = '24个月'
- manufacturer = '甘肃成纪生物药业有限公司'
- approval_number = '国药准字H62021166'
- elif self.search_key == '999强力枇杷露120ml':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字Z36021533'
- elif self.search_key == '999强力枇杷露150ml':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字Z36021533'
- elif self.search_key == '999抗病毒口服液10ml*10' or self.search_key == '999抗病毒口服液10ml*12':
- expiry_date = '24个月'
- manufacturer = '杭州华润老桐君药业有限公司'
- approval_number = '国药准字Z33020518'
- elif self.search_key == '999精氨酸布洛芬颗粒':
- expiry_date = '暂定36个月'
- manufacturer = '华润三九(唐山)药业有限公司'
- approval_number = '国药准字H20070139'
- elif self.search_key == '999糠酸莫米松乳膏10g支':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074090'
- elif self.search_key == '999选平硝酸咪康唑乳膏20g':
- expiry_date = '24个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074079'
- elif self.search_key == '999感冒清热颗粒(无糖)6g':
- expiry_date = '36个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z20055023'
- elif self.search_key == '999银菊清咽颗粒':
- expiry_date = '30个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z20026680'
- elif self.search_key == '999阿奇霉素片':
- expiry_date = '48个月'
- manufacturer = '浙江华润三九众益制药有限公司'
- approval_number = '国药准字H20084458'
- elif self.search_key == '999补脾益肠丸':
- expiry_date = '24个月'
- manufacturer = '惠州市九惠制药股份有限公司'
- approval_number = '国药准字Z44023376'
- elif self.search_key == '999壮骨关节丸6g*20':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44023377'
- elif self.search_key == '999壮骨关节胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20080055'
- elif self.search_key == '999正天丸6g*15':
- expiry_date = '30个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020711'
- elif self.search_key == '999正天胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20010142'
- elif self.search_key == '三九胃泰胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020704'
- elif self.search_key == '三九胃泰颗粒20g*10':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020705'
- elif self.search_key == '999感冒灵颗粒':
- expiry_date = '24个月'
- manufacturer = '华润三九(枣庄)药业有限公司'
- approval_number = '国药准字Z44021940'
- elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20':
- expiry_date = '36个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字H44024170'
- elif self.search_key == '三九胃泰颗粒20g*6袋':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020705'
- elif self.search_key == '顺峰康王酮康他索乳膏':
- expiry_date = '24个月'
- manufacturer = '广东华润顺峰药业有限公司'
- approval_number = '国药准字H10980204'
- elif self.search_key == '999糠酸莫米松凝胶10':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20080010'
- elif self.search_key == '999板蓝根颗粒10g*20':
- expiry_date = '36个月'
- manufacturer = '广东恒诚制药股份有限公司'
- approval_number = '国药准字Z44021520'
- elif self.search_key == '999复方氨酚烷胺胶囊' or self.search_key == '999复方氨酚烷胺胶囊12粒' or self.search_key == '999复方氨酚烷胺胶囊10粒' or self.search_key == '999复方氨酚烷胺胶囊6粒':
- expiry_date = '36个月'
- manufacturer = '华润三九(唐山)药业有限公司'
- approval_number = '国药准字H13021912'
- elif self.search_key == '999咽炎片0.26g*12片*2板':
- expiry_date = '24个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字Z42021062'
- elif self.search_key == '999小儿止咳糖浆120' or self.search_key == '999小儿止咳糖浆225':
- expiry_date = '24个月'
- manufacturer = '华润三九(雅安)药业有限公司'
- approval_number = '国药准字Z51020675'
- elif self.search_key == '999小儿感冒颗粒6g*10' or self.search_key == '999小儿感冒颗粒6g*24':
- expiry_date = '36个月'
- manufacturer = '华润三九(枣庄)药业有限公司'
- approval_number = '国药准字Z37021392'
- elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋' or self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋':
- expiry_date = '36个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字H42022510'
- elif self.search_key == '999感冒灵胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44021939'
- elif self.search_key == '999小儿咽扁颗粒8g*10袋':
- expiry_date = '24个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字Z42021105'
- elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋':
- expiry_date = '18个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20100067'
- elif self.search_key == '999感冒清热颗粒12g*18':
- expiry_date = '36个月'
- manufacturer = '山东新大陆制药有限公司'
- approval_number = '国药准字Z37020066'
- elif self.search_key == '999小柴胡颗粒10g*15':
- expiry_date = '24个月'
- manufacturer = '广东一力罗定制药有限公司'
- approval_number = '国药准字Z44020709'
- elif self.search_key == '999布洛芬混悬液':
- expiry_date = '24个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20223755'
- else:
- is_has_instructions = self.safe_exec(self.has_instructions)
- # 说明书等信息
- if is_has_instructions:
- print('开始获取说明书信息')
- # instructions_info = self.get_instructions_data()
- instructions_info = self.safe_exec(self.get_instructions_data)
- if instructions_info['有效期'] is not None:
- expiry_date = instructions_info['有效期'].strip('。')
- if instructions_info['生产单位'] is not None:
- manufacturer = instructions_info['生产单位'].strip('。')
- if instructions_info['批准文号'] is not None:
- approval_number = instructions_info['批准文号'].strip('。')
- else:
- expiry_date = None
- manufacturer = None
- approval_number = None
- # 暂时不获取说明书信息 end
- self.unrelated_data = 0
- if self.search_key == '999小柴胡颗粒10g*15':
- save_search_key = '999小柴胡颗粒'
- else:
- save_search_key = self.search_key
- # 爬取省份
- scrape_province = '广东' # 这里先默认广东
- # 是否有货
- availability = ''
- save_data = {
- 'product': product,
- 'min_price': min_price,
- 'manufacture_date': manufacture_date,
- 'expiry_date': expiry_date,
- 'shop': shop,
- 'business_license_company': business_license_company,
- 'province': province,
- 'city': city,
- 'manufacturer': manufacturer,
- 'specification': specifications,
- 'approval_number': approval_number,
- 'product_link': product_link,
- 'scrape_date': scrape_date,
- 'scrape_province': scrape_province,
- 'availability': availability,
- 'credit_code': credit_code,
- 'platform': '美团',
- 'search_key': save_search_key,
- 'sales': sales_num,
- 'inventory': '',
- 'snapshot_url': snapshot_url
- }
- self.save_to_database(save_data)
- # time.sleep(100000)
- time.sleep(self.get_sleep_time())
- if self.distinct_target():
- print('已到达搜索列表页')
- else:
- for i in range(1):
- print('在详情页')
- self.swipe_back(1)
- time.sleep(self.get_sleep_time())
- # 最外部有个定位按钮
- if self.distinct_target():
- break
- # 主函数
- def main(self, device_id, retry_count=0):
- MAX_RETRY = 3 # 最大重试次数
- spider_no = 0
- self.connect_devices(device_id)
- time.sleep(self.get_sleep_time())
- # self.d.toast.show("测试toast", 20)
- # 启动全局弹窗监控
- self.monitor = SpiderMonitor(self)
- self.monitor.start()
- try:
- # 重新开启美团应用
- self.restart_app()
- # 搜索关键字
- # self.enter_target_page()
- self.safe_exec(self.enter_target_page)
- # print('开始滑动')
- # self.d.drag(300, 1400, 300, 400, 1)
- # time.sleep(100000)
- for idx in range(300):
- print(f'第{idx + 1}页')
- if spider_no > 30:
- time.sleep(60)
- spider_no = 0
- print('目前无关数据量: ', self.unrelated_data)
- # 检查是否需要暂停(验证码过多)
- if self.monitor.verification_count >= self.monitor.MAX_VERIFICATION_RETRY:
- print("频繁遇到验证码,暂停程序")
- # self.d.toast("请处理验证码后点击继续", 30)
- # 等待用户点击屏幕继续
- self.d.click(0, 0) # 无效点击,等待用户操作
- self.monitor.verification_count = 0
- if self.unrelated_data > 15:
- # 连续超过15个不达标的数据则停止采集
- print("连续超过15个不达标的数据则停止采集")
- return
- # 线程安全获取商品列表
- # drug_lis = self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all()
- # drug_lis = self.safe_list('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout', self.monitor)
- while True:
- if self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').exists:
- break
- time.sleep(1)
- drug_lis = self.safe_exec(
- self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all)
- lis_len = len(drug_lis)
- print(f'当前页面共有{lis_len}个商品')
- for idxx, drug_one in enumerate(drug_lis, start=1):
- bounds = drug_one.info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- # height = bottom - top
- print(f'当前商品bottom:{bottom}')
- print(f'当前商品top:{top}')
- # if 304 <= top and bottom <= 1475: # 默认高度241的才行
- if 304 <= top and bottom <= 1475: # 默认高度241的才行 1559
- # print('目标-->', drug_one.info)
- # drug_one.click()
- # 获取当前元素中的属性来判断是否要点击进入采集
- print(f"这页的第几个商品:{idxx}")
- product_title = ''
- price = ''
- shop_name = ''
- # 商品名称的xpath
- product_tittle_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- product_tittle_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- if self.d.xpath(product_tittle_xpath).exists:
- product_title = self.d.xpath(product_tittle_xpath).text
- product_title = product_title[1:] if product_title.startswith('0') else product_title
- print(f"product_tittle_xpath列表当前商品名称:{product_title}")
- if '天士力' in self.search_key:
- temp_search_key = '养血清脑丸'
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- if '天士力' not in product_title or temp_search_key not in product_title:
- print(f"当前商品名称:{product_title} 不包含关键字:{temp_search_key}")
- continue
- elif '2.5g*9袋' not in product_title:
- print(f"当前商品名称:{product_title} 不包含2.5g*9袋品规")
- continue
- elif self.d.xpath(product_tittle_xpath2).exists:
- product_title = self.d.xpath(product_tittle_xpath2).text
- product_title = product_title[1:] if product_title.startswith('0') else product_title
- print(f"product_tittle_xpath2列表当前商品名称:{product_title}")
- if '天士力' in self.search_key:
- temp_search_key = '养血清脑丸'
- if self.search_key == '天士力养血清脑丸2.5g*9袋':
- if '天士力' not in product_title or temp_search_key not in product_title:
- print(f"当前商品名称:{product_title} 不包含关键字:{temp_search_key}")
- continue
- elif '2.5g*9袋' not in product_title:
- print(f"当前商品名称:{product_title} 不包含2.5g*9袋品规")
- continue
- else:
- print(f"列表当前商品名称不存在")
- # 价格
- price_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- price_xpath3 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- price_xpath1 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- if self.d.xpath(price_xpath).exists:
- price_str = self.d.xpath(price_xpath).text
- print(f"price_xpath列表当前商品价格:{price_str}")
- if price_str:
- price = float(re.search(r'[\d\.]+', price_str).group())
- elif self.d.xpath(price_xpath3).exists:
- price_str = self.d.xpath(price_xpath3).text
- print(f"price_xpath3列表当前商品价格:{price_str}")
- if price_str:
- price = float(re.search(r'[\d\.]+', price_str).group())
- elif self.d.xpath(price_xpath1).exists:
- price_str = self.d.xpath(price_xpath1).text
- print(f"price_xpath1列表当前商品价格:{price_str}")
- if price_str:
- price = float(re.search(r'[\d\.]+', price_str).group())
- else:
- price_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView'
- if self.d.xpath(price_xpath2).exists:
- price_str = self.d.xpath(price_xpath2).text
- print(f"price_xpath2列表当前商品价格:{price_str}")
- if price_str:
- price = float(re.search(r'[\d\.]+', price_str).group())
- else:
- print(f"列表当前商品价格不存在")
- # price_str = self.d.xpath(f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]//*[starts-with(@text,"¥")]').text
- print(f'列表获取到价格:{price}')
- # 店铺名称的xpath
- shop_name_xpath = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]'
- shop_name_xpath2 = f'//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[{idxx}]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.FrameLayout[last()]/android.widget.TextView[1]'
- if self.d.xpath(shop_name_xpath).exists:
- shop_name = self.d.xpath(shop_name_xpath).text
- print(f"shop_name_xpath列表当前商品店铺名称:{shop_name}")
- elif self.d.xpath(shop_name_xpath2).exists:
- shop_name = self.d.xpath(shop_name_xpath2).text
- print(f"shop_name_xpath2列表当前商品店铺名称:{shop_name}")
- else:
- print(f"列表当前商品店铺名称不存在")
- # 如果商品的名称、价格和生产厂家都不存在则直接下一条数据。 跳过一些不是商品的数据。
- if product_title == '' and price == '' and shop_name == '':
- continue
- scrape_date = self.get_current_date()
- if product_title and price and shop_name:
- # 判断数据表中是否存在
- dup_data = {'product': product_title, 'min_price': price, 'shop': shop_name,
- 'scrape_date': scrape_date, 'platform': '美团'}
- if self.data_is_exists(dup_data):
- print('列表存在相同数据不入库')
- continue
- self.safe_exec(drug_one.click)
- print('点击目标药品完毕')
- time.sleep(2)
- # 采集药品信息
- try:
- # self.integrate_data()
- self.safe_exec(self.integrate_data)
- # 检测下是否回退到列表页
- if self.distinct_target():
- print('回退到列表页', True)
- else:
- if self.d.xpath('//*[@text="搜索"]').exists:
- print("检测到搜索按钮,重新开始采集流程")
- if retry_count < MAX_RETRY:
- # 停止当前监控线程
- self.monitor.stop()
- self.monitor.join()
- # 递归重启采集
- return self.main(device_id, retry_count + 1)
- else:
- print("超过最大重试次数,终止程序")
- return
- else:
- print("无法恢复页面,终止采集")
- return
- # print('回退到列表页失败,终止采集')
- # return
- time.sleep(self.get_sleep_time())
- spider_no += 1
- except Exception as e:
- print(f'采集药品详情数据出错:{e}')
- # 增加阻塞的方法:
- if not self.distinct_target():
- for i in range(1):
- self.swipe_back(1)
- # 最外部有个定位按钮
- if self.distinct_target():
- break
- if i == 0 and not self.distinct_target():
- print('页面出错,退出采集')
- return
- else:
- continue
- if self.d.xpath('//*[@text="已经到底啦"]').exists:
- print('已经到达列表页最底部')
- return
- search_list = self.d.xpath('//android.support.v7.widget.RecyclerView').info
- bounds = search_list['bounds']
- # print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'])
- # 计算滑动距离
- scroll_distance = bounds['bottom'] - bounds['top'] # 正数
- start_y = 1600
- end_y = start_y - scroll_distance # 向上滑动,y 坐标减小
- # 确保 end_y 不小于 0
- end_y = max(end_y, 304) # 留出一点边距,避免滑出屏幕
- # print('滑动起点 y:', start_y, '终点 y:', end_y)
- # self.d.swipe(200, start_y, 200, end_y, 0.4)
- print('开始滑动')
- self.d.drag(300, 1400, 300, 400, 1)
- # self.safe_exec(self.d.drag, 300, 1400, 300, 400, 1)
- print('滑动结束')
- # print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'], 0.4)
- time.sleep(self.get_sleep_time())
- finally:
- # 确保监控线程被停止
- self.monitor.stop()
- self.monitor.join()
- def unitest(self):
- # # 创建目录
- # SCREENSHOT_DIR = Path('screenshot') # 注意这里的变化
- # SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
- # # 创建测试图片路径
- # qualification_number = '12345'
- # file_path = SCREENSHOT_DIR / f'{qualification_number}.png'
- # # 创建图片
- # img = Image.new('RGB', (300, 200), color='lightblue')
- # draw = ImageDraw.Draw(img)
- # # 添加文字
- # try:
- # font = ImageFont.truetype("arial.ttf", 20)
- # except:
- # font = ImageFont.load_default()
- # draw.text((50, 80), f"测试图片: {qualification_number}", fill='black', font=font)
- # draw.text((50, 110), "保存路径测试", fill='darkred', font=font)
- # draw.text((50, 140), f"目录: {SCREENSHOT_DIR}", fill='green', font=font)
- # # 保存图片
- # img.save(file_path)
- # print(f"✅ 图片已保存到: {file_path.resolve()}")
- # print(f"📁 文件大小: {file_path.stat().st_size} 字节")
- # print(f"📏 图片尺寸: {img.size}")
- # SCREENSHOT_DIR = Path('drug_data/mt/screenshot')
- # # 使用
- # qualification_number = '12345'
- # file_path = SCREENSHOT_DIR / f'{qualification_number}.png'
- # print(f"file_path={file_path}")
- # print("=== 路径信息 ===")
- # print(f"当前工作目录: {Path.cwd()}")
- # print(f"SCREENSHOT_DIR: {SCREENSHOT_DIR}")
- # print(f"file_path: {file_path}")
- # print(f"绝对路径: {file_path.resolve()}")
- # print(f"父目录: {file_path.parent}")
- # print(f"文件名: {file_path.name}")
- # print(f"是否存在: {file_path.exists()}")
- time.sleep(100000)
- """
- 单元测试
- :return:
- """
- save_data = {
- 'product': "[昆中药]舒肝颗粒(低糖型)",
- 'min_price': 14.0,
- 'manufacture_date': '',
- 'expiry_date': '36个月',
- 'shop': '美团自营大药房(快递电商)',
- 'business_license_company': '',
- 'province': '',
- 'city': '',
- 'manufacturer': '昆明中药厂有限公司',
- 'specification': '3g*16袋/盒',
- 'approval_number': '国药准字Z53021161',
- 'product_link': '',
- 'scrape_date': '2025/07/09',
- 'scrape_province': '广东',
- 'availability': '',
- 'credit_code': '',
- 'platform': '美团',
- 'search_key': '',
- 'sales': '',
- 'inventory': '',
- 'snapshot_url': ''
- }
- self.save_to_database(save_data)
- time.sleep(100000)
- pass
- def main():
- keys_list = [
- '天士力养血清脑丸2.5g*9袋',
- ]
- device_id = '95b2c764'
- # device_id = 'U8ONIJJJS4CELVD6'
- # device_id = 'LJ9PN7A6K7HQ9DPF'
- cycle_no = 0 # 轮次计数
- while True:
- cycle_no += 1
- logging.info(f'========== 第 {cycle_no} 轮采集开始 ==========')
- for idx, key in enumerate(keys_list, 1):
- logging.info(f'[{idx}/{len(keys_list)}] 开始采集关键字:{key}')
- try:
- mt = MT(key) # 用当前关键字实例化
- # mt.unitest()
- mt.main(device_id) # 执行一次完整采集
- logging.info(f'关键字 {key} 本轮采集完成')
- except Exception as e:
- # 发生异常直接跳过该关键字,继续下一轮
- logging.exception(f'关键字 {key} 采集异常:{e}')
- finally:
- # 关闭当前 MT 实例资源(如有需要)
- if hasattr(mt, 'close'):
- mt.close()
- # logging.info('本轮全部关键字采集完成,等待 2 小时后下一轮...')
- # time.sleep(1 * 3600) # 2 小时 = 7200 秒
- # keys = '小柴胡颗粒' # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒 小柴胡颗粒
- # mt = MT(keys) # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒
- # # mt.main('95b2c764')
- # mt.main('fcb3c749')
- if __name__ == '__main__':
- main()
- # scheduler = BlockingScheduler()
- # scheduler.add_job(main, 'cron', hour=21, minute=30, misfire_grace_time=120)
- # try:
- # scheduler.start()
- # except (KeyboardInterrupt, SystemExit):
- # pass
|