| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
27742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764
7774778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814 |
- import requests
- import base64
- import cv2
- import uiautomator2 as u2
- import time
- import subprocess
- import re
- import random
- import datetime
- import json
- from aip import AipOcr
- from apscheduler.schedulers.blocking import BlockingScheduler
- # from db_mysql import mysqlClient
- import threading
- from collections import deque
- import numpy as np
- import secrets
- import os
- import math
- import schedule
- # import pyperclip
- from config import Config
- from logger import setup_logger
- import logging
- from contextlib import contextmanager
- from typing import Dict, Any
- # from database import MySQLClient
# Configure logging.
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
setup_logger("mt_spider")  # initialise logging via the project's setup_logger helper
- class SpiderMonitor(threading.Thread):
- """全局弹窗监控线程(增强版)"""
- def __init__(self, spider_instance):
- super().__init__(daemon=True)
- self.spider = spider_instance
- self.running = True
- self.pausing = threading.Event() # 主线程同步事件
- self.last_verification_time = 0
- self.verification_count = 0
- self.MAX_VERIFICATION_RETRY = 10
- self.recent_clicks = deque(maxlen=10) # 防重复点击
- self.logger = logging.getLogger("SpiderMonitor")
- self.TOKEN = "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk"
- self.API_URL = "http://api.jfbym.com/api/YmServer/customApi"
- self.d = self.spider.d
- self.verification_in_progress = threading.Event()
- self.loggerMT = logging.getLogger()
- self.verification_retry_count = 0 # 当前验证码重试次数
- self.last_verification_type = None
- # 可配置化弹窗规则
- self.popup_rules = {
- "simple": [
- ('//*[@text="确定"]', "点击确定"),
- ('//*[@text="允许"]', "点击允许"),
- ('//*[@text="关闭"]', "点击关闭"),
- ('//*[@resource-id="com.sankuai.meituan:id/close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/address_center_location_close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/location_close"]', "关闭按钮"),
- ('//*[@resource-id="com.sankuai.meituan:id/btn_close"]', "关闭按钮"),
- ],
- # "verification": [
- # '//*[contains(@text, "验证")]',
- # '//*[contains(@text, "滑块")]',
- # '//*[contains(@text, "依次点击")]',
- # '//*[contains(@text, "请点击")]',
- # '//*[contains(@text, "拖动滑块刚")]', #这个需要拖动滑块至最右边,然后再截图
- # '//*[contains(@text, "请输入图片中的内容")]',
- # '//*[contains(@text, "用最短线连接")]',
- # '//*[contains(@text, "请按语序依次点击")]',
- # '//*[contains(@text, "请向右滑动滑块")]',
- # '//*[contains(@text, "请拖动下方滑块完成拼图")]',
- # '//*[contains(@resource-id, "captcha")]'
- # ]
- "verification": [
- ('//*[contains(@text, "请点击")]', "click_side"),
- ('//*[contains(@text, "请输入图片中的内容")]', "Numbers_English"),
- ('//*[contains(@text, "请向右滑动滑块")]', "Swipe_right"),
- ('//*[contains(@text, "请依次点击下图图标")]', "Click_images"),
- ('//*[contains(@text, "请拖动下方滑块完成拼图")]', "slider"),
- ('//*[contains(@text, "拖动滑块刚")]', "complexs"), # 这个需要拖动滑块至最右边,然后再截图
- ('//*[contains(@text, "请按语序依次点击")]', "Click_images"),
- ('//*[contains(@text, "用最短线连接")]', "Shortest_connection"),
- ]
- }
- def run(self):
- while self.running:
- try:
- handled = self.check_and_handle_popup()
- time.sleep(2 if handled else 1)
- except Exception as e:
- self.logger.exception("监控线程异常: %s", e)
- time.sleep(3)
- def _is_recent_click(self, xpath):
- """防止重复点击同一个弹窗"""
- key = f"{xpath}_{int(time.time())}"
- if key in self.recent_clicks:
- return True
- self.recent_clicks.append(key)
- return False
- @staticmethod
- def get_sleep_time():
- # return random.randint(5, 8)
- return random.randint(1, 3)
- def human_slide(self, start_x, start_y, end_x, end_y, hold_time=0):
- """模拟真实人类滑动轨迹 - 连续变化的速度曲线,微小偏差"""
- points = []
- # 随机参数
- total_steps = random.randint(60, 85) # 更多步数使曲线更平滑
- # 计算滑动距离
- distance_x = end_x - start_x
- distance_y = end_y - start_y
- total_distance = math.sqrt(distance_x ** 2 + distance_y ** 2)
- self.logger.info(f"滑块验证移动0")
- # 微小偏差设置 - 人类不完美的对齐
- # X方向偏差:1-6像素,70%概率过冲,30%欠冲
- if random.random() < 0.7:
- offset_x = random.randint(1, min(5, int(total_distance * 0.01)))
- else:
- offset_x = -random.randint(1, min(3, int(total_distance * 0.02)))
- # # Y方向微小偏差:±0-2像素
- # offset_y = random.randint(-2, 2)
- # 实际停止位置
- stop_x = end_x + offset_x
- stop_y = end_y
- # 物理参数:模拟手指滑动的物理过程
- # 使用加速度、最大速度、减速度模型
- accel_time_ratio = random.uniform(0.25, 0.35) # 加速阶段占总时间的比例
- decel_time_ratio = random.uniform(0.25, 0.35) # 减速阶段占总时间的比例
- max_speed = random.uniform(1.5, 2.2) # 最大速度倍数
- # 生成轨迹
- for i in range(total_steps):
- t = i / (total_steps - 1) # 时间进度 0-1
- # 物理速度曲线:连续变化的加速度过程
- if t < accel_time_ratio:
- # 加速阶段:从0加速到最大速度
- phase_t = t / accel_time_ratio
- # 使用平滑的加速曲线(二次函数)
- speed_factor = max_speed * phase_t * phase_t
- elif t < 1 - decel_time_ratio:
- # 匀速阶段:保持最大速度
- speed_factor = max_speed
- # 加入轻微的随机波动,模拟人类手部自然抖动
- speed_factor += random.uniform(-0.05, 0.05)
- else:
- # 减速阶段:从最大速度减速到0
- phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio
- # 使用平滑的减速曲线(二次函数,末尾更平缓)
- speed_factor = max_speed * (1 - phase_t * phase_t)
- self.logger.info(f"滑块验证移动1")
- # 计算位移(积分速度得到位置)
- # 使用贝塞尔曲线计算位置,让运动更自然
- if t < accel_time_ratio:
- # 加速阶段的位置
- phase_t = t / accel_time_ratio
- progress = (max_speed / 3) * phase_t * phase_t * phase_t
- elif t < 1 - decel_time_ratio:
- # 匀速阶段的位置
- phase_t = (t - accel_time_ratio) / (1 - accel_time_ratio - decel_time_ratio)
- # 匀速阶段的位移加上加速阶段完成的位移
- accel_distance = (max_speed / 3) # 加速阶段完成的位移
- progress = accel_distance + (1 - 2 * accel_distance) * phase_t
- else:
- # 减速阶段的位置
- phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio
- # 从减速起点平滑过渡到终点
- progress = 1 - (max_speed / 3) * (1 - phase_t) * (1 - phase_t) * (1 - phase_t)
- # 限制进度在0-1之间
- progress = max(0, min(1, progress))
- # 添加自然的手部抖动
- if t < 0.1 or t > 0.9:
- # 开始和结束:非常小的抖动
- jitter_x = random.randint(-1, 1)
- jitter_y = random.randint(-1, 1)
- elif t < 0.3 or t > 0.7:
- # 过渡阶段:小抖动
- jitter_x = random.randint(-2, 2)
- jitter_y = random.randint(-2, 2)
- else:
- # 中间快速阶段:稍大抖动
- jitter_x = random.randint(-2, 2) if random.random() < 0.3 else 0
- jitter_y = random.randint(-2, 2) if random.random() < 0.3 else 0
- # 计算当前位置
- current_x = start_x + (stop_x - start_x) * progress + jitter_x
- current_y = start_y + (stop_y - start_y) * progress + jitter_y
- self.logger.info(f"滑块验证移动2")
- # 确保轨迹单调性(不会回退)
- if points:
- if distance_x > 0: # 向右滑动
- current_x = max(points[-1][0], current_x)
- elif distance_x < 0: # 向左滑动
- current_x = min(points[-1][0], current_x)
- # 时间延迟 - 基于当前速度计算
- # 速度越快,延迟越短
- if t < 0.1: # 开始阶段
- delay = random.uniform(0.002, 0.008)
- elif t < 0.9: # 中间阶段
- # 延迟与速度成反比
- base_delay = 0.008
- speed_delay_factor = 1.0 / (speed_factor + 0.5)
- delay = base_delay * speed_delay_factor + random.uniform(-0.002, 0.002)
- delay = max(0.005, min(delay, 0.015))
- else: # 结束阶段
- # 逐渐增加延迟
- slow_factor = 1.0 + (t - 0.9) * 10
- delay = random.uniform(0.015, 0.025) * slow_factor
- points.append((current_x, current_y, delay))
- self.logger.info(f"滑块验证移动3")
- # 确保最后一点是实际停止位置
- if points:
- points[-1] = (stop_x, stop_y, 0)
- # 执行滑动
- if points:
- # 按下起点
- self.d.touch.down(points[0][0], points[0][1])
- time.sleep(random.uniform(0.002, 0.006))
- # 移动轨迹
- for i, point in enumerate(points[1:]):
- self.d.touch.move(point[0], point[1])
- self.logger.info(f"滑块验证移动{point[0]},{point[1]}")
- # 最后阶段可能的微小停顿(人类犹豫)
- # progress = (i + 1) / len(points[1:])
- # if progress > 0.98:
- # time.sleep(random.uniform(0.001, 0.003))
- time.sleep(point[2])
- # 抬起手指
- self.d.touch.up(points[-1][0], points[-1][1])
- # 滑动后的随机延迟
- hold_time = random.uniform(1, 2)
- time.sleep(hold_time)
- return points
- # 数英
def Numbers_English_verify(self):
    """Solve the "enter the characters shown in the picture" captcha.

    Waits 5 seconds, screenshots the captcha image, posts it to the
    recognition API (type 10103), types the returned text into the input
    box and clicks the verify button.

    Returns:
        bool: True once the text was typed and the verify button clicked.
        False when the image or input box is missing, the API reports an
        error or returns no text, or any step raises.
    """
    image_xpath = '//*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.Image[1]'
    input_xpath = (
        '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.EditText[1]'
        '| //*[@resource-id="com.sankuai.meituan:id/titans_webview_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.EditText[1]'
    )
    submit_xpath = '//*[@text="验证"] | //*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.view.View[2]/android.widget.Button[1]'
    image_path = "Numbers_English.png"

    time.sleep(5)
    try:
        # Stop here if the image is missing: continuing would upload
        # whatever PNG an earlier run left on disk.
        image = self.d.xpath(image_xpath)
        if not image.exists:
            self.logger.warning("数英图片元素不存在")
            return False
        image.screenshot().save(image_path)
        self.logger.info("数英图片截图保存成功: %s", image_path)

        with open(image_path, 'rb') as f:
            image_data = base64.b64encode(f.read()).decode()

        response = requests.post(
            self.API_URL,
            headers={"Content-Type": "application/json"},
            json={"token": self.TOKEN, "type": 10103, "image": image_data},
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()

        # "data" may be absent or null on errors; treat both as empty.
        payload = result.get("data") or {}
        if result.get("code") != 10000 or payload.get("code") != 0:
            self.logger.warning("API返回错误")
            return False
        self.logger.info("API返回: %s", payload)

        captcha_text = payload.get("data")
        self.logger.info("验证码: %s", captcha_text)
        if not captcha_text:
            return False

        # Without the input box nothing can be typed, so pressing verify
        # would not be a success.
        input_box = self.d.xpath(input_xpath)
        if not input_box.exists:
            self.logger.warning("数英验证码输入框不存在")
            return False
        input_box.click()
        time.sleep(1)
        self.d.send_keys(captcha_text)
        time.sleep(5)

        self.d.xpath(submit_xpath).click()
        time.sleep(3)
        return True
    except Exception as e:
        self.logger.exception("数英验证码处理异常: %s", e)
        return False
- # 滑块
def slider_verify(self):
    """Solve the "drag the slider to complete the puzzle" captcha.

    Waits 5 seconds, screenshots the puzzle image, asks the recognition
    API (type 22222) for the slide distance, drags the slider handle that
    far with ``human_slide`` and then checks whether the captcha prompt
    is still on screen.

    Returns:
        bool: True when the prompt is gone after the drag. False when the
        puzzle image is missing, the API reports an error, the prompt is
        still present, or any step raises. Always a bool; the earlier
        error path returned a truthy dict.
    """
    slot_xpath = '//*[@resource-id="puzzleSliderDrag"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[1]'
    main_xpath = '//*[@resource-id="puzzleImageMain"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]'
    handle_xpath = '//*[@resource-id="puzzleSliderBox"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[2]/android.view.View[1]'
    prompt_xpath = '//*[@text="请拖动下方滑块完成拼图"]'
    slot_img_path = "slider_slot.png"
    main_img_path = "slider_main.png"

    time.sleep(5)
    try:
        # The slot image is saved on a best-effort basis; only the main
        # image is sent to the API.
        slot = self.d.xpath(slot_xpath)
        if slot.exists:
            slot.screenshot().save(slot_img_path)
        else:
            self.logger.info("slider_slot_xpath not exist")

        # Stop here if the main image is missing: continuing would upload
        # a stale PNG from an earlier run.
        main = self.d.xpath(main_xpath)
        if not main.exists:
            self.logger.info("slider_main_xpath not exist")
            return False
        main.screenshot().save(main_img_path)

        with open(main_img_path, 'rb') as f:
            image_data = base64.b64encode(f.read()).decode()

        response = requests.post(
            self.API_URL,
            headers={"Content-Type": "application/json"},
            json={"token": self.TOKEN, "type": 22222, "image": image_data},
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()

        # "data" may be absent or null on errors; treat both as empty.
        payload = result.get("data") or {}
        if result.get("code") != 10000 or payload.get("code") != 0:
            self.logger.warning("api 返回错误 此时滑块验证可能呈图片形式存在")
            return False
        self.logger.info("slider_verify_data=%s", payload)
        slide_distance = float(payload["data"])

        # Start near the centre of the handle, with a little randomness.
        bounds = self.d.xpath(handle_xpath).info['bounds']
        start_x = ((bounds['left'] + bounds['right']) / 2) + random.uniform(-4, 4)
        start_y = ((bounds['top'] + bounds['bottom']) / 2) + random.uniform(-3, 3)
        end_x = start_x + slide_distance + random.uniform(-3, 3)
        end_y = start_y + random.uniform(-1, 1)
        self.human_slide(start_x, start_y, end_x, end_y)
        time.sleep(2)

        # This check used to sit after unconditional returns and never ran.
        if self.d.xpath(prompt_xpath).exists:
            self.logger.info("滑块验证未通过, 验证码仍然存在")
            return False
        return True
    except Exception as e:
        self.logger.error(f"滑块验证失败: {e}")
        return False
- # 点击
def Click_images(self):
    """Solve the "tap the icons / words in order" captcha.

    Screenshots the captcha container, sends the image to the jfbym
    recognition API and taps every returned coordinate in order.
    Coordinates returned by the API are relative to the screenshot, so the
    element's top-left corner is added to obtain screen coordinates.

    Returns:
        bool: True when at least one returned coordinate was tapped.
        False when the container is missing, the API call fails or reports
        an error, no coordinate data is returned, or none of the returned
        coordinates could be parsed and tapped.
    """
    time.sleep(5)
    try:
        # 1. Locate the captcha container (any of the known layouts).
        Click_images_xpath = '//*[@resource-id="com.sankuai.meituan:id/titans_main_layout"] | //*[@resource-id="com.sankuai.meituan:id/h5_container"] | //*[@resource-id="root"]'
        Click_image_element = self.d.xpath(Click_images_xpath)
        if not Click_image_element.exists:
            print("图标元素不存在")
            self.logger.info("图标元素不存在")
            return False

        # Top-left corner and size of the container on screen.
        bounds = Click_image_element.info['bounds']
        image_left = bounds['left']
        image_top = bounds['top']
        image_width = bounds['right'] - bounds['left']
        image_height = bounds['bottom'] - bounds['top']
        print(f"图片位置: left={image_left}, top={image_top}, width={image_width}, height={image_height}")

        # 2. Screenshot the container (it may have vanished in the meantime).
        image_img_path = "Click_images.png"
        if not Click_image_element.exists:
            print("图标元素不存在,无法截图")
            self.logger.info("图标元素不存在,无法截图")
            return False
        Click_image_element.screenshot().save(image_img_path)

        try:
            with open(image_img_path, 'rb') as f:
                c = base64.b64encode(f.read()).decode()

            # 3. Ask the recognition service for the tap coordinates.
            url = "http://api.jfbym.com/api/YmServer/customApi"
            data = {
                # NOTE(review): hard-coded API token; consider moving it to configuration.
                "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
                "type": 88888,  # recognition type id
                "image": c,  # base64-encoded screenshot
            }
            headers = {
                "Content-Type": "application/json"
            }
            response = requests.post(url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            result = response.json()
            print(f"API返回结果: {result}")

            if not (result.get("code") == 10000 and result.get("data", {}).get("code") == 0):
                error_msg = result.get("msg", "未知错误")
                print(f"识别失败: {error_msg}")
                return False

            verify_data = result.get("data", {})
            print(f"verify_data={verify_data}")
            # Coordinate string looks like "188,165|99,128|91,209|235,116".
            coords_str = verify_data.get("data", "")
            if not coords_str:
                print("未返回坐标数据")
                return False
            print(f"坐标字符串: {coords_str}")
            coords_list = coords_str.split('|')
            print(coords_list)

            # 4. Tap each coordinate in the order returned.
            clicked = 0
            for coord in coords_list:
                try:
                    x_img_str, y_img_str = coord.split(',')
                    x_img = int(x_img_str.strip())
                    y_img = int(y_img_str.strip())
                    print(f"图片相对坐标: x={x_img}, y={y_img}")
                    # Image-relative -> absolute screen coordinates.
                    x_screen = image_left + x_img
                    y_screen = image_top + y_img
                    print(f"屏幕绝对坐标: x={x_screen}, y={y_screen}")
                    self.d.click(x_screen, y_screen)
                    clicked += 1
                    time.sleep(self.get_sleep_time())
                except Exception as e:
                    # A malformed entry must not abort the remaining taps.
                    print(f"处理坐标 {coord} 失败: {e}")
                    continue

            if clicked == 0:
                # Nothing was tapped, so the captcha cannot have been solved.
                print("未能点击任何坐标")
                return False
            time.sleep(self.get_sleep_time() * 2)  # give the page time to react
            return True
        except requests.exceptions.RequestException as e:
            print(f"API请求失败: {e}")
            return False
        except Exception as e:
            print(f"识别过程出错: {e}")
            return False
    except Exception as e:
        self.logger.error(f"点击图标失败: {e}")
        return False
- # # 检查验证是否成功
- # if not self.d.xpath('//*[@text="请依次点击下图图标"] | //*[@text="请按语序依次点击"]').exists:
- # print("所有坐标点击完成,验证成功")
- # return True
- # else:
- # print("所有坐标点击完成,但验证文本仍然存在,可能验证失败")
- # return False
- # def Click_images(self):
- # try:
- # image_xpath = '//*[@resource-id="com.sankuai.meituan:id/titans_main_layout"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]'
- # image_element = self.d.xpath(image_xpath)
- # image_element_info = image_element.info
- # bounds = image_element_info['bounds']
- # image_left = bounds['left']
- # image_top = bounds['top']
- # image_width = bounds['right'] - bounds['left']
- # image_height = bounds['bottom'] - bounds['top']
- # print(f"图片位置: left={image_left}, top={image_top}, width={image_width}, height={image_height}")
- # #
- # image_img_path = "image.png"
- # if self.d.xpath(image_xpath).exists:
- # self.d.xpath(image_xpath).screenshot().save("image.png")
- # else:
- # print("image_xpath not exist")
- # try:
- # with open('image.png', 'rb') as f:
- # c = base64.b64encode(f.read()).decode()
- # url = "http://api.jfbym.com/api/YmServer/customApi"
- # data = {
- # "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
- # "type": 50009,
- # "image": c
- # }
- # headers = {
- # "Content-Type": "application/json"
- # }
- # # 发送请求
- # response = requests.post(url, headers=headers, json=data, timeout=30)
- # response.raise_for_status()
- # result = response.json()
- # if result.get("code") == 10000 and result.get("data", {}).get("code") == 0:
- # side_data = result["data"]
- # print(f"side_data={side_data}")
- # big_click_xpath = side_data["data"]
- # else:
- # print("api 返回错误")
- # return False
- # except Exception as e:
- # return {
- # "success": False,
- # "error_msg": f"处理异常: {str(e)}"
- # }
- # coords_str = side_data["data"]
- # if coords_str:
- # x_img_str, y_img_str = coords_str.split(',')
- # x_img = int(x_img_str.strip())
- # y_img = int(y_img_str.strip())
- # x_screen = image_left + x_img
- # y_screen = image_top + y_img
- # self.d.click(x_screen, y_screen)
- # time.sleep(self.get_sleep_time())
- # return True
- # except Exception as e:
- # self.logger.error(f"依次点击验证失败: {e}")
- # return False
- # 向右滑动
def Swipe_right(self):
    """Solve the "slide to the right" captcha.

    Measures the track and the slider knob, then drags the knob from its
    centre to the right by (track width - knob width) plus 10 extra pixels
    using ``Swipe_right_human_slide``.

    Returns:
        bool: True after the drag was performed, False when the track or the
        knob element is not on screen.
    """
    time.sleep(5)
    # Each XPath is defined once so the existence check and the lookup
    # cannot drift apart.
    track_xpath = '//*[@resource-id="yodaBoxWrapper"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]'
    knob_xpath = '//*[@resource-id="yodaBox"] | //*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]/android.view.View[1]'

    track = self.d.xpath(track_xpath)
    if not track.exists:
        print("未找到轨道元素")
        return False
    bound = track.info['bounds']
    Swipe_distance = bound['right'] - bound['left']

    knob = self.d.xpath(knob_xpath)
    if not knob.exists:
        print("未找到滑块元素")
        return False
    bounds = knob.info['bounds']

    # Start at the knob centre; travel the free length of the track.
    start_x = (bounds['left'] + bounds['right']) // 2
    start_y = (bounds['top'] + bounds['bottom']) // 2
    distance = Swipe_distance - (bounds['right'] - bounds['left'])
    end_x = start_x + distance
    end_y = start_y
    print(f"滑动距离: {distance}像素")
    print(f"起点: ({start_x}, {start_y}), 终点: ({end_x}, {end_y})")

    # Overshoot slightly so the knob really reaches the right end.
    end_x += 10
    self.Swipe_right_human_slide(start_x, start_y, end_x, end_y)
    time.sleep(2)
    return True
def Swipe_right_human_slide(self, start_x, start_y, end_x, end_y):
    """Drag from the start point to the end point along a jittered path.

    The path is produced by ``Swipe_trajectory``. The finger is pressed at
    the first point, moved through the remaining points with a short
    variable pause after each move, and released 2 px to the right of the
    last point.

    The pause used to be executed while the points were being generated,
    which only delayed the gesture without pacing it; it is now applied
    between the actual ``touch.move`` calls.

    Args:
        start_x, start_y: Screen coordinates where the finger goes down.
        end_x, end_y: Screen coordinates the drag heads towards.
    """
    points = self.Swipe_trajectory(start_x, start_y, end_x, end_y)
    print(f"points: {points}")
    self.loggerMT.info(f"points: {points}")

    total_steps = len(points)
    self.d.touch.down(points[0][0], points[0][1])
    try:
        for step, point in enumerate(points[1:], start=1):
            self.d.touch.move(point[0], point[1])
            ratio = step / total_steps
            # Variable pause between moves (7-12 ms, longest mid-way).
            # NOTE(review): the original comment said "faster movement,
            # shorter delay", but this formula peaks in the middle - confirm.
            time.sleep(0.002 + 0.01 * (1 - abs(0.5 - ratio)))
    finally:
        # Always lift the finger, even if a move failed part-way.
        self.d.touch.up(points[-1][0] + 2, points[-1][1])
- # print(f"points: {points}")
- # self.loggerPdd.info(f"points: {points}")
- # self.d.swipe_points(points, duration=0.05)
- # 拖动滑块刚
def complexs(self):
    """Solve the "drag the slider as instructed" captcha.

    Steps:
      1. Press the slider knob and drag it to the right end of the track
         without releasing.
      2. Screenshot the screen, crop the hint-text area and the picture area
         and send both to the jfbym recognition API.
      3. Convert the returned pixel distance into a target X position, drag
         the knob back to it and release.

    The finger is always lifted before returning, including on every failure
    path after the initial press.

    Returns:
        bool: True when the knob was released at the computed target,
        False on any failure.
    """
    time.sleep(5)
    # Position at which a still-held finger must be lifted on exit;
    # None while no touch is active.
    release_at = None
    try:
        slider_xpath = '//*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[2]/android.view.View[1]'
        track_xpath = '//*[@text="请按照说明拖动滑块"]'
        if not self.d.xpath(slider_xpath).exists:
            print("滑块元素不存在")
            self.logger.info("滑块元素不存在")
            return False
        if not self.d.xpath(track_xpath).exists:
            print("滑轨元素不存在")
            self.logger.info("滑轨元素不存在")
            return False

        # Knob geometry.
        slider_bounds = self.d.xpath(slider_xpath).info['bounds']
        slider_width = slider_bounds['right'] - slider_bounds['left']
        slider_height = slider_bounds['bottom'] - slider_bounds['top']
        slider_center_x = slider_bounds['left'] + slider_width / 2
        slider_center_y = slider_bounds['top'] + slider_height / 2
        print(f"滑块中心: ({slider_center_x}, {slider_center_y})")

        # Track geometry.
        track_bounds = self.d.xpath(track_xpath).info['bounds']
        track_left = track_bounds['left']
        track_right = track_bounds['right']

        # Knob centre when the knob sits at the right end of the track.
        right_end_center_x = track_right - slider_width / 2
        right_end_center_y = slider_center_y
        print(f"最右端滑块中心坐标: ({right_end_center_x}, {right_end_center_y})")

        # Step 1: press and drag to the right end, keeping the finger down.
        try:
            self.d.touch.down(slider_center_x, slider_center_y)
            release_at = (slider_center_x, slider_center_y)
            time.sleep(0.1)
            points = self.Swipe_trajectory(slider_center_x, slider_center_y, right_end_center_x,
                                           right_end_center_y)
            for point in points[1:]:
                self.d.touch.move(point[0], point[1])
                release_at = (point[0], point[1])
                time.sleep(0.002)
            print("滑块已到达最右端")
        except Exception as e:
            print(f"滑动到最右端失败: {e}")
            return False
        release_at = (right_end_center_x, right_end_center_y)

        # Step 2: crop the hint text (area 1) and the picture (area 2).
        capture_xpath1 = '// *[ @ text = "身份核实"] / android.view.View[1] / android.view.View[1] / android.widget.TextView[1]'
        capture_xpath2 = '// * [ @ text = "身份核实"] / android.view.View[1] / android.view.View[1] / android.view.View[1]'
        capture_element1 = self.d.xpath(capture_xpath1)
        capture_element2 = self.d.xpath(capture_xpath2)
        capture_info1_bounds = capture_element1.info['bounds']
        capture_info2_bounds = capture_element2.info['bounds']
        capture_label_left = capture_info1_bounds['left']
        capture_label_top = capture_info1_bounds['top']
        capture_label_right = capture_info1_bounds['right']
        capture_label_bottom = capture_info1_bounds['bottom']
        capture_left = capture_info2_bounds['left']
        capture_top = capture_info2_bounds['top']
        capture_right = capture_info2_bounds['right']
        capture_bottom = capture_info2_bounds['bottom']
        capture_label_width = capture_label_right - capture_label_left
        capture_label_height = capture_label_bottom - capture_label_top
        capture_width = capture_right - capture_left
        capture_height = capture_bottom - capture_top
        print(
            f"截图区域1(提示文本): left={capture_label_left}, top={capture_label_top}, width={capture_label_width}, height={capture_label_height}")
        print(
            f"截图区域2(图片): left={capture_left}, top={capture_top}, width={capture_width}, height={capture_height}")

        screenshot_label_path = "capture_label_area.png"
        screenshot_image_path = "capture_area.png"
        try:
            full_screenshot = self.d.screenshot()
            from PIL import Image
            import io
            if isinstance(full_screenshot, bytes):
                img = Image.open(io.BytesIO(full_screenshot))
            else:
                img = full_screenshot
            cropped_img_1 = img.crop(
                (capture_label_left, capture_label_top, capture_label_right, capture_label_bottom))
            cropped_img_1.save(screenshot_label_path)
            print(f"截图1已保存: {screenshot_label_path}")
            cropped_img_2 = img.crop((capture_left, capture_top, capture_right, capture_bottom))
            cropped_img_2.save(screenshot_image_path)
            print(f"截图2已保存: {screenshot_image_path}")
        except Exception as e:
            print(f"截图失败: {e}")
            return False

        # Step 3: recognise the target distance and drag back.
        try:
            with open(screenshot_label_path, 'rb') as f:
                label_image_data = base64.b64encode(f.read()).decode()
            with open(screenshot_image_path, 'rb') as f:
                image_data = base64.b64encode(f.read()).decode()

            url = "http://api.jfbym.com/api/YmServer/customApi"
            data = {
                # NOTE(review): hard-coded API token; consider moving it to configuration.
                "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
                "type": 29013,
                "image": image_data,
                "label_image": label_image_data
            }
            headers = {
                "Content-Type": "application/json"
            }
            response = requests.post(url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            result = response.json()
            print(f"API返回结果: {result}")

            if not (result.get("code") == 10000 and result.get("data", {}).get("code") == 0):
                error_msg = result.get("msg", "未知错误")
                print(f"识别失败: {error_msg}")
                return False

            verify_data = result.get("data", {})
            print(f"verify_data={verify_data}")
            data_str = verify_data.get("data", "")
            if not data_str:
                print("云码未返回有效的data值")
                return False

            try:
                data_value = int(data_str)
                print(f"云码返回的像素距离: {data_value}")
                # NOTE(review): 108, 44 and 504 are fixed pixel offsets;
                # presumably they match one specific screen layout - confirm
                # before running on other devices.
                gray_line_target_x = 108 + data_value - 44
                slider_target_center_x = gray_line_target_x
                print(f"滑块中心目标X坐标: {slider_target_center_x}")
                back_distance = 504 - data_value - 44
                print(f"需要往回拖动的距离: {back_distance}")

                # Re-read the knob position now that it sits at the right end.
                slider_bounds = self.d.xpath(slider_xpath).info['bounds']
                current_slider_center_x = (slider_bounds['left'] + slider_bounds['right']) / 2
                # Keep the target inside the track.
                min_x = track_left + slider_width / 2
                max_x = track_right - slider_width / 2
                slider_target_center_x = max(min_x, min(slider_target_center_x, max_x))
                actual_distance = slider_target_center_x - current_slider_center_x
                print(f"实际需要滑动的距离: {actual_distance}")

                try:
                    back_points = self.Swipe_trajectory(
                        right_end_center_x, right_end_center_y,
                        slider_target_center_x, right_end_center_y
                    )
                    for point in back_points[1:]:
                        self.d.touch.move(point[0], point[1])
                        time.sleep(0.002)
                    time.sleep(3)
                    self.d.touch.up(slider_target_center_x, right_end_center_y)
                    release_at = None  # finger lifted, nothing left to clean up
                    return True
                except Exception as e:
                    print(f"往回拖动失败: {e}")
                    return False
            except ValueError as e:
                print(f"解析云码返回的data值失败: {e}")
                return False
            except Exception as e:
                print(f"计算滑块位置失败: {e}")
                return False
        except requests.exceptions.RequestException as e:
            print(f"API请求失败: {e}")
            return False
        except Exception as e:
            print(f"识别过程出错: {e}")
            return False
    except Exception as e:
        self.logger.error(f"滑块验证失败: {e}")
        return False
    finally:
        # Never leave the finger pressed on the screen.
        if release_at is not None:
            try:
                self.d.touch.up(release_at[0], release_at[1])
            except Exception as e:
                self.logger.error(f"释放触摸失败: {e}")
def Swipe_trajectory(self, start_x, start_y, end_x, end_y):
    """Generate the points of a jittered drag from start to end.

    Produces 50 points. Progress along the straight line follows a
    piecewise curve (quadratic start, linear middle) and is capped at 0.99,
    so the last point stops just short of the end point. Each point gets a
    random jitter of -1, 0 or +1 px on both axes.

    X coordinates are kept monotonic in the direction of travel and never
    pass ``end_x``. The previous clamp only handled rightward motion, which
    made every point of a leftward drag collapse onto ``end_x``.

    Args:
        start_x, start_y: Starting screen coordinates.
        end_x, end_y: Target screen coordinates.

    Returns:
        list[tuple]: 50 ``(x, y)`` points.
    """
    points = []
    total_steps = 50
    distance_x = end_x - start_x
    distance_y = end_y - start_y
    # +1 when travelling right (or not at all), -1 when travelling left.
    direction = 1 if distance_x >= 0 else -1
    previous_x = start_x
    for i in range(total_steps):
        ratio = i / total_steps
        if ratio < 0.3:
            progress = 0.5 * (ratio / 0.3) ** 2
        elif ratio < 0.7:
            progress = 0.5 + (ratio - 0.3) * 1.25
        else:
            progress = 0.9 + 0.5 * ((ratio - 0.7) / 0.3) ** 0.5
        progress = min(progress, 0.99)

        # randint's upper bound is exclusive: (-1, 2) yields -1, 0 or 1.
        offset_x = np.random.randint(-1, 2)
        offset_y = np.random.randint(-1, 2)
        x = start_x + distance_x * progress + offset_x
        y = start_y + distance_y * progress + offset_y

        # Never step backwards relative to the direction of travel ...
        if (x - previous_x) * direction < 0:
            x = previous_x + direction
        # ... and never go past the end point.
        if (x - end_x) * direction > 0:
            x = end_x
        previous_x = x
        points.append((x, y))
    return points
- # 最短线连接
def Shortest_connection(self):
    """Solve the "connect the dots of the given colour" captcha.

    Steps:
      1. Screenshot the stylised colour word and the picture with the dots.
      2. Send the word image to the jfbym API to read the colour name.
      3. Locate the dots of that colour with ``find_color_coordinates``.
      4. Map image coordinates to screen coordinates, order the dots with a
         greedy nearest-neighbour walk and draw through them.

    Returns:
        bool: True when the drawing gesture completed, False otherwise.
    """
    time.sleep(5)
    try:
        art_text_xpath = '//*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[1]'
        color_points_xpath = '//*[@text="身份核实"]/android.view.View[1]/android.view.View[1]/android.view.View[2]/android.view.View[1]/android.widget.Image[1]'
        art_text_img_path = "art_text.png"
        color_points_img_path = "color_points.png"

        if not self.d.xpath(art_text_xpath).exists:
            self.logger.warning("艺术字元素不存在")
            return False
        self.d.xpath(art_text_xpath).screenshot().save(art_text_img_path)
        self.logger.info("艺术字截图成功")

        if not self.d.xpath(color_points_xpath).exists:
            self.logger.warning("颜色点元素不存在")
            return False
        self.d.xpath(color_points_xpath).screenshot().save(color_points_img_path)
        self.logger.info("颜色元素截图成功")

        # Read the colour name from the stylised text.
        color_name = ""
        try:
            with open(art_text_img_path, 'rb') as f:
                c = base64.b64encode(f.read()).decode()
            url = "http://api.jfbym.com/api/YmServer/customApi"
            data = {
                # NOTE(review): hard-coded API token; consider moving it to configuration.
                "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
                "type": 10118,
                "image": c
            }
            headers = {
                "Content-Type": "application/json"
            }
            response = requests.post(url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            result = response.json()
            self.logger.info(f"云码API返回结果: {result}")

            # The API answers in one of three shapes.
            if result.get("code") == 0:
                color_name = result.get("data", "")
            elif result.get("code") == 10000 and "data" in result:
                inner_data = result.get("data", {})
                if isinstance(inner_data, dict) and inner_data.get("code") == 0:
                    color_name = inner_data.get("data", "")
                elif isinstance(inner_data, str):
                    color_name = inner_data
            else:
                self.logger.error(f"云码API返回异常: {result}")
                return False

            # Text containing "鼗" is mapped to purple. This used to be written
            # as `color_name == "紫色"`, a comparison with no effect.
            if isinstance(color_name, str) and "鼗" in color_name:
                color_name = "紫色"

            if not color_name:
                self.logger.error("未识别到颜色名称")
                return False
            self.logger.info(f"识别到的颜色名称: {color_name}")
        except Exception as e:
            self.logger.error(f"云码API调用异常: {e}")
            return False

        # Dot centres in image coordinates.
        relative_coordinates = self.find_color_coordinates(color_points_img_path, color_name)
        if not relative_coordinates:
            self.logger.warning(f"未在图片中找到 {color_name} 颜色的坐标")
            return False

        element_bounds = self.d.xpath(color_points_xpath).info['bounds']
        element_left = element_bounds['left']
        element_top = element_bounds['top']
        element_width = element_bounds['right'] - element_bounds['left']
        element_height = element_bounds['bottom'] - element_bounds['top']

        # Screenshot size, used to scale image pixels to on-screen pixels.
        try:
            color_points_img = cv2.imread(color_points_img_path)
            if color_points_img is None:
                self.logger.error("无法读取截图")
                return False
            screenshot_height, screenshot_width = color_points_img.shape[:2]
        except Exception as e:
            self.logger.error(f"读取截图尺寸失败: {e}")
            # Fall back to the element size, i.e. a 1:1 scale.
            screenshot_width = element_width
            screenshot_height = element_height

        screen_coordinates = []
        for (rx, ry) in relative_coordinates:
            if screenshot_width > 0 and screenshot_height > 0:
                scale_x = element_width / screenshot_width
                scale_y = element_height / screenshot_height
                sx = element_left + int(rx * scale_x)
                sy = element_top + int(ry * scale_y)
            else:
                # No usable screenshot size: offset the raw image coordinates.
                sx = element_left + rx
                sy = element_top + ry
            screen_coordinates.append((sx, sy))
            self.logger.info(f"相对坐标({rx}, {ry}) -> 屏幕坐标({sx}, {sy})")

        if len(screen_coordinates) < 2:
            self.logger.warning("需要至少2个点才能连接")
            return False

        # Greedy nearest-neighbour ordering starting from the first dot.
        unvisited = screen_coordinates.copy()
        path = [unvisited.pop(0)]
        while unvisited:
            last_point = path[-1]
            nearest_idx = min(range(len(unvisited)),
                              key=lambda i: ((last_point[0] - unvisited[i][0]) ** 2 +
                                             (last_point[1] - unvisited[i][1]) ** 2) ** 0.5)
            path.append(unvisited.pop(nearest_idx))
        self.logger.info(f"最短路径点顺序: {path}")

        curved_path = self.human_like_path(path)
        result = self.simulate_human_drawing(curved_path)
        if result:
            self.logger.info("最短线连接成功")
            time.sleep(3)
            return True
        self.logger.warning("最短线连接失败")
        return False
    except Exception as e:
        self.logger.error(f"最短线连接验证失败: {e}")
        return False
def find_color_coordinates(self, image_path, color_name):
    """Find the centres of all blobs of a named colour in an image.

    The image is converted to HSV, thresholded with every HSV range
    registered for ``color_name`` (ranges are OR-ed together), cleaned with
    a 3x3 open/close, and the centroid of every external contour larger
    than 30 px² is returned.

    Args:
        image_path: Path of the image file to analyse.
        color_name: Chinese colour name; must be a key of the range table.

    Returns:
        list[tuple[int, int]]: Blob centres in image pixel coordinates;
        empty when the colour is unsupported or the image cannot be read.
    """
    self.logger.info(f"开始查找颜色: {color_name}, 图片路径: {image_path}")
    # Colour name -> tuple of (lower, upper) HSV ranges. Every value is a
    # tuple of ranges; "橘色" and "褐色" previously lacked the trailing comma,
    # so indexing them produced scalars instead of HSV triples.
    red_ranges = (([0, 120, 70], [10, 255, 255]), ([170, 120, 70], [180, 255, 255]))
    color_ranges = {
        "红色": red_ranges,
        "红的": red_ranges,
        "绿色": (([35, 50, 50], [85, 255, 255]),),
        "蓝色": (([90, 50, 50], [130, 255, 255]),),
        "黄色": (([20, 100, 100], [30, 255, 255]),),
        "橙色": (([5, 100, 100], [15, 255, 255]),),
        "紫色": (([130, 50, 50], [160, 255, 255]),),
        "黑色": (([0, 0, 0], [180, 255, 50]),),
        "白色": (([0, 0, 200], [180, 30, 255]),),
        "橘色": (([5, 150, 150], [15, 255, 255]),),
        # NOTE(review): two conflicting entries existed for this key; the
        # later one (which took effect) is kept - confirm the intended range.
        "褐色": (([10, 50, 20], [20, 255, 150]),),
    }
    if color_name not in color_ranges:
        self.logger.warning(f"不支持的颜色: {color_name}")
        return []

    image = cv2.imread(image_path)
    if image is None:
        self.logger.error(f"无法读取图像: {image_path}")
        return []
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # OR together the masks of every range registered for this colour, so
    # any colour spanning several hue bands is handled, not just "红色".
    mask = None
    for lower, upper in color_ranges[color_name]:
        partial = cv2.inRange(hsv, np.array(lower), np.array(upper))
        mask = partial if mask is None else cv2.bitwise_or(mask, partial)

    # Morphological open/close to remove speckles and fill small holes.
    kernel = np.ones((3, 3), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    coordinates = []
    min_area = 30  # ignore blobs at or below this area
    for i, contour in enumerate(contours):
        area = cv2.contourArea(contour)
        if area <= min_area:
            continue
        M = cv2.moments(contour)
        if M["m00"] != 0:
            # Centroid from the image moments.
            cx = int(M["m10"] / M["m00"])
            cy = int(M["m01"] / M["m00"])
            coordinates.append((cx, cy))
            self.logger.info(f"轮廓{i}: 面积={area}, 中心点=({cx}, {cy})")
        else:
            self.logger.info(f"轮廓{i}: 面积={area}, 无法计算中心点")
    self.logger.info(f"找到 {len(coordinates)} 个 {color_name} 坐标点")
    return coordinates
def human_like_path(self, points):
    """Bend the straight segments between points into gentle curves.

    Every pair of consecutive points is joined by a quadratic Bezier curve
    whose control point is the segment midpoint pushed up to 15 px sideways
    (perpendicular to the dominant axis of the segment). Each segment
    contributes its start point plus the integer curve samples at the
    parameter values from ``np.arange(0.1, 1.0, 0.1)``; the very last input
    point closes the path.

    Args:
        points: Sequence of ``(x, y)`` points to connect.

    Returns:
        list: The curved path; ``points`` itself when it has fewer than two
        entries.
    """
    if len(points) < 2:
        return points

    curved_path = []
    for start, end in zip(points, points[1:]):
        mid_x = (start[0] + end[0]) / 2
        mid_y = (start[1] + end[1]) / 2

        # Sway perpendicular to the dominant direction of the segment.
        mostly_horizontal = abs(end[0] - start[0]) > abs(end[1] - start[1])
        sway = random.uniform(-15, 15)
        if mostly_horizontal:
            control_x, control_y = mid_x + 0, mid_y + sway
        else:
            control_x, control_y = mid_x + sway, mid_y + 0

        curved_path.append(start)
        curved_path.extend(
            (
                int((1 - t) ** 2 * start[0] + 2 * (1 - t) * t * control_x + t ** 2 * end[0]),
                int((1 - t) ** 2 * start[1] + 2 * (1 - t) * t * control_y + t ** 2 * end[1]),
            )
            for t in np.arange(0.1, 1.0, 0.1)
        )

    # Close the path on the final input point.
    curved_path.append(points[-1])
    return curved_path
def simulate_human_drawing(self, path):
    """Draw through the given points with one continuous touch gesture.

    The finger goes down on the first point, moves through the remaining
    points with a random jitter of up to 2 px and a 10-30 ms pause after
    each move, and is lifted on the last point. If anything fails after the
    finger went down, the finger is lifted at the last position reached so
    the device is not left with a held touch.

    Args:
        path: Sequence of ``(x, y)`` screen points; needs at least two.

    Returns:
        bool: True when the gesture completed, False when the path is too
        short or the gesture failed.
    """
    if len(path) < 2:
        return False

    finger_at = None  # last position of the held finger; None when lifted
    try:
        start_x, start_y = path[0]
        self.d.touch.down(start_x, start_y)
        finger_at = (start_x, start_y)
        time.sleep(random.uniform(0.05, 0.1))

        for i in range(1, len(path)):
            target_x, target_y = path[i]
            # Small random jitter on every step.
            jitter_x = random.randint(-2, 2)
            jitter_y = random.randint(-2, 2)
            self.d.touch.move(target_x + jitter_x, target_y + jitter_y)
            finger_at = (target_x + jitter_x, target_y + jitter_y)
            time.sleep(random.uniform(0.01, 0.03))

        time.sleep(random.uniform(0.1, 0.2))
        self.d.touch.up(path[-1][0], path[-1][1])
        finger_at = None
        print("模拟人类绘制完成")
        return True
    except Exception as e:
        print(f"模拟绘制失败: {e}")
        if finger_at is not None:
            # Best effort: do not leave the finger pressed on the screen.
            try:
                self.d.touch.up(finger_at[0], finger_at[1])
            except Exception as release_error:
                print(f"模拟绘制失败: {release_error}")
        return False
- # 空间推理
def click_side(self):
    """Solve the spatial-reasoning captcha by tapping one recognised point.

    Screenshots the captcha picture, sends it to the jfbym recognition API
    and taps the single ``"x,y"`` coordinate it returns (converted from
    image-relative to screen coordinates).

    Every failure now yields False. Previously a request failure returned a
    truthy dict, an API error ran into an unbound variable, and an empty
    coordinate string fell through to None.

    Returns:
        bool: True when the point was tapped, False otherwise.
    """
    try:
        image_xpath = '//*[@resource-id="com.sankuai.meituan:id/titans_main_layout"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]'
        image_element = self.d.xpath(image_xpath)
        if not image_element.exists:
            # Without the element there is nothing fresh to screenshot.
            print("image_xpath not exist")
            return False

        bounds = image_element.info['bounds']
        image_left = bounds['left']
        image_top = bounds['top']
        image_width = bounds['right'] - bounds['left']
        image_height = bounds['bottom'] - bounds['top']
        print(f"图片位置: left={image_left}, top={image_top}, width={image_width}, height={image_height}")

        image_img_path = "image.png"
        image_element.screenshot().save(image_img_path)
        with open(image_img_path, 'rb') as f:
            c = base64.b64encode(f.read()).decode()

        url = "http://api.jfbym.com/api/YmServer/customApi"
        data = {
            # NOTE(review): hard-coded API token; consider moving it to configuration.
            "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
            "type": 50009,
            "image": c
        }
        headers = {
            "Content-Type": "application/json"
        }
        response = requests.post(url, headers=headers, json=data, timeout=30)
        response.raise_for_status()
        result = response.json()
        if not (result.get("code") == 10000 and result.get("data", {}).get("code") == 0):
            print("api 返回错误")
            return False

        verify_data = result["data"]
        print(f"verify_data={verify_data}")
        coords_str = verify_data["data"]
        if not coords_str:
            return False

        # Image-relative "x,y" -> absolute screen coordinates.
        x_img_str, y_img_str = coords_str.split(',')
        x_img = int(x_img_str.strip())
        y_img = int(y_img_str.strip())
        x_screen = image_left + x_img
        y_screen = image_top + y_img
        self.d.click(x_screen, y_screen)
        time.sleep(self.get_sleep_time())
        return True
    except Exception as e:
        self.logger.error(f"空间推理验证失败: {e}")
        return False
- # 人工处理
def _handle_generic_captcha(self, xpath):
    """Wait for a person to clear a captcha that cannot be solved automatically.

    After an initial 5 s pause, polls every 2 s for up to one hour until the
    element matched by ``xpath`` is gone.

    Args:
        xpath: XPath of the captcha popup to watch.

    Returns:
        bool: True once the popup has disappeared, False on timeout.
    """
    time.sleep(5)
    self.logger.warning("通用验证码弹窗触发,等待人工处理...")

    max_wait_seconds = 60 * 60
    began = time.time()
    while True:
        if not (time.time() - began < max_wait_seconds):
            break
        if self.d.xpath(xpath).exists:
            # Still on screen: check again shortly.
            time.sleep(2)
            continue
        self.logger.info("验证码已处理完成")
        return True

    self.logger.warning("验证码处理超时")
    return False
def check_and_handle_popup(self):
    """Detect and handle whatever popup is currently on screen.

    Without a captcha: clicks the first matching simple popup that was not
    clicked recently, or closes an advertisement by tapping near the
    top-right corner. With a captcha: updates the retry bookkeeping, pauses
    the main thread, runs the solver for the detected type (falling back to
    waiting for a person), then checks whether the captcha is gone and
    retries recursively until the retry limit is reached.

    Returns:
        bool: True when a popup was dismissed or a captcha was cleared,
        False when nothing was handled, the call was throttled, or the
        captcha could not be cleared.
    """
    d = self.spider.d
    exists, captcha_type, xpath = self.is_any_verification_popup_exists(d)

    if not exists:
        # Simple popups: click the first eligible one.
        for simple_xpath, desc in self.popup_rules["simple"]:
            if not d.xpath(simple_xpath).exists:
                continue
            if self._is_recent_click(simple_xpath):
                continue
            self.logger.info(f"检测到简单弹窗: {desc}")
            d.xpath(simple_xpath).click()
            return True
        # Advertisement popup: tap near the top-right corner.
        if d.xpath('//*[contains(@text, "广告")]').exists:
            w, h = d.info['displayWidth'], d.info['displayHeight']
            d.click(w - 50, 50)
            self.logger.info("关闭广告弹窗")
            return True
        return False

    # A captcha is on screen.
    now = time.time()
    # Throttle: skip when another attempt started less than 30 s ago and is
    # still flagged as in progress.
    if now - self.last_verification_time < 30 and self.verification_in_progress.is_set():
        return False

    self.last_verification_time = now
    self.verification_count += 1
    if captcha_type != self.last_verification_type:
        # A different captcha type restarts the retry counter.
        self.logger.info(f"验证码类型变化: {self.last_verification_type} -> {captcha_type}")
        self.verification_retry_count = 0
        self.last_verification_type = captcha_type
    self.verification_retry_count += 1
    self.logger.warning(
        f"检测到验证码弹窗,类型: {captcha_type},重试次数: {self.verification_retry_count}/{self.MAX_VERIFICATION_RETRY}")
    if self.verification_retry_count > self.MAX_VERIFICATION_RETRY:
        self.logger.error("重试次数超限,重启应用")
        self._handle_verification_failure()
        return False

    self.verification_in_progress.set()
    self.pausing.set()
    self.logger.info("已设置主线程暂停事件")

    # Captcha type -> (log message, name of the solver method). Solvers are
    # looked up by name only when needed.
    solvers = {
        "Numbers_English": ("开始处理通用数验证", "Numbers_English_verify"),
        "Swipe_right": ("开始处理向右滑动", "Swipe_right"),
        "Click_images": ("开始处理依次点击图片或语序", "Click_images"),
        "slider": ("开始处理滑块验证", "slider_verify"),
        "complexs": ("开始处理拖动滑块刚", "complexs"),
        "Shortest_connection": ("开始处理最短距离连接", "Shortest_connection"),
        "click_side": ("开始处理空间推理", "click_side"),
    }
    try:
        chosen = solvers.get(captcha_type)
        if chosen is None:
            self.logger.info("等待人工处理")
            result = self._handle_generic_captcha(xpath)
        else:
            announcement, solver_name = chosen
            self.logger.info(announcement)
            result = getattr(self, solver_name)()
    except Exception as e:
        self.logger.error(f"验证码处理异常: {e}")
        result = False

    # Success is judged by the popup disappearing, not by the solver result.
    time.sleep(5)
    verification_cleared, remaining_type = self.wait_for_verification_clear(d, timeout=7)
    if verification_cleared:
        self.logger.info(f"第{self.verification_retry_count}次验证成功")
        time.sleep(3)
        self._handle_verification_success()
        return True

    self.logger.warning(f"第{self.verification_retry_count}次验证失败,仍有验证码: {remaining_type}")
    if self.verification_retry_count >= self.MAX_VERIFICATION_RETRY:
        self._handle_verification_failure()
        return False

    # Retries left: release the flags and try again.
    self.verification_in_progress.clear()
    self.pausing.clear()
    time.sleep(2)
    return self.check_and_handle_popup()
- '''
- # 1. 处理简单弹窗
- for xpath, desc in self.popup_rules["simple"]:
- if d.xpath(xpath).exists and not self._is_recent_click(xpath):
- self.logger.info("检测到弹窗: %s", desc)
- d.xpath(xpath).click()
- return True
- # 2. 处理验证码弹窗
- for xpath in self.popup_rules["verification"]:
- if d.xpath(xpath).exists:
- now = time.time()
- if now - self.last_verification_time < 30:
- return False # 30秒内不重复触发
- self.last_verification_time = now
- self.verification_count += 1
- self.logger.warning("验证码弹窗触发,等待人工处理...")
- if self.verification_count > self.MAX_VERIFICATION_RETRY:
- self.logger.error("验证码重试超限,终止任务")
- self.spider.stop_all()
- return True
- self.pausing.set() # 通知主线程暂停
- d.toast.show("需要人工处理验证码", 120)
- # 等待人工处理
- start = time.time()
- # while time.time() - start < 120*60:
- # if not d.xpath(xpath).exists:
- # self.logger.info("验证码已处理")
- # d.toast.show("验证完成", 2)
- # self.pausing.clear() # 放行主线程
- # return True
- # time.sleep(5)
- while True:
- if not d.xpath(xpath).exists:
- self.logger.info("验证码已处理")
- d.toast.show("验证完成", 2)
- self.pausing.clear() # 放行主线程
- return True
- time.sleep(5)
- self.logger.warning("验证码超时,重启APP")
- self.spider.restart_app()
- return True
- # 3. 处理广告弹窗(点击右上角)
- if d.xpath('//*[contains(@text, "广告")]').exists:
- w, h = d.info['displayWidth'], d.info['displayHeight']
- d.click(w - 50, 50)
- self.logger.info("关闭广告弹窗")
- return True
- return False
- '''
def is_any_verification_popup_exists(self, d=None):
    """Report whether any known captcha popup is currently on screen.

    Checks the configured ``popup_rules["verification"]`` entries first and
    then two extra indicators for the "complexs" captcha; the first match
    wins.

    Args:
        d: Device to query; defaults to ``self.d``.

    Returns:
        tuple: ``(True, captcha_type, xpath)`` for the first matching rule,
        otherwise ``(False, None, None)``.
    """
    device = self.d if d is None else d

    # Extra indicators that are not part of the configured rules.
    special_rules = (
        ('//*[contains(@resource-id, "com.sankuai.meituan:id/yoda_activity_rootView")]', "complexs"),
        ('//*[contains(@text, "拖动滑块刚")]', "complexs"),
    )
    for rules in (self.popup_rules["verification"], special_rules):
        for xpath, captcha_type in rules:
            if device.xpath(xpath).exists:
                return True, captcha_type, xpath
    return False, None, None
def wait_for_verification_clear(self, d=None, timeout=10):
    """Wait until no captcha popup is left on screen.

    Polls once per second for up to ``timeout`` seconds and performs one
    final check after the time is up.

    Args:
        d: Device to query; defaults to ``self.d``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        tuple: ``(True, None)`` when the popup is gone, otherwise
        ``(False, captcha_type)`` with the type still on screen.
    """
    device = self.d if d is None else d

    began = time.time()
    while time.time() - began < timeout:
        still_there, _, _ = self.is_any_verification_popup_exists(device)
        if not still_there:
            return True, None
        time.sleep(1)

    # One last look after the deadline.
    still_there, captcha_type, _ = self.is_any_verification_popup_exists(device)
    if not still_there:
        return True, None
    self.logger.info(f"超时,类型: {captcha_type}")
    return False, captcha_type
def _handle_verification_success(self):
    """Reset the captcha bookkeeping after a successful verification.

    Waits 5 s, resets the retry counter and the remembered captcha type,
    clears the in-progress and pause events (in that order), and finally
    resets the overall verification counter.
    """
    time.sleep(5)
    self.verification_retry_count, self.last_verification_type = 0, None
    for event in (self.verification_in_progress, self.pausing):
        event.clear()
    self.verification_count = 0
    self.logger.info("验证成功,清除暂停状态")
def _handle_verification_failure(self):
    """Fall back to waiting for a person after automatic solving failed.

    Resets the retry state (the app is not restarted), then checks every
    10 s for up to 30 minutes whether the captcha popup is gone. The pause
    event is cleared either when the popup disappears or when the wait
    times out, so the paused thread continues in both cases.
    """
    self.logger.error("验证码处理失败,等待人工处理...")

    # Reset the verification state.
    self.verification_retry_count = 0
    self.last_verification_type = None
    self.verification_in_progress.clear()

    max_wait_seconds = 30 * 60
    began = time.time()
    while time.time() - began < max_wait_seconds:
        popup_present, _, _ = self.is_any_verification_popup_exists()
        if popup_present:
            time.sleep(10)
            continue
        # Popup gone: presumably someone solved it by hand.
        self.logger.info("验证码已消失,人工处理成功")
        time.sleep(3)  # let the page settle
        self.pausing.clear()
        self.logger.info("人工处理完成,放行线程")
        return

    # Timed out: release the paused thread anyway.
    self.logger.warning("等待人工处理超时,尝试继续执行")
    self.pausing.clear()
    self.logger.warning("已超时,强制清除暂停状态,放行线程")
def stop(self):
    """Clear the ``running`` flag of this object."""
    setattr(self, "running", False)
def get_access_token():
    """Request an OAuth access token from the Baidu AI platform.

    Uses the client-credentials grant. The request now has a timeout and
    sits inside the error handling, so a network failure yields None
    instead of an uncaught exception, and only the expected error types are
    caught instead of everything.

    Returns:
        str | None: The ``access_token`` value from the response, or None
        when the request fails or the response does not contain one.
    """
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment.
    AppKey = "tRK2RhyItCSh6BzyT4CNVXQa"
    AppSrcret = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
    token_url = 'https://aip.baidubce.com/oauth/2.0/token'
    params = {
        "grant_type": "client_credentials",
        "client_id": AppKey,
        "client_secret": AppSrcret,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    try:
        response = requests.post(token_url, params=params, headers=headers, data="", timeout=30)
        return response.json()['access_token']
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
        # Network error, non-JSON body, or a body without "access_token".
        print(f"获取access_token失败: {e}")
        return None
def get_mysql():
    """Open and return a new PyMySQL connection configured from ``Config``.

    Every call creates a fresh connection; the function itself never closes
    it, so the caller has to.
    """
    import pymysql

    settings = {
        'host': Config.DB_HOST,
        'port': Config.DB_PORT,
        'user': Config.DB_USER,
        'password': Config.DB_PASSWORD,
        'db': Config.DB_NAME,
        'charset': 'utf8mb4',
    }
    return pymysql.connect(**settings)
class TaskReporter:
    """Keeps per-task progress in memory and reports finished tasks over HTTP.

    All access to ``tasks_data`` is guarded by ``self.lock``.
    """

    def __init__(self):
        self.tasks_data = {}  # task_id -> progress record (see start_task)
        self.lock = threading.Lock()

    def start_task(self, task_id: int, start_page: int, end_page: int):
        """Create (or overwrite) the progress record for ``task_id``."""
        with self.lock:
            self.tasks_data[task_id] = {
                'task_id': task_id,
                'start_time': int(time.time()),
                'end_time': None,
                'start_page': start_page,
                'end_page': end_page,
                'actual_end_page': start_page,  # last page actually reached
                'real_count': 0,  # number of items actually collected
                'status': 'running',  # running, completed, failed
                'finish_status': 0,  # 0: not finished, 1: finished
            }

    def update_task_progress(self, task_id: int,
                             actual_end_page: int = None,
                             real_count: int = None):
        """Update the stored page / count of a known task.

        Unknown ``task_id`` values are ignored; arguments left at ``None``
        keep their stored value.
        """
        with self.lock:
            record = self.tasks_data.get(task_id)
            if record is None:
                return
            if actual_end_page is not None:
                record['actual_end_page'] = actual_end_page
            if real_count is not None:
                record['real_count'] = real_count

    def end_task(self, task_id: int, status: str = 'completed',
                 finish_status: int = 0, force_end_page: int = None):
        """Mark a known task as ended and send its report.

        Args:
            task_id: Task to finish; unknown ids are ignored.
            status: ``'completed'`` is reported as 3, anything else as 4.
            finish_status: Stored and reported as given.
            force_end_page: When not ``None``, overrides the recorded
                ``actual_end_page``.
        """
        with self.lock:
            data = self.tasks_data.get(task_id)
            if data is None:
                return
            data['end_time'] = int(time.time())
            data['status'] = status
            data['finish_status'] = finish_status
            if force_end_page is not None:
                data['actual_end_page'] = force_end_page
            report_data = {
                "collect_task_allocate_id": data['task_id'],
                "status": 3 if data['status'] == 'completed' else 4,
                "finish_status": data['finish_status'],
                'real_count': data['real_count'],
                'start_time': data['start_time'],
                'end_time': data['end_time'],
                'start_page': data['start_page'],
                'end_page': data['actual_end_page']
            }
        # The HTTP call happens after the lock is released so a slow endpoint
        # cannot block progress updates coming from other threads.
        self._call_report_api(report_data)

    def _call_report_api(self, data: Dict[str, Any]):
        """POST ``data`` to the result-report endpoint; failures are printed."""
        try:
            url = 'http://schedule.dfwy.tech/api/collect_equipment_execute/result_report'
            resp = requests.post(url, json=data, timeout=10)
            if resp.status_code == 200:
                print(f"任务 {data['collect_task_allocate_id']} 上报成功")
            else:
                print(f"任务 {data['collect_task_allocate_id']} 上报失败: {resp.status_code}")
        except Exception as e:
            print(f"上报接口调用异常: {e}")


# Module-wide reporter instance
reporter = TaskReporter()
- class MT:
def __init__(self, key):
    """Set up OCR access, city lookup data, table names and task counters.

    Args:
        key: Search keyword, stored as ``self.search_key``.
    """
    self.package_name = Config.PACKAGE_NAME
    self.access_token = get_access_token()
    self.city2province = self.get_city_info()

    # Credentials for the AipOcr client.
    self.APP_ID = '116857964'
    self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh'
    self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3'
    self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY)

    # Target tables come from configuration.
    self.table_name = Config.DB_AUTO_DRUG_TABLE
    self.shop_table_name = Config.DB_SHOP_TABLE

    self.loggerMT = logging.getLogger()
    self.search_key = key  # e.g. 参苓健脾胃颗粒 / 舒肝颗粒 / 清肺化痰丸 / 香砂平胃颗粒
    self.unrelated_data = 0  # count of unrelated records
    self.shop_data_num = 0  # count of shop records

    # Collection statistics for the current task.
    self.collected_count = 0  # number of products actually collected
    self.task_id = None  # task id
    self.start_time = None  # task start time
    self.current_page = 0  # current page number
    self.task_start_page = 0  # first page of the task
    self.task_end_page = 0  # last page of the task
    self.task_product_name = ''  # product name
    self.task_product_specs = ''  # product specification
def update_task_status(self, status):
    """Write ``status`` and the current time to the task's allocation row.

    Does nothing when ``self.task_id`` is falsy. Database errors are logged
    through ``self.loggerMT`` and not re-raised.

    Args:
        status: Value stored in the ``status`` column.
    """
    if not self.task_id:
        return

    # Explicit sentinels replace the fragile "'name' in locals()" check.
    retrieve_conn = None
    cursor = None
    try:
        retrieve_conn = get_retrieve_mysql()
        cursor = retrieve_conn.cursor()
        update_sql = """
            UPDATE retrieve_collect_task_allocate
            SET status = %s,
                update_time = %s
            WHERE id = %s
        """
        cursor.execute(update_sql, (status, time.time(), self.task_id))
        retrieve_conn.commit()
        self.loggerMT.info(f"任务 {self.task_id} 状态更新为 {status}")
    except Exception as e:
        self.loggerMT.error(f"更新任务状态失败: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if retrieve_conn is not None:
            retrieve_conn.close()
def stop_app(self):
    """Stop the target app on the device, then wait five seconds."""
    settle_seconds = 5
    self.d.app_stop(self.package_name)
    time.sleep(settle_seconds)
def start_app(self):
    """Start the target app on the device, then wait five seconds."""
    settle_seconds = 5
    self.d.app_start(self.package_name)
    time.sleep(settle_seconds)
def restart_app(self):
    """Restart the app, then let the monitor check for popups.

    Calls ``stop_app`` and ``start_app`` in turn, waits five more seconds
    for the app to settle, and finally calls
    ``self.monitor.check_and_handle_popup()``.
    """
    for step in (self.stop_app, self.start_app):
        step()
    extra_settle_seconds = 5  # additional stabilisation wait
    time.sleep(extra_settle_seconds)
    self.monitor.check_and_handle_popup()
@staticmethod
def get_sleep_time():
    """Return a random pause length: a whole number of seconds from 1 to 3."""
    shortest, longest = 1, 3
    return random.randint(shortest, longest)
@staticmethod
def get_current_date():
    """Return the current local date formatted as ``YYYY/MM/DD``."""
    today = datetime.datetime.now()
    return f"{today:%Y/%m/%d}"
@staticmethod
def get_city_info(file_path='../kailin_city.json'):
    """Build a city-name -> province-name mapping from a JSON file.

    The file must contain a ``province`` list (items with ``id`` and
    ``name``) and a ``city`` list (items with ``name`` and ``pid``). A
    ``pid`` longer than two characters is reduced to its first two digits
    before the province lookup.

    Args:
        file_path: JSON file to read. The default is resolved against the
            current working directory.

    Returns:
        ``dict`` mapping each city name to its province name.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    provinces_by_id = {entry["id"]: entry for entry in data['province']}
    city2province = {}
    for city in data['city']:
        pid = city['pid']
        pid_text = str(pid)
        if len(pid_text) > 2:
            # Raw string: '\d' in a plain literal is an invalid escape.
            pid = int(re.match(r'^\d{2}', pid_text).group())
        city2province[city['name']] = provinces_by_id[pid]['name']
    return city2province
- # 将30分钟后采集的数据上报到服务端
def up_data_to_service(self, collect_task_allocate_id, status, finish_status, real_count, start_time, end_time,
                       start_page, end_page):
    """POST a collection result to the scheduling service and print the reply.

    The arguments are sent unchanged as the JSON body. Network errors
    (including the 10-second timeout) propagate to the caller.
    """
    url = 'http://schedule.dfwy.tech/api/collect_equipment_execute/result_report'
    json_data = {
        "collect_task_allocate_id": collect_task_allocate_id,
        "status": status,
        "finish_status": finish_status,
        'real_count': real_count,
        'start_time': start_time,
        'end_time': end_time,
        'start_page': start_page,
        'end_page': end_page
    }
    # Same timeout as TaskReporter._call_report_api, so a dead endpoint
    # cannot block the caller indefinitely.
    resp = requests.post(url, json=json_data, timeout=10)
    print("状态码:", resp.status_code)
    try:
        body = resp.json()
    except ValueError:
        # Non-JSON reply (e.g. an HTML error page): show the raw text.
        body = resp.text
    print("响应体(JSON):\n", body)
def get_shop_name(self):
    """Read the shop name from the current page.

    Two layout-dependent XPaths are tried in order. If both fail, the shop
    page is opened through ``enter_shop`` and the name is read from its
    header; in that case one back-press returns to the previous page.

    Returns:
        The shop name, or ``None`` when it cannot be found.
    """
    candidates = (
        ('获取到店铺名',
         '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView'),
        ('获取到店铺名2',
         '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView'),
    )
    last_error = None
    for label, xpath in candidates:
        try:
            shop_name = self.d.xpath(xpath).text
        except Exception as exc:  # narrowed from a bare except
            last_error = exc
            continue
        print(f'{label}:{shop_name}')
        return shop_name

    # Fall back to opening the shop page and reading its header.
    print("点击店铺进入后获取店铺名称")
    self.enter_shop()
    shop_xpath = '//*[@resource-id="com.sankuai.meituan:id/layout_header_view"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]//android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.TextView'
    if self.d.xpath(shop_xpath).exists:
        shop_name = self.d.xpath(shop_xpath).text
        self.swipe_back(1)
        return shop_name
    # NOTE(review): no back-press happens on this path although enter_shop()
    # was called - confirm whether that is intended.
    print(f'获取店铺名出错:{last_error}')
    return None
def get_qualification_number(self):
    """Read the qualification number shown on the current page.

    The ``资质编号`` label and the colon after it are removed as a prefix.
    ``str.strip('资质编号:')`` would instead strip those characters from
    both ends of the value.

    Returns:
        The number without label and surrounding whitespace, or ``None``
        when the element cannot be read.
    """
    try:
        raw = self.d.xpath(
            '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text
        text = raw.strip()
        label = '资质编号'
        if text.startswith(label):
            # Drop the label plus an ASCII or full-width colon after it.
            text = text[len(label):].lstrip(':\uff1a')
        return text.strip()
    except Exception:
        return None
def get_shop_address(self):
    """Read the shop address from the shop detail page.

    When the first text view holds delivery-time text (contains
    ``发货时间``), the address is read from the second group instead.

    Returns:
        The address text, ``''`` when the first element does not exist, or
        ``None`` when reading the page raises.
    """
    # NOTE(review): the source indentation was lost; the nesting below is
    # the reading in which the address is kept when no delivery-time text is
    # present - confirm against the original file.
    try:
        xpath = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView'
        if not self.d.xpath(xpath).exists:
            shop_address = ''
            print(f'333-获取到店铺地址:{shop_address}')
            return shop_address

        shop_address = self.d.xpath(xpath).text
        print(f'111-获取到店铺地址:{shop_address}')
        if '发货时间' in shop_address:
            print(f'店铺地址包含发货时间,再次获取店铺地址')
            xpath2 = '//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.TextView'
            if self.d.xpath(xpath2).exists:
                shop_address = self.d.xpath(xpath2).text
                print(f'222-获取到店铺地址:{shop_address}')
            else:
                print(f'222-xpath2获取店铺地址失败')
        return shop_address
    except Exception:  # narrowed from a bare except
        print(f'获取店铺地址出错-get_shop_address')
        return None
def enter_detail(self):
    """Click the first item of the ``recycler`` list, then pause briefly."""
    first_item = '//*[@resource-id="com.sankuai.meituan:id/recycler"]/android.widget.FrameLayout[1]'
    self.d.xpath(first_item).click()
    pause = self.get_sleep_time()
    time.sleep(pause)
def save_to_database(self, data):
    """Insert one product record and update the collection counters.

    Args:
        data: Mapping that must contain every column listed in ``columns``
            below.

    Raises:
        KeyError: when ``data`` lacks a required column.
        Exception: database errors propagate; the connection is closed
            either way.
    """
    print(f'保存数据到数据库:{data}')
    columns = (
        'product', 'min_price', 'manufacture_date', 'expiry_date', 'shop',
        'business_license_company', 'province', 'city', 'manufacturer',
        'specification', 'approval_number', 'product_link', 'scrape_date',
        'scrape_province', 'availability', 'credit_code', 'platform',
        'search_key',
    )
    placeholders = ', '.join(['%s'] * len(columns))
    add_sql = f"""
        INSERT INTO {self.table_name}
        ({', '.join(columns)})
        VALUES ({placeholders})
    """
    values = tuple(data[column] for column in columns)

    conn = get_mysql()
    try:
        with conn.cursor() as cur:
            cur.execute(add_sql, values)
        conn.commit()
    finally:
        # get_mysql() opens a fresh connection per call; close it so each
        # saved row does not leak a connection.
        conn.close()
    print(f"存入数据库成功")

    # Keep the collected-item counter and the reporter in sync.
    self.collected_count += 1
    if self.task_id:
        reporter.update_task_progress(
            task_id=self.task_id,
            real_count=self.collected_count
        )
def save_shop_info_to_database(self, data):
    """Insert one shop record into ``self.shop_table_name``.

    Args:
        data: Mapping that must contain every column listed in ``columns``
            below.

    Raises:
        KeyError: when ``data`` lacks a required column.
        Exception: database errors propagate; the connection is closed
            either way.
    """
    print(f'保存店铺数据到数据库:{data}')
    columns = (
        'shop', 'contact_address', 'qualification_number',
        'business_license_company', 'business_license_address',
        'scrape_date', 'platform',
    )
    placeholders = ', '.join(['%s'] * len(columns))
    add_sql = f"""
        INSERT INTO {self.shop_table_name}
        ({', '.join(columns)})
        VALUES ({placeholders})
    """
    values = tuple(data[column] for column in columns)

    conn = get_mysql()
    try:
        with conn.cursor() as cur:
            cur.execute(add_sql, values)
        conn.commit()
    finally:
        # Close the per-call connection instead of leaking it.
        conn.close()
    print(f'存入店铺信息到数据库成功')
def swipe_up(self):
    """Swipe the screen upwards, then pause briefly.

    The swipe runs along the vertical centre line from 100 px above the
    bottom edge to 100 px below the top edge, with a random duration
    between 0 and 0.3. Roughly 15% of the time a short extra upward swipe
    follows, to get past a stuck list.
    """
    # Read the device info once instead of once per dimension.
    info = self.d.info
    screen_width = info['displayWidth']
    screen_height = info['displayHeight']
    centre_x = screen_width // 2

    duration_rate = random.uniform(0, 0.3)
    self.d.swipe(centre_x, screen_height - 100, centre_x, 100, duration=duration_rate)
    if random.uniform(0, 1) > 0.85:
        self.d.swipe_ext("up", 0.1)
    time.sleep(self.get_sleep_time())
def swipe_back(self, no):
    """Press the device back key ``no`` times, pausing after each press.

    Args:
        no: Number of back presses.
    """
    remaining = no
    while remaining > 0:
        self.d.press('back')
        time.sleep(self.get_sleep_time())
        remaining -= 1
def drug_price(self):
    """Read the price from the first element whose text starts with ``¥``.

    Returns:
        The price as ``float``, or ``None`` when the element is missing or
        its text holds no parsable number.
    """
    try:
        price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text
        # Raw string: '\d' and '\.' in a plain literal are invalid escapes.
        price = float(re.search(r'[\d\.]+', price_str).group())
        print(f'获取到价格:{price}')
        return price
    except Exception as e:
        print(f'提取价格出错-->{e}')
        return None
def restart_uiautomator_services(self, device_id):
    """Stop and restart the atx-agent server on the device through adb.

    Args:
        device_id: Serial passed to ``adb -s``.
    """
    # Argument lists without a shell: the device id is never parsed by the
    # host shell, so odd characters in it cannot alter the command.
    agent_server = ['adb', '-s', str(device_id), 'shell',
                    '/data/local/tmp/atx-agent', 'server', '-d']
    subprocess.run(agent_server + ['--stop'], capture_output=True, text=True)
    time.sleep(self.get_sleep_time())
    subprocess.run(agent_server, capture_output=True, text=True)
    time.sleep(self.get_sleep_time())
def connect_devices(self, device_id):
    """Connect to a USB device and restart its uiautomator service.

    The connection is stored in ``self.d``.

    Args:
        device_id: Serial of the device to connect to.

    Raises:
        Exception: wraps any error raised while connecting or restarting
            the service.
    """
    try:
        device = u2.connect_usb(device_id)
        self.d = device
        self.restart_uiautomator_services(device_id)
        print(f'连接到设备:{device_id}')
    except Exception as err:
        print(f'{device_id} 连接错误: {err}')
        raise Exception(err)
def get_ocr_res(self, img):
    """Run business-licence OCR on an image file through the Baidu REST API.

    Args:
        img: Path of the image file to recognise.

    Returns:
        ``dict`` mapping each recognised field name to its ``words`` value,
        or ``None`` when the file cannot be read, the request fails, or the
        response has an unexpected shape.
    """
    try:
        print(f'开始识别图片:{img}')
        # The context manager closes the file handle on every path.
        with open(img, 'rb') as image_file:
            encoded = base64.b64encode(image_file.read())
        request_url = ("https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
                       + "?access_token=" + self.access_token)
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        response = requests.post(request_url, data={"image": encoded},
                                 headers=headers, timeout=30)
        if not response:
            return None
        res = response.json()
        new_dic = {field: value['words'] for field, value in res['words_result'].items()}
        print('资质数据信息', new_dic)
        return new_dic
    except Exception as exc:  # narrowed from a bare except
        print(f'营业执照识别失败: {exc}')
        return None
def remove_watermark(self, img_path):
    """Brighten an image to wash out its watermark and re-encode it.

    Every pixel goes through a fixed linear transform clipped to 0..255;
    the result is encoded with the original file extension.

    Args:
        img_path: Path of the image file.

    Returns:
        The encoded image buffer produced by ``cv2.imencode``.
    """
    raw_bytes = np.fromfile(img_path, dtype=np.uint8)
    img = cv2.imdecode(raw_bytes, -1)
    extension = os.path.splitext(img_path)[1]
    # Fixed gain / offset of the brightening transform.
    brightened = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8)
    encoded = cv2.imencode(extension, brightened)[1]
    return encoded
def get_ocr_res_image(self, img):
    """Run general OCR on an image after watermark removal.

    Args:
        img: Path of the image file.

    Returns:
        The ``words_result`` value of the OCR response (``''`` when the key
        is absent), or ``None`` when preprocessing or the OCR call raises.
    """
    try:
        image = self.remove_watermark(img)
        res_image = self.client.basicGeneral(image)
        data = res_image.get('words_result', '')
        print(f'百度api返回结果:{data}')
        return data
    except Exception as exc:  # narrowed from a bare except
        print(f'图片OCR识别失败: {exc}')
        return None
def screenshot_the_business_license(self, qualification_number):
    """Take a screenshot and save a fixed crop of it.

    The full screenshot is written to ``screenshot1.png``; the region
    x 0..720, y 480..1420 is then written to a second file.

    Args:
        qualification_number: When truthy, used as the file name inside the
            fixed screenshot directory; otherwise ``cropped_screenshot.png``
            in the working directory is used.

    Returns:
        Path of the cropped image.
    """
    full_shot = 'screenshot1.png'
    self.d.screenshot(full_shot)
    frame = cv2.imread(full_shot)

    # Crop box as (left, top, right, bottom).
    x0, y0, x1, y1 = 0, 480, 720, 1420
    region = frame[y0:y1, x0:x1]

    cropped_screenshot_path = (
        'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png'
        if qualification_number
        else 'cropped_screenshot.png'
    )
    cv2.imwrite(cropped_screenshot_path, region)
    return cropped_screenshot_path
def screenshot_instruction(self):
    """Take a screenshot under a unique file name and return that name.

    The name combines the current time (``HH-MM-SS``) with eight random hex
    characters; the time part is also printed.
    """
    time_str = f"{datetime.datetime.now():%H-%M-%S}"
    random_str = secrets.token_hex(4)  # 4 random bytes -> 8 hex characters
    print(time_str)
    screenshot_path = f'instructionscreenshot1-{time_str}-{random_str}.png'
    self.d.screenshot(screenshot_path)
    return screenshot_path
def extract_specification(self, text):
    """Return the part of ``text`` before the first ``【``, stripped.

    When ``text`` is empty or starts with ``【`` it is returned unchanged.
    """
    leading = text.partition('【')[0]
    if not leading:
        return text
    return leading.strip()
- # 获取商品title
def get_title(self):
    """Read the product title from the page and split off its specification.

    A shortened keyword is derived from ``self.search_key`` by removing
    brand prefixes and pack-size suffixes. The page is polled for an
    element whose text contains that keyword, and the title is split with a
    regular expression. The whole routine runs through ``self.safe_exec``.

    Returns:
        ``(drugs_name, specifications)``. ``drugs_name`` is the title
        (without a stray leading ``0``); ``specifications`` is ``''`` when
        nothing could be derived.
    """
    # "999" keys whose brand prefix is longer than the plain "999".
    brand_prefix_by_key = {
        '999皮炎平曲安奈德益康唑乳膏30': "999皮炎平",
        '999必无忧盐酸特比萘芬喷雾剂30': "999必无忧",
        '999必无忧盐酸特比萘芬乳膏15g': "999必无忧",
        '999速复康布洛芬缓释胶囊': "999速复康",
        '999选平硝酸咪康唑乳膏20g': "999选平",
        '999皮炎平复方醋酸地塞米松乳膏20': "999皮炎平",
    }
    # Keys without "999": fragments removed, in order.
    fragments_by_plain_key = {
        '史达功右美沙芬愈创甘油醚糖浆120': ("史达功", "120"),
        '三九胃泰养胃舒颗粒8袋': ("三九胃泰", "8袋"),
        '今维多赐多康牌蛋白粉': ("今维多",),
        '佳美舒阿奇霉素肠溶胶囊4': ("佳美舒", "4"),
        '三九胃泰颗粒20g*10': ("20g*10",),
        '三九胃泰颗粒20g*6袋': ("20g*6袋",),
        '顺峰康王酮康他索乳膏': ("顺峰康王",),
    }
    # Pack-size text removed after the brand prefix has been dropped.
    spec_suffix_by_key = {
        '999糠酸莫米松凝胶15': "15",
        '999皮炎平曲安奈德益康唑乳膏30': "30",
        '999复方金银花颗粒10g': "10g",
        '999复方板蓝根颗粒15g*15袋/盒': "15g*15袋/盒",
        '999复方氨酚烷胺胶囊6粒': "6粒",
        '999可调式生理性海水鼻腔喷雾50': "50",
        '999止泻利颗粒15g*8': "15g*8",
        '999必无忧盐酸特比萘芬喷雾剂30': "30",
        '999必无忧盐酸特比萘芬乳膏15g': "15g",
        '999复方苦参肠炎康片12片': "12片",
        '999强力枇杷露16袋': "16袋",
        '999三蛇胆川贝膏138': "138",
        '999强力枇杷露120ml': "120ml",
        '999强力枇杷露150ml': "150ml",
        '999抗病毒口服液10ml*10': "10ml*10",
        '999抗病毒口服液10ml*12': "10ml*12",
        '999糠酸莫米松乳膏10g支': "10g支",
        '999选平硝酸咪康唑乳膏20g': "20g",
        '999感冒清热颗粒(无糖)6g': "(无糖)6g",
        '999壮骨关节丸6g*20': "6g*20",
        '999正天丸6g*15': "6g*15",
        '999皮炎平复方醋酸地塞米松乳膏20': "20",
        '999糠酸莫米松凝胶10': "10",
        '999板蓝根颗粒10g*20': "10g*20",
        '999复方氨酚烷胺胶囊10粒': "10粒",
        '999复方氨酚烷胺胶囊12粒': "12粒",
        '999咽炎片0.26g*12片*2板': "0.26g*12片*2板",
        '999小儿止咳糖浆120': "120",
        '999小儿止咳糖浆225': "225",
        '999小儿感冒颗粒6g*10': "6g*10",
        '999小儿感冒颗粒6g*24': "6g*24",
        '999小儿氨酚黄那敏颗粒6g*10袋': "6g*10袋",
        '999小儿氨酚黄那敏颗粒6g*20袋': "6g*20袋",
        '999小儿咽扁颗粒8g*10袋': "8g*10袋",
        '999小儿感冒宁颗粒2.5g*10袋': "2.5g*10袋",
        '999感冒清热颗粒12g*18': "12g*18",
    }
    # Specification used when the title itself carries none.
    default_spec_by_title = {
        '999抗病毒口服液10ml*12': '10ml*12支/盒',
        '999抗病毒口服液': '10ml*12支/盒',
        '999抗病毒口服液10ml*10': '10ml*10支/盒',
        '999小柴胡颗粒': '10g*9袋/盒',
        '999养胃舒颗粒': '10g*6袋/盒',
        '三九胃泰胶囊': '0.5g*24粒/盒',
        '999补脾益肠丸': '6g*15袋/盒',
        '999复方感冒灵颗粒': '14g*9袋/盒',
    }

    def _search_keyword():
        """Return ``self.search_key`` with brand / pack-size text removed."""
        key = self.search_key
        if "999" in key:
            fragments = [brand_prefix_by_key.get(key, "999")]
        else:
            fragments = list(fragments_by_plain_key.get(key, ()))
        if key in spec_suffix_by_key:
            fragments.append(spec_suffix_by_key[key])
        keyword = key
        for fragment in fragments:
            keyword = keyword.replace(fragment, "")
        return keyword

    def _inner():
        temp_search_key = _search_keyword()
        print(f'获取商品title时的搜索关键字:{temp_search_key}')
        title_xpath = f'//*[contains(@text, "{temp_search_key}")]'

        # Poll for the title; the long loop leaves time for a human/machine
        # verification to be handled in the meantime.
        title = ''
        for m in range(1, 6000):
            if self.d.xpath(title_xpath).exists:
                # "or ''": a missing text would otherwise crash below.
                title = self.safe_exec(lambda: self.d.xpath(title_xpath).text) or ''
                print(f"第{m}次获取title成功")
                break
            time.sleep(3)

        # The text sometimes comes back with a stray leading "0".
        title = title[1:] if title.startswith('0') else title
        print(f'获取到药品标题:{title}')

        if self.search_key == '999赐多康大豆':
            return title, '1罐'
        if self.search_key == "999感冒清热颗粒":
            match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title)
        else:
            match = re.match(r'(\[[^\]]+\])(.*?)\s*((?:\d+\S*|\(.+))$', title)

        if match:
            specifications = match.group(3).strip()
            print("药品名:", title)
            print("规格:", specifications)
            # Expiry notes inside the specification are cut off.
            if '到期' in specifications:
                specifications = self.extract_specification(specifications)
            return title, specifications

        if title in default_spec_by_title:
            return title, default_spec_by_title[title]
        print("没有匹配到预期格式")
        return title, ''

    # Run through safe_exec so the work waits while verification is pending.
    return self.safe_exec(_inner)
def enter_shop(self):
    """Click the element labelled ``店铺`` to open the shop, then pause."""
    shop_entry = '//*[@text="店铺"]'
    self.d.xpath(shop_entry).click()
    pause = self.get_sleep_time()
    time.sleep(pause)
def enter_shoper(self):
    """Open the ``商家`` (merchant) tab once it shows up.

    The element is looked for up to ten times with a short pause after each
    miss.

    Returns:
        ``True`` when the tab was found and clicked, ``False`` otherwise.
    """
    merchant_tab = '//*[@text="商家"]'
    for attempt in range(10):
        if self.d.xpath(merchant_tab).exists:
            print(f'第{attempt}次商家存在')
            break
        print(f'第{attempt}次商家不存在')
        time.sleep(self.get_sleep_time())
    else:
        # Never appeared within the ten attempts.
        return False

    self.d.xpath(merchant_tab).click()
    time.sleep(self.get_sleep_time())
    return True
- # 点击查看商家资质
def scan_shoper_license(self):
    """Click ``查看商家资质`` once it shows up, otherwise go back one page.

    The element is looked for up to ten times with a short pause after each
    miss.
    """
    licence_link = '//*[@text="查看商家资质"]'
    for attempt in range(10):
        if self.d.xpath(licence_link).exists:
            print(f'第{attempt}次查看商家资质存在')
            break
        print(f'第{attempt}次查看商家资质不存在')
        time.sleep(self.get_sleep_time())
    else:
        # Link never appeared: leave the page instead.
        self.swipe_back(1)
        return

    self.d.xpath(licence_link).click()
    time.sleep(self.get_sleep_time())
- # 验证商品的信息是否在数据库中已存在
def data_is_exists(self, data):
    """Check whether a matching product row already exists.

    Rows are matched on ``product``, ``min_price``, ``shop``,
    ``scrape_date`` and ``platform``.

    Args:
        data: Mapping that must contain the five keys above.

    Returns:
        ``True`` when a row exists, ``False`` when none does, ``None`` when
        a key is missing or the query fails.
    """
    required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform']
    missing = [key for key in required_keys if key not in data]
    if missing:
        logging.error(f"缺少必要字段: {', '.join(missing)}")
        return None

    # Existence check only: fetch a constant and stop at the first hit.
    query_sql = """
        SELECT 1 FROM {}
        WHERE product = %s
        AND min_price = %s
        AND shop = %s
        AND scrape_date = %s
        AND platform = %s
        LIMIT 1
    """.format(self.table_name)

    conn = None
    try:
        conn = get_mysql()
        with conn.cursor() as cur:
            cur.execute(query_sql, tuple(data[key] for key in required_keys))
            return bool(cur.fetchone())
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return None
    finally:
        # Close the per-call connection instead of leaking it.
        if conn is not None:
            conn.close()
- # 验证店铺信息是否在数据库中已存在
def shop_is_exists_database(self, shop):
    """Check whether a shop row with this name already exists.

    Args:
        shop: Shop name matched against the ``shop`` column.

    Returns:
        ``True`` when a row exists, ``False`` when none does, ``None`` when
        the query fails.
    """
    # Existence check only: fetch a constant and stop at the first hit.
    query_sql = """
        SELECT 1 FROM {}
        WHERE shop = %s
        LIMIT 1
    """.format(self.shop_table_name)

    conn = None
    try:
        conn = get_mysql()
        with conn.cursor() as cur:
            # "(shop,)" is a real one-element tuple; "(shop)" is just shop.
            cur.execute(query_sql, (shop,))
            return bool(cur.fetchone())
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return None
    finally:
        # Close the per-call connection instead of leaking it.
        if conn is not None:
            conn.close()
def wait_if_verifying(self, monitor, timeout=120):
    """Block while ``monitor.pausing`` is set, for at most ``timeout`` seconds.

    The flag is checked once per second.
    """
    began = time.time()
    while True:
        if not monitor.pausing.is_set():
            break
        if time.time() - began >= timeout:
            break
        time.sleep(1)
- # def safe_xpath(self, xpath, timeout=10):
- # """线程安全 xpath 查找"""
- # self.wait_if_verifying(self.monitor)
- # return self.d.xpath(xpath).wait(timeout=timeout)
def wait_for_ready(self, monitor, timeout=86400):
    """
    Wait until ``monitor.pausing`` clears, then run one more popup check.

    Args:
        monitor: object exposing a ``pausing`` flag with ``is_set()`` and a
            ``check_and_handle_popup()`` method.
        timeout: maximum number of seconds to wait for the flag to clear.
    """
    began_at = time.time()
    while True:
        if not monitor.pausing.is_set():
            break
        if time.time() - began_at >= timeout:
            break
        time.sleep(1)
    # Extra safety: a popup may appear right after the flag was read, so
    # actively scan once more before the caller touches the page.
    monitor.check_and_handle_popup()
def safe_list(self, xpath, monitor):
    """
    Return every element matching ``xpath`` after waiting for the monitor.

    Args:
        xpath: XPath expression passed to ``self.d.xpath``.
        monitor: forwarded unchanged to ``self.wait_for_ready``.

    Returns:
        Whatever ``self.d.xpath(xpath).all()`` returns.
    """
    self.wait_for_ready(monitor)
    selector = self.d.xpath(xpath)
    return selector.all()
def safe_exec(self, func, *args, **kwargs):
    """
    Run ``func`` only once ``self.monitor.pausing`` is not set.

    Args:
        func: callable to execute.
        *args: positional arguments forwarded to ``func``.
        **kwargs: keyword arguments forwarded to ``func``.

    Returns:
        The value returned by ``func(*args, **kwargs)``.
    """
    # Short fixed delay so the monitor thread gets a chance to raise the flag.
    time.sleep(0.1)
    while True:
        if not self.monitor.pausing.is_set():
            break
        time.sleep(1)
    outcome = func(*args, **kwargs)
    return outcome
def get_next_data(self, data, target):
    """
    Return the ``'words'`` value of the entry that follows ``target``.

    Args:
        data: sequence of dicts, each read through its ``'words'`` key.
        target: text to look for among the ``'words'`` values.

    Returns:
        The ``'words'`` value of the entry right after the first match that
        has a successor, or None when there is no such entry.
    """
    for position, entry in enumerate(data):
        if entry['words'] != target:
            continue
        following = position + 1
        if following < len(data):
            return data[following]['words']
    return None
def delete_instruction_screenshot(self, screenshot_path):
    """
    Remove the screenshot file at ``screenshot_path`` and print the outcome.

    A missing file or any other failure is reported on stdout and never
    raised to the caller.

    Args:
        screenshot_path: path of the file to delete.
    """
    try:
        os.remove(screenshot_path)
        print(f"截图文件已删除:{screenshot_path}")
    except FileNotFoundError:
        # The file is already gone; nothing to clean up.
        print(f"文件未找到,无法删除:{screenshot_path}")
    except Exception as err:
        print(f"删除文件时出错:{err}")
- '''
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="查看详细说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- loop_page = 5
- # new_list = list()
- new_list = []
- for i in range(loop_page):
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- time.sleep(0.2)
- if i == 0:
- self.d.swipe(200, 1000, 200, 300, 0.4)
- else:
- self.d.swipe(200, 1000, 200, 62)
- time.sleep(0.2)
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- time.sleep(0.2)
- all_tt = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all()
- for idx in range(1, len(all_tt) + 1):
- all_tt1 = self.d.xpath(
- f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all()
- # print(f'当前说明书列表数据:{all_tt1}')
- for tt in all_tt1:
- if tt.text and tt.text != '展开全文':
- new_list.append(tt.text)
- if i == 0:
- height = 938
- else:
- drug_box = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info
- bounds = drug_box['bounds']
- height = bounds['bottom'] - bounds['top']
- if height < 938:
- # print('说明书翻页到底部')
- break
- # 展开全文
- new_list = [item for item in new_list if item != '展开全文']
- print(f'当前说明书列表数据:{new_list}')
- # expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期')
- # manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位')
- # approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号')
- # res_data = {
- # "有效期": new_list[expiry_date_index + 1],
- # "生产单位": new_list[manufacturer_index + 1],
- # "批准文号": new_list[approval_number_index + 1]
- # }
- res_data = {
- "有效期": (new_list[new_list.index("有效期") + 1]) if "有效期" in new_list and new_list.index("有效期") + 1 < len(new_list) else "",
- "生产单位": (new_list[new_list.index("生产单位") + 1]) if "生产单位" in new_list and new_list.index("生产单位") + 1 < len(new_list) else "",
- "批准文号": (new_list[new_list.index("批准文号") + 1]) if "批准文号" in new_list and new_list.index("批准文号") + 1 < len(new_list) else ""
- }
- print(f'当前说明书字典数据:{res_data}')
- return res_data
- '''
- '''
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- self.d.xpath('//*[@text="查看详细说明"]').click()
- # time.sleep(random.randint(3, 5))
- time.sleep(0.5)
- # 1) 先向上滑动一次,触发“加载更多”出现
- self.d.swipe(200, 1000, 200, 300, 0.4)
- time.sleep(0.3)
- # 2) 再进入“出现就点”的循环
- while self.d.xpath('//*[@text="加载更多"]').click_exists(timeout=1):
- time.sleep(0.2)
- self.d.swipe(200, 1000, 200, 300, 0.4)
- # self.d.swipe(200, 1000, 200, 62)
- time.sleep(0.2)
- # 一次性获取所有文本
- texts = [
- node.text.strip()
- # for node in self.d.xpath('//android.widget.TextView').all()
- for node in self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.TextView').all()
- if node.text and node.text.strip() and node.text != '加载更多'
- ]
- print(f'当前说明书列表数据:{texts}')
- # 提取关键字段
- def safe_get(key):
- # try:
- # idx = texts.index(key)
- # return texts[idx + 1] if idx + 1 < len(texts) else ""
- # except ValueError:
- # return ""
- try:
- idx = next(i for i, text in enumerate(texts) if text == key)
- return texts[idx + 1] if idx + 1 < len(texts) else ""
- except StopIteration:
- return ""
- res_data = {
- "有效期": safe_get("有效期"),
- "生产单位": safe_get("生产单位"),
- "批准文号": safe_get("批准文号")
- }
- print(f'当前说明书字典数据:{res_data}')
- return res_data
- '''
- '''
- def get_instructions_data(self):
- """
- 说明书键值对采集:连续两个 TextView 为一对,精确提取
- """
- # 1. 进入说明书
- self.d(text="说明").click()
- time.sleep(0.5)
- self.d(text="查看详细说明").click()
- time.sleep(0.5)
- # self.d(text="加载更多").click_exists(timeout=0.5)
- # 2. 找到说明书最外层 ScrollView(页面主体)
- scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.ScrollView")
- count = scroll_view.count
- print(f"找到的 ScrollView 数量: {count}")
- if not scroll_view.exists:
- return {"有效期": "", "生产单位": "", "批准文号": ""}
- # 3. 在 ScrollView 内再定位真正包含键值对的容器
- # 绝大多数美团说明书页面对应的是 ScrollView > ViewGroup > 若干 TextView
- kv_container = scroll_view.child(className="android.view.ViewGroup")
- if not kv_container.exists:
- kv_container = scroll_view # 降级:直接对 ScrollView 取子孙 TextView
- # 4. 滑动到底并收集所有 TextView(保留顺序)
- all_texts = []
- max_swipe = 5
- last_length = 0
- for _ in range(max_swipe):
- texts = kv_container.child(className="android.widget.TextView")
- #获取texts中的文本
- print(f'当前说明书列表数据:{texts}')
- current_texts = []
- self.loggerMT.info(f'说明书111')
- for tv in texts:
- try:
- txt = tv.get_text().strip()
- # txt = tv.info['text'].strip()
- except Exception:
- continue
- if txt and txt != "展开全文":
- current_texts.append(txt)
- self.loggerMT.info(f'说明书222')
- print(f'当前说明书列表数据:{current_texts}')
- # 去重
- if current_texts:
- current_texts = [t for t in current_texts if t not in all_texts]
- all_texts.extend(current_texts)
- # 判断是否到底
- # if not scroll_view.info.get("scrollable"):
- # break
- # 判断是否到底
- if len(all_texts) == last_length:
- break
- last_length = len(all_texts)
- # self.d.swipe_ext("up", scale=0.7)
- #向上滑动一次
- self.d.swipe(200, 1000, 200, 300, 0.2)
- time.sleep(0.2)
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- # 5. 成对解析
- res_data = {"有效期": "", "生产单位": "", "批准文号": ""}
- for i in range(len(all_texts) - 1):
- key = all_texts[i]
- val = all_texts[i + 1]
- if key in res_data:
- res_data[key] = val
- print(f'说明书文本共 {len(all_texts)} 条,提取结果: {res_data}')
- # time.sleep(1000000)
- return res_data
- '''
def get_instructions_data(self):
    """
    Open the product's instruction sheet and read three fields from it by OCR.

    The method taps "说明", opens the detailed sheet through "查看详细说明" or,
    failing that, "查看全部", scrolls until both "生产单位" and "批准文号" are
    on screen (or the swipe budget is used up), screenshots the page, runs OCR
    on the screenshot and finally deletes the screenshot file.

    Returns:
        dict with the keys "有效期", "生产单位" and "批准文号". All values are
        '' when the sheet cannot be opened or OCR returns nothing. When OCR
        returns data, each value is whatever ``get_next_data`` yields for that
        label, which is None if the label or its following entry is absent.
    """
    self.d.xpath('//*[@text="说明"]').click()
    time.sleep(0.5)
    if self.d.xpath('//*[@text="查看详细说明"]').exists:
        self.d.xpath('//*[@text="查看详细说明"]').click()
    else:
        # Fallback entry point: look for "查看全部", checking both before and
        # after each swipe.
        for _ in range(8):
            if self.d.xpath('//*[@text="查看全部"]').exists:
                print('开始点击查看全部')
                break
            self.d.swipe_ext('down', 0.3)
            time.sleep(1)
            if self.d.xpath('//*[@text="查看全部"]').exists:
                print('开始点击查看全部2')
                break
        if self.d.xpath('//*[@text="查看全部"]').exists:
            self.d.xpath('//*[@text="查看全部"]').click()
        else:
            # Neither entry point exists: report an empty sheet.
            res_data = {
                "有效期": '',
                "生产单位": '',
                "批准文号": ''
            }
            self.loggerMT.info('获取到的说明书信息为空。')
            return res_data
    time.sleep(0.5)
    # Expand the sheet: swipe up until "加载更多" shows, then tap it once.
    for _ in range(8):
        if self.d.xpath('//*[@text="加载更多"]').exists:
            self.d.xpath('//*[@text="加载更多"]').click()
            time.sleep(0.2)
            break
        else:
            self.d.swipe(200, 1000, 200, 300, 0.3)
    # Scroll until both labels needed by the OCR step are visible together.
    for _ in range(10):
        if self.d.xpath('//*[@text="生产单位"]').exists and self.d.xpath('//*[@text="批准文号"]').exists:
            break
        else:
            self.d.swipe(200, 1300, 200, 300, 0.3)
    instruction_path = self.screenshot_instruction()
    print(f"instruction_path= {instruction_path}")
    time.sleep(2)
    try:
        ocr_res = self.get_ocr_res_image(instruction_path)
        if ocr_res:
            # Each value is the OCR entry right after its label.
            validity = self.get_next_data(ocr_res, '有效期')
            approval_number = self.get_next_data(ocr_res, '批准文号')
            manufacturer = self.get_next_data(ocr_res, '生产单位')
        else:
            validity = ''
            approval_number = ''
            manufacturer = ''
        res_data = {
            "有效期": validity,
            "生产单位": manufacturer,
            "批准文号": approval_number
        }
        print(f"res_data={res_data}")
        time.sleep(1)
    finally:
        # Always remove the temporary screenshot, even when OCR or parsing
        # raises, so failed runs do not leave image files behind.
        self.delete_instruction_screenshot(instruction_path)
    return res_data
def has_instructions(self):
    """
    Report whether an element with the text "说明" can be found on the page.

    The element is looked for up to eight times, checking both before and
    after each ``swipe_ext('down', 0.3)``.

    Returns:
        True as soon as the element is found, False after eight misses.
    """
    time.sleep(self.get_sleep_time())
    marker = '//*[@text="说明"]'
    for attempt in range(8):
        if self.d.xpath(marker).exists:
            print(f"第{attempt}次有说明书1")
            return True
        self.d.swipe_ext('down', 0.3)
        time.sleep(1)
        if self.d.xpath(marker).exists:
            print(f"第{attempt}次有说明书2")
            return True
    return False
def has_shop(self):
    """
    Report whether an element with the text "进店" is present.

    Returns:
        The ``exists`` value of the "进店" XPath lookup, read after one
        ``get_sleep_time()`` pause.
    """
    time.sleep(self.get_sleep_time())
    return self.d.xpath('//*[@text="进店"]').exists
- # 获取商品对应的店铺信息
def get_license_info_ex(self):
    """
    Collect the shop's address and business-licence details for a product.

    Every page-level step is routed through ``self.safe_exec``.

    Returns:
        dict with the keys ``contact_address``, ``qualification_number``,
        ``business_license_company`` and ``business_license_address``.
        All four are '' when ``enter_shoper`` reports False. When no
        qualification number is obtained, only ``contact_address`` is filled.
    """
    self.safe_exec(self.enter_shop)
    entered = self.safe_exec(self.enter_shoper)
    if entered == False:
        return {'contact_address': '', 'qualification_number': '', 'business_license_company': '',
                'business_license_address': ''}

    # Wait for the "查看商家资质" entry to show up, polling up to ten times.
    for attempt in range(10):
        if self.d.xpath('//*[@text="查看商家资质"]').exists:
            print(f"第{attempt}次有商家资质")
            break
        print(f"第{attempt}次没有商家资质")
        time.sleep(self.get_sleep_time())

    contact_address = self.safe_exec(self.get_shop_address)
    self.safe_exec(self.scan_shoper_license)
    qualification_number = self.safe_exec(self.get_qualification_number)

    # Without a qualification number there is no licence to screenshot.
    if not qualification_number:
        return {'contact_address': contact_address, 'qualification_number': '',
                'business_license_company': '', 'business_license_address': ''}

    # Tap a fixed relative screen position, then screenshot and OCR the result.
    self.d.click(0.603, 0.27)
    time.sleep(self.get_sleep_time())
    cropped_path = self.screenshot_the_business_license(qualification_number)
    print(f'cropped_screenshot_path:{cropped_path}')
    ocr_res = self.get_ocr_res(cropped_path)
    print(f'ocr_res:{ocr_res}')

    company_name = ''
    company_address = ''
    if ocr_res:
        # Missing OCR keys leave the corresponding field empty.
        company_name = ocr_res.get('单位名称', '')
        company_address = ocr_res.get('地址', '')
    return {'contact_address': contact_address, 'qualification_number': qualification_number,
            'business_license_company': company_name,
            'business_license_address': company_address}
- """暂不用该功能
- def get_license_info(self):
- self.enter_shop()
- self.enter_shoper()
- self.scan_shoper_license()
- # 获取资质编码
- qualification_number = self.get_qualification_number()
- if qualification_number:
- table_license_info = self.get_table_license_info(qualification_number)
- if table_license_info:
- return {
- '单位名称': table_license_info[0],
- '地址': table_license_info[1],
- '社会信用代码': table_license_info[2]
- }
- else:
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- """
def distinct_target(self):
    """
    Report whether any of six known layout XPaths currently exists.

    The candidates are probed in a fixed order and probing stops at the first
    hit, so a match on an early candidate does not pay for the remaining
    lookups.

    Returns:
        True if at least one candidate XPath exists, otherwise False.
    """
    candidate_xpaths = (
        # Two variants of the same container path, differing only in the
        # RelativeLayout index (1 or 2).
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]',
        # Four variants addressing the last ViewGroup inside a
        # HorizontalScrollView of the result list.
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]',
    )
    # any() stops at the first existing candidate instead of querying the
    # device for all six every time.
    return any(self.d.xpath(candidate).exists for candidate in candidate_xpaths)
def click_element_with_retry(self, xpath, max_retries=5, timeout=5):
    """
    Click the element at ``xpath``, retrying while it is absent or clicking fails.

    Args:
        xpath: XPath expression of the element to click.
        max_retries: maximum number of attempts.
        timeout: accepted for interface compatibility; not read by the body.

    Returns:
        True as soon as a click succeeds, False once every attempt has failed.
    """
    final_index = max_retries - 1
    for index in range(max_retries):
        ordinal = index + 1
        try:
            if not self.d.xpath(xpath).exists:
                print(f"第{ordinal}次尝试:元素不存在")
            else:
                self.d.xpath(xpath).click()
                print(f"第{ordinal}次尝试点击成功")
                return True
        except Exception as err:
            print(f"第{ordinal}次尝试失败: {err}")
        # Pause one second between attempts, but not after the last one.
        if index != final_index:
            time.sleep(1)
    print(f"经过{max_retries}次尝试后点击失败")
    return False
def enter_target_page(self):
    """
    Navigate from the current screen to the filtered search-result list.

    Steps, each followed by a ``get_sleep_time()`` pause: tap "看病买药", tap
    the search carousel text, tap a fixed-path element, type
    ``self.search_key``, tap "搜索", call ``click_express_send`` and finally
    tap "价格" through ``click_element_with_retry``.
    """
    def pause():
        time.sleep(self.get_sleep_time())

    opening_taps = (
        '//*[@content-desc="看病买药"]',
        '//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]',
    )
    for locator in opening_taps:
        self.d.xpath(locator).click()
        pause()

    self.d.send_keys(self.search_key, clear=True)
    pause()
    self.d.xpath('//*[@text="搜索"]').click()
    pause()

    # Apply the express-delivery filter.
    self.click_express_send()
    pause()
    # The price tab has to be tapped on every entry so that page-range
    # collection works.
    self.click_element_with_retry('//*[@text="价格"]')
    pause()
- def click_express_send(self):
- # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]//android.widget.HorizontalScrollView[last()]'
- slide_xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- slide_xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]'
- for i in range(1, 3):
- if self.d.xpath(slide_xpath).exists:
- bounds = self.d.xpath(slide_xpath).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动1')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath2).exists:
- bounds = self.d.xpath(slide_xpath2).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动2')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath3).exists:
- bounds = self.d.xpath(slide_xpath3).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动3')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(slide_xpath4).exists:
- bounds = self.d.xpath(slide_xpath4).info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- print(f'top={top}')
- print(f'bottom={bottom}')
- y = (top + bottom) // 2
- print(f'y={y}')
- self.loggerMT.info('开始滑动4')
- self.d.swipe(500, y, 100, y, 0.5)
- time.sleep(self.get_sleep_time())
- break
- max_retry = 5 # 最多尝试次数
- for idx in range(1, max_retry + 1):
- # xpath= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]'
- xpath = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath2 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath3 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- xpath4 = '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- # print(f"xpath:{xpath}")
- # scroll_view = self.d(resourceId="com.sankuai.meituan:id/container") .child(className="android.widget.HorizontalScrollView")
- if self.d.xpath(xpath).exists:
- self.d.xpath(xpath).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath2).exists:
- self.d.xpath(xpath2).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath2快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath3).exists:
- self.d.xpath(xpath3).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath3快递送成功")
- time.sleep(self.get_sleep_time())
- break
- elif self.d.xpath(xpath4).exists:
- self.d.xpath(xpath4).click()
- # time.sleep(self.get_sleep_time())
- print(f"第{idx}次点击xpath4快递送成功")
- time.sleep(self.get_sleep_time())
- break
- else:
- print(f"第{idx}次点击xpath或xpath2或xpath3快递送都失败")
- time.sleep(self.get_sleep_time())
- # xpath2= '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.widget.HorizontalScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
- # if self.d.xpath(xpath2).exists:
- # self.d.xpath(xpath2).click()
- # print(f"第{idx}次点击xpath2快递送成功")
- # time.sleep(self.get_sleep_time())
- # break
- """暂不用该功能
- def get_table_license_info(self, qualification_number):
- try:
- sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"'
- self.mysql_client.cur.execute(sql)
- res = self.mysql_client.cur.fetchone()
- return res
- except:
- return None
- """
- # def get_clipboard(self):
- # """通过ADB获取Android手机剪贴板内容"""
- # try:
- # result = subprocess.run(
- # ["adb", "shell", "am", "broadcast", "-a", "clipper.get"],
- # capture_output=True,
- # text=True,
- # timeout=5
- # )
- # print(f"获取剪贴板结果: {result.stdout}")
- # # 解析返回信息中的剪贴板内容
- # for line in result.stdout.splitlines():
- # if "data=" in line:
- # return line.split("data=")[1].strip()
- # return ""
- # except Exception as e:
- # print("获取剪贴板失败:", e)
- # return ""
- # def get_clipboard(self):
- # """读取 Android 剪贴板(系统自带命令)"""
- # try:
- # text = subprocess.check_output(
- # ["adb", "shell", "cmd", "clipboard", "get"],
- # text=True, timeout=5, stderr=subprocess.STDOUT
- # ).strip()
- # print(f"获取剪贴板结果: {text}")
- # return text if text else ""
- # except Exception as e:
- # print("获取剪贴板失败:", e)
- # return ""
def get_clipboard(self, settle_delay=1):
    """Return the device clipboard text with surrounding whitespace removed.

    Sleeps for ``settle_delay`` seconds, reads ``self.d.clipboard`` once,
    logs the raw value through ``self.loggerMT`` and returns it stripped.

    :param settle_delay: seconds to sleep before reading the clipboard.
        Defaults to 1, the delay that used to be hard-coded here.
    :return: the stripped clipboard text, or ``''`` when the clipboard
        value is ``None``.
    """
    time.sleep(settle_delay)
    # Read the clipboard a single time so that the value written to the log
    # is exactly the value that is returned.
    clipboard_content = self.d.clipboard
    self.loggerMT.info(f"Clipboard content:{clipboard_content}")
    if clipboard_content is None:
        return ''
    return clipboard_content.strip()
def clear_clipboard(self):
    """Overwrite the device clipboard with an empty string.

    Calls ``self.d.set_clipboard`` with ``""`` and ``"text/plain"`` as its
    two positional arguments.
    """
    device = self.d
    clipboard_args = ("", "text/plain")
    device.set_clipboard(*clipboard_args)
- # def clear_clipboard(self):
- # """清空手机剪贴板:写入空字符串(subprocess 版)"""
- # try:
- # subprocess.run(
- # ["adb", "shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", " "],
- # check=True,
- # capture_output=True,
- # text=True,
- # timeout=5
- # )
- # except subprocess.CalledProcessError as e:
- # print("ADB 清空失败:", e.stderr)
- # def clear_clipboard():
- # """清空手机剪贴板:写入空字符串"""
- # try:
- # adb_shell(["shell", "am", "broadcast", "-a", "clipper.set", "-e", "text", ""])
- # except subprocess.CalledProcessError as e:
- # print("ADB 清空失败:", e.output)
- # 获取一个商品的数据、商品对应的店铺的数据
def get_product_link(self):
    """Copy the current product's share link through the page's "···" menu.

    Makes up to five rounds. In each round every candidate "···" button
    XPath is probed in order. For each one that exists, the button is
    clicked, "分享商品" is clicked if present, and, when "复制链接" is on
    screen, it is clicked and the link is read with ``self.get_clipboard()``.
    A round that yields no link is followed by a 0.5 s pause, except after
    the final round.

    :return: the value returned by ``self.get_clipboard()``, or ``''`` when
        no round produced a non-empty link.
    """
    # Three known positions of the "···" button in the view hierarchy.
    menu_button_xpaths = (
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
        '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]',
    )
    share_item_xpath = '//*[@text="分享商品"]'
    copy_item_xpath = '//*[@text="复制链接"]'
    attempts = 5  # maximum number of rounds
    product_link = ''

    for idx in range(1, attempts + 1):
        for button_xpath in menu_button_xpaths:
            if not self.d.xpath(button_xpath).exists:
                continue

            entered_msg = f'{idx}-进入分享点点点'
            print(entered_msg)
            self.loggerMT.info(entered_msg)

            self.d.xpath(button_xpath).click()
            time.sleep(0.2)
            self.d.xpath(share_item_xpath).click_exists()
            time.sleep(0.2)

            # NOTE(review): the original indentation was lost; this assumes
            # the `else` branch paired with the "复制链接" existence check.
            copy_found = self.d.xpath(copy_item_xpath).exists
            if copy_found:
                self.d.xpath(copy_item_xpath).click()
                time.sleep(1)
                product_link = self.get_clipboard()
                time.sleep(0.5)

            link_msg = f'{idx}-商品链接:{product_link}'
            print(link_msg)
            self.loggerMT.info(link_msg)

            if copy_found:
                # The copy entry was clicked; stop probing other buttons.
                break
            product_link = ''

        if product_link:
            # A link was obtained in this round; no further rounds needed.
            break
        if idx < attempts:
            time.sleep(0.5)  # no pause needed after the final round

    return product_link
- def integrate_data(self):
- # 测试说明书详情:
- # instructions_info = self.safe_exec(self.get_instructions_data)
- # time.sleep(1000000)
- # 测试店铺信息
- # license_info = self.safe_exec(self.get_license_info_ex)
- # time.sleep(1000000)
- # 测试定位地址
- # 获取链接开始
- # self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- # 1、点击页面的... 先判断元素是否存在
- '''
- if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('1-进入分享点点点111')
- self.loggerMT.info('1-进入分享点点点111')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- #点击分享商品
- # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'1-商品链接:{product_link}')
- self.loggerMT.info(f'1-商品链接:{product_link}')
- #清空剪切板
- # self.clear_clipboard()
- # if self.d.xpath('//*[@text="加载更多"]').click_exists():
- # self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- # if self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').exists:
- # self.d.xpath('//android.support.v7.widget.RecyclerView/android.view.ViewGroup[3]/android.widget.ImageView[1]').click()
- # #获取剪切板的数据
- # product_link = self.get_clipboard()
- # time.sleep(0.5)
- # print(f'商品链接:{product_link}')
- # #清空剪切板
- # self.clear_clipboard()
- # else:
- # print('未找到分享按钮111')
- elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('1-进入分享点点点222')
- self.loggerMT.info('1-进入分享点点点222')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'1-商品链接:{product_link}')
- self.loggerMT.info(f'1-商品链接:{product_link}')
- #如果为获取到product_link 则等待0.5秒再获取
- if not product_link:
- time.sleep(0.5)
- if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('2-进入分享点点点111')
- self.loggerMT.info('2-进入分享点点点111')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- #点击分享商品
- # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'2-商品链接:{product_link}')
- self.loggerMT.info(f'2-商品链接:{product_link}')
- elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('2-进入分享点点点222')
- self.loggerMT.info('2-进入分享点点点222')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'2-商品链接:{product_link}')
- self.loggerMT.info(f'2-商品链接:{product_link}')
- #如果为获取到product_link 则等待0.5秒再获取
- if not product_link:
- time.sleep(0.5)
- if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('3-进入分享点点点111')
- self.loggerMT.info('3-进入分享点点点111')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- #点击分享商品
- # if self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'3-商品链接:{product_link}')
- self.loggerMT.info(f'3-商品链接:{product_link}')
- elif self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').exists:
- print('3-进入分享点点点222')
- self.loggerMT.info('3-进入分享点点点222')
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[3]/android.view.ViewGroup[2]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
- time.sleep(0.2)
- self.d.xpath('//*[@text="分享商品"]').click_exists()
- time.sleep(0.2)
- self.d.xpath('//*[@text="复制链接"]').click_exists()
- time.sleep(1)
- #获取剪切板的数据
- product_link = self.get_clipboard()
- time.sleep(0.5)
- print(f'3-商品链接:{product_link}')
- self.loggerMT.info(f'3-商品链接:{product_link}')
- '''
- # 获取链接结束
- """
- 整合数据
- :return:
- """
- # title_info = self.get_title() # 药品,规格
- # title_info = self.safe_exec(self.get_title) # 药品,规格
- product, specifications = self.safe_exec(self.get_title) # 药品,规格
- if product:
- # product, specifications = title_info
- # 如果关键字包含999 则 product必须包含999 和 999后面的那段字符串 ps 999感冒灵颗粒必须包含:"999"和"感冒灵颗粒"
- if '999' in self.search_key:
- if self.search_key == '999皮炎平曲安奈德益康唑乳膏30':
- temp_search_key = self.search_key.replace('999皮炎平', '')
- elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30':
- temp_search_key = self.search_key.replace('999必无忧', '')
- elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g':
- temp_search_key = self.search_key.replace('999必无忧', '')
- elif self.search_key == '999速复康布洛芬缓释胶囊':
- temp_search_key = self.search_key.replace('999速复康', '')
- elif self.search_key == '999选平硝酸咪康唑乳膏20g':
- temp_search_key = self.search_key.replace('999选平', '')
- elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20':
- temp_search_key = self.search_key.replace('999皮炎平', '')
- else:
- temp_search_key = self.search_key.replace('999', '')
- if self.search_key == '999糠酸莫米松凝胶15':
- temp_search_key = temp_search_key.replace('15', '')
- elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30':
- temp_search_key = temp_search_key.replace('30', '')
- elif self.search_key == '999抗病毒口服液10ml*6支/盒':
- temp_search_key = temp_search_key.replace("10ml*6支/盒", "")
- elif self.search_key == '999复方金银花颗粒10g':
- temp_search_key = temp_search_key.replace("10g", "")
- elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒':
- temp_search_key = temp_search_key.replace("15g*15袋/盒", "")
- elif self.search_key == '999复方氨酚烷胺胶囊6粒':
- temp_search_key = temp_search_key.replace("6粒", "")
- elif self.search_key == '999可调式生理性海水鼻腔喷雾50':
- temp_search_key = temp_search_key.replace("50", "")
- elif self.search_key == '999止泻利颗粒15g*8':
- temp_search_key = temp_search_key.replace("15g*8", "")
- elif self.search_key == '999必无忧盐酸特比萘芬喷雾剂30':
- temp_search_key = temp_search_key.replace("30", "")
- elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g':
- temp_search_key = temp_search_key.replace("15g", "")
- elif self.search_key == '999复方苦参肠炎康片12片':
- temp_search_key = temp_search_key.replace("12片", "")
- elif self.search_key == '999强力枇杷露16袋':
- temp_search_key = temp_search_key.replace("16袋", "")
- elif self.search_key == '999三蛇胆川贝膏138':
- temp_search_key = temp_search_key.replace("138", "")
- elif self.search_key == '999抗病毒口服液10ml*12':
- temp_search_key = temp_search_key.replace("10ml*12", "")
- elif self.search_key == '999抗病毒口服液10ml*10':
- temp_search_key = temp_search_key.replace("10ml*10", "")
- elif self.search_key == '999强力枇杷露120ml':
- temp_search_key = temp_search_key.replace("120ml", "")
- elif self.search_key == '999强力枇杷露150ml':
- temp_search_key = temp_search_key.replace("150ml", "")
- elif self.search_key == '999糠酸莫米松乳膏10g支':
- temp_search_key = temp_search_key.replace("10g支", "")
- elif self.search_key == '999选平硝酸咪康唑乳膏20g':
- temp_search_key = temp_search_key.replace("20g", "")
- elif self.search_key == '999感冒清热颗粒(无糖)6g':
- temp_search_key = temp_search_key.replace("(无糖)6g", "")
- elif self.search_key == '999壮骨关节丸6g*20':
- temp_search_key = temp_search_key.replace("6g*20", "")
- elif self.search_key == '999正天丸6g*15':
- temp_search_key = temp_search_key.replace("6g*15", "")
- elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20':
- temp_search_key = temp_search_key.replace("20", "")
- elif self.search_key == '999糠酸莫米松凝胶10':
- temp_search_key = temp_search_key.replace("10", "")
- elif self.search_key == '999板蓝根颗粒10g*20':
- temp_search_key = temp_search_key.replace("10g*20", "")
- elif self.search_key == '999复方氨酚烷胺胶囊10粒':
- temp_search_key = temp_search_key.replace("10粒", "")
- elif self.search_key == '999复方氨酚烷胺胶囊12粒':
- temp_search_key = temp_search_key.replace("12粒", "")
- elif self.search_key == '999咽炎片0.26g*12片*2板':
- temp_search_key = temp_search_key.replace("0.26g*12片*2板", "")
- elif self.search_key == '999小儿止咳糖浆120':
- temp_search_key = temp_search_key.replace("120", "")
- elif self.search_key == '999小儿止咳糖浆225':
- temp_search_key = temp_search_key.replace("225", "")
- elif self.search_key == '999小儿感冒颗粒6g*10':
- temp_search_key = temp_search_key.replace("6g*10", "")
- elif self.search_key == '999小儿感冒颗粒6g*24':
- temp_search_key = temp_search_key.replace("6g*24", "")
- elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋':
- temp_search_key = temp_search_key.replace("6g*10袋", "")
- elif self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋':
- temp_search_key = temp_search_key.replace("6g*20袋", "")
- elif self.search_key == '999小儿咽扁颗粒8g*10袋':
- temp_search_key = temp_search_key.replace("8g*10袋", "")
- elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋':
- temp_search_key = temp_search_key.replace("2.5g*10袋", "")
- elif self.search_key == '999感冒清热颗粒12g*18':
- temp_search_key = temp_search_key.replace("12g*18", "")
- if '999' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- else:
- if self.search_key == '史达功右美沙芬愈创甘油醚糖浆120':
- temp_search_key = self.search_key.replace('史达功', '')
- temp_search_key = temp_search_key.replace('120', '')
- if '史达功' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '三九胃泰养胃舒颗粒8袋':
- temp_search_key = self.search_key.replace('三九胃泰', '')
- temp_search_key = temp_search_key.replace('8袋', '')
- if '三九胃泰' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '今维多赐多康牌蛋白粉':
- temp_search_key = self.search_key.replace('今维多', '')
- if '今维多' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4':
- temp_search_key = self.search_key.replace('佳美舒', '')
- temp_search_key = temp_search_key.replace('4', '')
- if '佳美舒' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '三九胃泰颗粒20g*10':
- temp_search_key = self.search_key.replace('20g*10', '')
- if temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '三九胃泰颗粒20g*6袋':
- temp_search_key = self.search_key.replace('20g*6袋', '')
- if temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- elif self.search_key == '顺峰康王酮康他索乳膏':
- temp_search_key = self.search_key.replace('顺峰康王', '')
- if '顺峰康王' not in product or temp_search_key not in product:
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- else:
- if self.search_key not in product.replace(' ', ''):
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- # if self.search_key not in product.replace(' ', ''):
- # self.swipe_back(1)
- # self.unrelated_data += 1
- # return
- else:
- self.swipe_back(1)
- return
- min_price = self.drug_price() # 最低价格
- # 商品链接
- product_link = self.get_product_link()
- # 判断是否有自营的文本,有的话不需要获取店铺的信息
- if self.d.xpath('//*[@text="自营"]').exists:
- shop = "美团自营大药房(快递电商)"
- # 爬取日期
- scrape_date = self.get_current_date()
- # scrape_date = "2025-07-18"
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
- else:
- for i in range(8):
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名1')
- break
- self.d.swipe_ext('up', 0.3)
- time.sleep(1)
- # detail_info = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info
- # bounds = detail_info['bounds']
- # height = bounds['bottom'] - bounds['top']
- # if self.d.xpath('//*[@text="进店"]').exists and height > 100:
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名2')
- break
- shop = self.get_shop_name()
- # 爬取日期
- scrape_date = self.get_current_date()
- # scrape_date = "2025-07-18"
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- # 获取店铺信息开始
- # 暂时不获取店铺信息 start
- is_has_enter_shop = self.has_shop()
- # 需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品
- shop_is_exists = self.shop_is_exists_database(shop)
- # 存在进店 并且店铺的名称不包含美团官方的字样
- print(f"已采集{self.shop_data_num}家店铺数据")
- if is_has_enter_shop and '美团官方' not in shop and '美团自营' not in shop and not shop_is_exists and self.shop_data_num < 500:
- # license_info = self.get_license_info_ex()
- license_info = self.safe_exec(self.get_license_info_ex)
- contact_address = license_info['contact_address']
- qualification_number = license_info['qualification_number']
- business_license_company = license_info['business_license_company']
- business_license_address = license_info['business_license_address']
- save_shop_data = {
- 'shop': shop,
- 'contact_address': contact_address,
- 'qualification_number': qualification_number,
- 'scrape_date': scrape_date,
- 'business_license_company': business_license_company,
- 'business_license_address': business_license_address,
- 'platform': '美团'
- }
- self.save_shop_info_to_database(save_shop_data)
- self.shop_data_num += 1 # 店铺数据数量+1
- self.swipe_back(2)
- else:
- print('不采集店铺信息')
- # 获取店铺信息结束
- # 暂时不获取店铺信息 end
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
- if not shop:
- print('未获取到店铺名:开始回退')
- self.swipe_back(1)
- return
- if not shop or '自营' in shop:
- self.swipe_back(1)
- return
- time.sleep(self.get_sleep_time())
- # 生产日期为空
- manufacture_date = ''
- # 执政信息
- # if is_has_enter_shop:
- # license_info = self.get_license_info()
- # business_license_company = license_info["单位名称"]
- # credit_code = license_info['社会信用代码']
- # city_str = license_info['地址']
- # # 先把省份啥的替换掉
- # city_sub_str = re.sub(r'[u4e00-\u9fa5]+省', '', city_str)
- # try:
- # city = re.search(r'[\u4e00-\u9fa5]+?(市|区|县)', city_sub_str).group(0)
- # except:
- # city = city_sub_str
- # try:
- # province = self.city2province[city]
- # except:
- # province = ''
- # self.swipe_back(2)
- # else:
- # business_license_company = ''
- # credit_code = ''
- # city = ''
- # province = ''
- business_license_company = ''
- credit_code = ''
- city = ''
- province = ''
- expiry_date = ''
- manufacturer = ''
- approval_number = ''
- # 暂时不获取说明书信息 start
- # 是否存在说明书
- # is_has_instructions = self.has_instructions()
- # 有的药品没有说明书,直接默认
- if self.search_key == '今维多赐多康牌蛋白粉':
- expiry_date = '18个月'
- manufacturer = '华润圣海健康科技有限公司'
- approval_number = '食健备G202437001992'
- elif self.search_key == '佳美舒阿奇霉素肠溶胶囊4':
- expiry_date = '24个月'
- manufacturer = '浙江华润三九众益制药有限公司'
- approval_number = '国药准字H20090152'
- elif self.search_key == '999可调式生理性海水鼻腔喷雾50':
- expiry_date = '3年'
- manufacturer = '江苏萨瑞斯医疗科技有限公司'
- approval_number = '苏械注准20212140025'
- elif self.search_key == '999蒲地蓝消炎片':
- expiry_date = '24个月'
- manufacturer = '特一药业集团股份有限公司'
- approval_number = '国药准字Z20063596'
- elif self.search_key == '999养胃舒颗粒':
- expiry_date = '36个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z34020289'
- elif self.search_key == '999糠酸莫米松凝胶15':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20080010'
- elif self.search_key == '999黄芪精':
- expiry_date = '36个月'
- manufacturer = '台州南峰药业有限公司'
- approval_number = '国药准字Z33020783'
- elif self.search_key == '999复方感冒灵颗粒':
- expiry_date = '24个月'
- manufacturer = '华润三九(郴州)制药有限公司'
- approval_number = '国药准字Z43020334'
- elif self.search_key == '999皮炎平曲安奈德益康唑乳膏30':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074155'
- elif self.search_key == '史达功右美沙芬愈创甘油醚糖浆120':
- expiry_date = '暂定24个月,具体有效期以实物说明书为准'
- manufacturer = '史达德药业(北京)有限公司'
- approval_number = '国药准字H11021837'
- elif self.search_key == '999速复康布洛芬缓释胶囊':
- expiry_date = '24个月'
- manufacturer = '北京红林制药有限公司'
- approval_number = '国药准字H20074172'
- elif self.search_key == '999复方板蓝根颗粒15g*15袋/盒':
- expiry_date = '24个月'
- manufacturer = '重庆科瑞东和制药有限责任公司'
- approval_number = '国药准字Z50020420'
- elif self.search_key == '999必无忧盐酸特比萘芬乳膏15g':
- expiry_date = '24个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20073954'
- elif self.search_key == '999维生素C咀嚼片':
- expiry_date = '24个月'
- manufacturer = '甘肃成纪生物药业有限公司'
- approval_number = '国药准字H62021166'
- elif self.search_key == '999强力枇杷露120ml':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字Z36021533'
- elif self.search_key == '999强力枇杷露150ml':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字Z36021533'
- elif self.search_key == '999抗病毒口服液10ml*10' or self.search_key == '999抗病毒口服液10ml*12':
- expiry_date = '24个月'
- manufacturer = '杭州华润老桐君药业有限公司'
- approval_number = '国药准字Z33020518'
- elif self.search_key == '999精氨酸布洛芬颗粒':
- expiry_date = '暂定36个月'
- manufacturer = '华润三九(唐山)药业有限公司'
- approval_number = '国药准字H20070139'
- elif self.search_key == '999糠酸莫米松乳膏10g支':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074090'
- elif self.search_key == '999选平硝酸咪康唑乳膏20g':
- expiry_date = '24个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20074079'
- elif self.search_key == '999感冒清热颗粒(无糖)6g':
- expiry_date = '36个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z20055023'
- elif self.search_key == '999银菊清咽颗粒':
- expiry_date = '30个月'
- manufacturer = '合肥华润神鹿药业有限公司'
- approval_number = '国药准字Z20026680'
- elif self.search_key == '999阿奇霉素片':
- expiry_date = '48个月'
- manufacturer = '浙江华润三九众益制药有限公司'
- approval_number = '国药准字H20084458'
- elif self.search_key == '999补脾益肠丸':
- expiry_date = '24个月'
- manufacturer = '惠州市九惠制药股份有限公司'
- approval_number = '国药准字Z44023376'
- elif self.search_key == '999壮骨关节丸6g*20':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44023377'
- elif self.search_key == '999壮骨关节胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20080055'
- elif self.search_key == '999正天丸6g*15':
- expiry_date = '30个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020711'
- elif self.search_key == '999正天胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20010142'
- elif self.search_key == '三九胃泰胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020704'
- elif self.search_key == '三九胃泰颗粒20g*10':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020705'
- elif self.search_key == '999感冒灵颗粒':
- expiry_date = '24个月'
- manufacturer = '华润三九(枣庄)药业有限公司'
- approval_number = '国药准字Z44021940'
- elif self.search_key == '999皮炎平复方醋酸地塞米松乳膏20':
- expiry_date = '36个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字H44024170'
- elif self.search_key == '三九胃泰颗粒20g*6袋':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020705'
- elif self.search_key == '顺峰康王酮康他索乳膏':
- expiry_date = '24个月'
- manufacturer = '广东华润顺峰药业有限公司'
- approval_number = '国药准字H10980204'
- elif self.search_key == '999糠酸莫米松凝胶10':
- expiry_date = '36个月'
- manufacturer = '华润三九(南昌)药业有限公司'
- approval_number = '国药准字H20080010'
- elif self.search_key == '999板蓝根颗粒10g*20':
- expiry_date = '36个月'
- manufacturer = '广东恒诚制药股份有限公司'
- approval_number = '国药准字Z44021520'
- elif self.search_key == '999复方氨酚烷胺胶囊' or self.search_key == '999复方氨酚烷胺胶囊12粒' or self.search_key == '999复方氨酚烷胺胶囊10粒' or self.search_key == '999复方氨酚烷胺胶囊6粒':
- expiry_date = '36个月'
- manufacturer = '华润三九(唐山)药业有限公司'
- approval_number = '国药准字H13021912'
- elif self.search_key == '999咽炎片0.26g*12片*2板':
- expiry_date = '24个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字Z42021062'
- elif self.search_key == '999小儿止咳糖浆120' or self.search_key == '999小儿止咳糖浆225':
- expiry_date = '24个月'
- manufacturer = '华润三九(雅安)药业有限公司'
- approval_number = '国药准字Z51020675'
- elif self.search_key == '999小儿感冒颗粒6g*10' or self.search_key == '999小儿感冒颗粒6g*24':
- expiry_date = '36个月'
- manufacturer = '华润三九(枣庄)药业有限公司'
- approval_number = '国药准字Z37021392'
- elif self.search_key == '999小儿氨酚黄那敏颗粒6g*10袋' or self.search_key == '999小儿氨酚黄那敏颗粒6g*20袋':
- expiry_date = '36个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字H42022510'
- elif self.search_key == '999感冒灵胶囊':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44021939'
- elif self.search_key == '999小儿咽扁颗粒8g*10袋':
- expiry_date = '24个月'
- manufacturer = '华润三九(黄石)药业有限公司'
- approval_number = '国药准字Z42021105'
- elif self.search_key == '999小儿感冒宁颗粒2.5g*10袋':
- expiry_date = '18个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z20100067'
- elif self.search_key == '999感冒清热颗粒12g*18':
- expiry_date = '36个月'
- manufacturer = '山东新大陆制药有限公司'
- approval_number = '国药准字Z37020066'
- elif self.search_key == '999小柴胡颗粒':
- expiry_date = '24个月'
- manufacturer = '华润三九医药股份有限公司'
- approval_number = '国药准字Z44020709'
- else:
- is_has_instructions = self.safe_exec(self.has_instructions)
- # 说明书等信息
- if is_has_instructions:
- print('开始获取说明书信息')
- # instructions_info = self.get_instructions_data()
- instructions_info = self.safe_exec(self.get_instructions_data)
- if instructions_info['有效期'] is not None:
- expiry_date = instructions_info['有效期'].strip('。')
- if instructions_info['生产单位'] is not None:
- manufacturer = instructions_info['生产单位'].strip('。')
- if instructions_info['批准文号'] is not None:
- approval_number = instructions_info['批准文号'].strip('。')
- else:
- # 没有说明书不入库
- print('没有获取到说明书信息')
- self.swipe_back(1)
- return
- # 暂时不获取说明书信息 end
- self.unrelated_data = 0
- # 爬取省份
- scrape_province = '广东' # 这里先默认广东
- # 是否有货
- availability = ''
- save_data = {
- 'product': product,
- 'min_price': min_price,
- 'manufacture_date': manufacture_date,
- 'expiry_date': expiry_date,
- 'shop': shop,
- 'business_license_company': business_license_company,
- 'province': province,
- 'city': city,
- 'manufacturer': manufacturer,
- 'specification': specifications,
- 'approval_number': approval_number,
- 'product_link': product_link,
- 'scrape_date': scrape_date,
- 'scrape_province': scrape_province,
- 'availability': availability,
- 'credit_code': credit_code,
- 'platform': '美团',
- 'search_key': self.search_key,
- }
- self.save_to_database(save_data)
- # time.sleep(100000)
- time.sleep(self.get_sleep_time())
- if self.distinct_target():
- print('已到达搜索列表页')
- else:
- for i in range(1):
- print('在详情页')
- self.swipe_back(1)
- time.sleep(self.get_sleep_time())
- # 最外部有个定位按钮
- if self.distinct_target():
- break
- # 主函数
- # start_page:开始页,采集用
- # end_page:结束页,采集用
- # task_id:上报数据用
- # 添加max_duration_minutes参数
def main(self, device_id, start_page, end_page, task_id, product_name, product_specs, max_duration_minutes=None,
         retry_count=0):
    """Run one collection task: open the search list, page through it and scrape matching items.

    Steps performed:
      1. Store the task parameters on the instance and set the task status to 2 (running).
      2. Connect to the device, start the ``SpiderMonitor`` thread, restart the app and
         enter the target (search result) page.
      3. Scroll past the pages that precede ``start_page``.
      4. For every page in ``start_page..end_page`` (inclusive): read the visible list
         items, skip items whose title does not contain both ``product_name`` and
         ``product_specs`` or that are already stored, open the remaining ones and call
         ``integrate_data`` on their detail page.
      5. Report the outcome through ``reporter.end_task`` and always stop the monitor.

    Args:
        device_id: Device identifier handed to ``connect_devices``.
        start_page: First page (1-based) to collect.
        end_page: Last page to collect, inclusive.
        task_id: Task identifier used for progress reporting; when falsy nothing is reported.
        product_name: Text that must appear in a list item's title.
        product_specs: Specification text that must also appear in the title.
        max_duration_minutes: Optional time budget; checked at the start of each page.
            A falsy value disables the limit.
        retry_count: Number of restarts already performed; used by the recursive
            restart below, callers normally leave it at 0.

    Returns:
        None.

    Raises:
        Exception: Any error raised during the collection phase is reported as a failed
            task and then re-raised.
    """
    max_retry = 3  # maximum number of full restarts after landing on the search page
    bottom_marker_xpath = '//*[@text="已经到底啦"]'
    list_xpath = '//android.support.v7.widget.RecyclerView/android.widget.FrameLayout'

    # The list item layout is addressed by absolute paths. Two layout variants are
    # probed for every field: one with seven and one with six leading ViewGroup[1]
    # wrappers. The pieces below rebuild exactly the paths used for each field.
    vg1 = '/android.view.ViewGroup[1]'
    vg2 = '/android.view.ViewGroup[2]'
    text_leaf = '/android.widget.FrameLayout[1]/android.widget.TextView'
    wrappers_deep = vg1 * 7
    wrappers_shallow = vg1 * 6
    title_tail = vg2 + vg1 * 2 + text_leaf
    price_tail_short = vg2 * 2 + vg1 * 2 + text_leaf
    price_tail_long = vg2 * 2 + vg1 * 5 + text_leaf
    shop_tail = vg2 * 3 + vg1 + '/android.widget.FrameLayout[last()]/android.widget.TextView[1]'

    def report_end(status, finish_status, page):
        # Single place for the final task report; silently skipped without a task id.
        if task_id:
            reporter.end_task(
                task_id=task_id,
                status=status,
                finish_status=finish_status,
                force_end_page=page
            )

    def reached_page():
        # current_page is only assigned once the page loop starts, so fall back to
        # start_page instead of failing with AttributeError inside an error path.
        return getattr(self, 'current_page', start_page)

    def parse_price(text):
        # Extract the first number from the price label; '' (the "no price" value
        # used below) is returned when the label contains nothing parseable.
        match = re.search(r'[\d.]+', text)
        if match is None:
            return ''
        try:
            return float(match.group())
        except ValueError:
            return ''

    # Remember the task parameters on the instance.
    self.task_id = task_id
    self.task_start_page = start_page
    self.task_end_page = end_page
    self.task_product_specs = product_specs
    self.task_product_name = product_name
    self.start_time = time.time()

    # Status 2 = running. A failure to update the status must not stop the task.
    if self.task_id:
        try:
            self.update_task_status(2)
            self.loggerMT.info(f"任务 {task_id} 线程启动成功,状态已更新为2")
        except Exception as e:
            self.loggerMT.error(f"更新任务状态失败: {e}")

    if task_id:
        reporter.start_task(task_id, start_page, end_page)

    spider_no = 0  # items scraped since the last long pause
    timeout_seconds = None
    if max_duration_minutes:
        timeout_seconds = max_duration_minutes * 60

    self.connect_devices(device_id)
    time.sleep(self.get_sleep_time())

    # Start the global popup monitor.
    self.monitor = SpiderMonitor(self)
    self.monitor.start()
    try:
        # Restart the app and run the keyword search.
        self.restart_app()
        self.safe_exec(self.enter_target_page)

        # Scroll past the pages before start_page.
        if start_page > 1:
            self.loggerMT.info(f"跳过前 {start_page - 1} 页,从第 {start_page} 页开始采集")
            current_page = 1
            while current_page < start_page:
                # Let the monitor finish whatever it is handling first.
                if self.monitor.pausing.is_set():
                    self.wait_for_ready(self.monitor)
                # The list ended before the requested start page was reached.
                if self.d.xpath(bottom_marker_xpath).exists:
                    self.loggerMT.info(f"在第 {current_page} 页已到达底部,无法继续翻页")
                    self.loggerMT.warning(f"未能到达目标页码 {start_page},实际只到达第 {current_page} 页")
                    report_end('completed', 1, end_page)
                    return
                self.d.drag(300, 1400, 300, 400, 1)
                time.sleep(self.get_sleep_time())
                current_page += 1
                self.loggerMT.debug(f"已翻到第 {current_page} 页")
            self.loggerMT.info(f"成功翻到第 {start_page} 页,开始采集")

        for idx in range(start_page, end_page + 1):
            # Stop once the time budget is used up and report the task as unfinished.
            if timeout_seconds and (time.time() - self.start_time) > timeout_seconds:
                print(f"任务 {task_id} 达到时间限制 {max_duration_minutes} 分钟,停止采集")
                self.loggerMT.info(f"任务 {task_id} 达到时间限制 {max_duration_minutes} 分钟,停止采集")
                report_end('completed', 0, reached_page())
                return

            print(f'第{idx}页(指定范围: {start_page}-{end_page})')
            self.current_page = idx
            if task_id:
                reporter.update_task_progress(
                    task_id=task_id,
                    actual_end_page=self.current_page
                )

            # Take a one-minute break after more than 30 scraped items.
            if spider_no > 30:
                time.sleep(60)
                spider_no = 0
            print('目前无关数据量: ', self.unrelated_data)

            # Too many verification prompts: issue a no-op tap and reset the counter.
            if self.monitor.verification_count >= self.monitor.MAX_VERIFICATION_RETRY:
                print("频繁遇到验证码,暂停程序")
                self.d.click(0, 0)
                self.monitor.verification_count = 0

            # More than 20 unrelated items: the result list is considered exhausted.
            if self.unrelated_data > 20:
                self.loggerMT.info(f"连续20个数据不达标,品规:{self.search_key}")
                report_end('completed', 1, end_page)
                return

            # Wait until the result list is rendered.
            # NOTE(review): this wait has no upper bound; the time budget above is
            # only checked once per page.
            while True:
                if self.d.xpath(list_xpath).exists:
                    break
                time.sleep(1)
            drug_lis = self.safe_exec(self.d.xpath(list_xpath).all)
            lis_len = len(drug_lis)
            print(f'当前页面共有{lis_len}个商品')

            for idxx, drug_one in enumerate(drug_lis, start=1):
                bounds = drug_one.info['bounds']
                top = bounds['top']
                bottom = bounds['bottom']
                print(f'当前商品bottom:{bottom}')
                print(f'当前商品top:{top}')
                # Only items lying completely inside this vertical band are handled.
                if not (304 <= top and bottom <= 1475):
                    continue

                print(f"这页的第几个商品:{idxx}")
                product_title = ''
                price = ''
                shop_name = ''
                item_xpath = f'{list_xpath}[{idxx}]'

                # Product title; items that do not match the task are skipped.
                product_tittle_xpath = item_xpath + wrappers_deep + title_tail
                product_tittle_xpath2 = item_xpath + wrappers_shallow + title_tail
                if self.d.xpath(product_tittle_xpath).exists:
                    product_title = self.d.xpath(product_tittle_xpath).text
                    product_title = product_title[1:] if product_title.startswith('0') else product_title
                    print(f"product_tittle_xpath列表当前商品名称:{product_title}")
                    if self.task_product_name not in product_title or self.task_product_specs not in product_title:
                        print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}")
                        continue
                elif self.d.xpath(product_tittle_xpath2).exists:
                    product_title = self.d.xpath(product_tittle_xpath2).text
                    product_title = product_title[1:] if product_title.startswith('0') else product_title
                    print(f"product_tittle_xpath2列表当前商品名称:{product_title}")
                    print(f"search_key:{self.search_key}")
                    if self.task_product_name not in product_title or self.task_product_specs not in product_title:
                        print(f"当前商品名称:{product_title} 不包含关键字:{self.search_key}")
                        continue
                else:
                    print("列表当前商品名称不存在")

                # Price.
                price_xpath = item_xpath + wrappers_deep + price_tail_short
                price_xpath3 = item_xpath + wrappers_shallow + price_tail_long
                if self.d.xpath(price_xpath).exists:
                    price_str = self.d.xpath(price_xpath).text
                    print(f"price_xpath列表当前商品价格:{price_str}")
                    if price_str:
                        price = parse_price(price_str)
                elif self.d.xpath(price_xpath3).exists:
                    price_str = self.d.xpath(price_xpath3).text
                    print(f"price_xpath3列表当前商品价格:{price_str}")
                    if price_str:
                        price = parse_price(price_str)
                else:
                    price_xpath2 = item_xpath + wrappers_deep + price_tail_long
                    if self.d.xpath(price_xpath2).exists:
                        price_str = self.d.xpath(price_xpath2).text
                        print(f"price_xpath2列表当前商品价格:{price_str}")
                        if price_str:
                            price = parse_price(price_str)
                    else:
                        print("列表当前商品价格不存在")
                print(f'列表获取到价格:{price}')

                # Shop name.
                shop_name_xpath = item_xpath + wrappers_deep + shop_tail
                shop_name_xpath2 = item_xpath + wrappers_shallow + shop_tail
                if self.d.xpath(shop_name_xpath).exists:
                    shop_name = self.d.xpath(shop_name_xpath).text
                    print(f"shop_name_xpath列表当前商品店铺名称:{shop_name}")
                elif self.d.xpath(shop_name_xpath2).exists:
                    shop_name = self.d.xpath(shop_name_xpath2).text
                    print(f"shop_name_xpath2列表当前商品店铺名称:{shop_name}")
                else:
                    print("列表当前商品店铺名称不存在")

                # Nothing could be read at all: this list entry is not a product.
                if product_title == '' and price == '' and shop_name == '':
                    continue

                # Skip rows that are already stored for today.
                scrape_date = self.get_current_date()
                if product_title and price and shop_name:
                    dup_data = {'product': product_title, 'min_price': price, 'shop': shop_name,
                                'scrape_date': scrape_date, 'platform': '美团'}
                    if self.data_is_exists(dup_data):
                        print('列表存在相同数据不入库')
                        continue

                self.safe_exec(drug_one.click)
                print('点击目标药品完毕')
                time.sleep(2)

                # Scrape the detail page.
                try:
                    self.safe_exec(self.integrate_data)
                    # Make sure we are back on the list page.
                    if self.distinct_target():
                        print('回退到列表页', True)
                    else:
                        if self.d.xpath('//*[@text="搜索"]').exists:
                            print("检测到搜索按钮,重新开始采集流程")
                            if retry_count < max_retry:
                                # Stop this run's monitor, then restart the whole
                                # task with the same parameters.
                                self.monitor.stop()
                                self.monitor.join()
                                return self.main(device_id, start_page, end_page, task_id, product_name,
                                                 product_specs, max_duration_minutes, retry_count + 1)
                            else:
                                print("超过最大重试次数,终止程序")
                                report_end('failed', 0, reached_page())
                                return
                        else:
                            print("无法恢复页面,终止采集")
                            report_end('failed', 0, reached_page())
                            return
                    time.sleep(self.get_sleep_time())
                    spider_no += 1
                except Exception as e:
                    print(f'采集药品详情数据出错:{e}')
                    if not self.distinct_target():
                        self.swipe_back(1)
                        # The list page is probed up to twice before giving up,
                        # matching the number of checks the flow has always made.
                        if not self.distinct_target() and not self.distinct_target():
                            print('页面出错,退出采集')
                            report_end('failed', 0, reached_page())
                            return

            # Move on to the next page (never after the last requested page).
            if idx < end_page:
                if self.d.xpath(bottom_marker_xpath).exists:
                    self.loggerMT.info(f'在第 {idx} 页已到达列表最底部')
                    report_end('completed', 1, idx)
                    return
                print('开始滑动')
                self.d.drag(300, 1400, 300, 400, 1)
                print('滑动结束')
                time.sleep(self.get_sleep_time())

        # All requested pages were processed.
        report_end('completed', 1, end_page)
    except Exception as e:
        print(f"采集任务异常: {e}")
        report_end('failed', 0, reached_page())
        raise
    finally:
        # Always stop the monitor thread.
        self.monitor.stop()
        self.monitor.join()
def unitest(self):
    """Manual smoke test: store one fixed sample row, then block for a long time.

    The row is passed to ``save_to_database`` unchanged; afterwards the method
    sleeps for 100000 seconds.

    :return: None
    """
    sample_row = dict(
        product="[昆中药]舒肝颗粒(低糖型)",
        min_price=14.0,
        manufacture_date='',
        expiry_date='36个月',
        shop='美团自营大药房(快递电商)',
        business_license_company='',
        province='',
        city='',
        manufacturer='昆明中药厂有限公司',
        specification='3g*16袋/盒',
        approval_number='国药准字Z53021161',
        product_link='',
        scrape_date='2025/07/09',
        scrape_province='广东',
        availability='',
        credit_code='',
        platform='美团',
    )
    self.save_to_database(sample_row)
    # Keeps the process alive after the insert.
    time.sleep(100000)
- # retrieve database
def get_retrieve_mysql():
    """Open and return a new pymysql connection to the task ("retrieve") database.

    Every connection setting can be supplied through an environment variable:
    ``RETRIEVE_DB_HOST``, ``RETRIEVE_DB_PORT``, ``RETRIEVE_DB_USER``,
    ``RETRIEVE_DB_PASSWORD`` and ``RETRIEVE_DB_NAME``. When a variable is not set
    the value that used to be hard-coded here is used, so existing deployments
    keep working without configuration.

    Returns:
        The connection object returned by ``pymysql.connect``. The caller is
        responsible for closing it.
    """
    import os

    import pymysql

    # SECURITY: the fallback values below are credentials committed to source
    # control. They are kept only for backward compatibility; set the environment
    # variables, rotate the password and then delete these defaults.
    return pymysql.connect(
        host=os.environ.get('RETRIEVE_DB_HOST', '39.108.116.125'),
        port=int(os.environ.get('RETRIEVE_DB_PORT', '3306')),
        user=os.environ.get('RETRIEVE_DB_USER', 'drug_retrieve'),
        password=os.environ.get('RETRIEVE_DB_PASSWORD', 'Pem287cwM58jNpe2'),
        db=os.environ.get('RETRIEVE_DB_NAME', 'drug_retrieve'),
        charset='utf8mb4'
    )
- # def main():
- # #从配置的系统里面读取采集用到的设备和搜索关键词
- # #1、数据库的连接,从数据库中获取采集品规和设备adb码,启动程序进行采集,如果配置了采集时间,还需要支持到了时间终止采集,如果配置了采集的页数,需要滑动到指定的页数后再进行采集
- # #2、代码要支持多线程(线程池)的管理,每个线程有自己的生命周期。
- # #获取未开始的美团平台的采集任务
- # retrieve_conn = get_retrieve_mysql()
- # cursor = retrieve_conn.cursor()
- # query = "SELECT id,collect_equipment_id,product_name,start_page,end_page FROM retrieve_collect_task_allocate WHERE status = 1 AND platform = 4"
- # cursor.execute(query)
- # result = cursor.fetchone()
- # if result:
- # collect_equipment_id = result [1]
- # product_name = result[2]
- # start_page = result[3]
- # end_page = result[4]
- # print(f"collect_equipment_id={collect_equipment_id}")
- # print(f"product_name={product_name}")
- # if collect_equipment_id == 0:
- # print("设备id不存在")
- # return
- # if product_name == '':
- # print("采集关键字获取失败")
- # return
- # #通过 collect_equipment_id 获取设别adb码
- # device_query = "SELECT device_id FROM retrieve_collect_equipment WHERE id = %s and status = 0"
- # cursor.execute(device_query, (collect_equipment_id))
- # device_result = cursor.fetchone()
- # if device_result:
- # device_id = device_result[0]
- # print(f"device_id={device_id}")
- # else:
- # # self.loggerMT.info("没有可用的设备进行数据采集")
- # print("没有可用的设备进行数据采集")
- # return
- # else:
- # # self.loggerMT.info("MT 没有要采集的品规")
- # print("MT 没有要采集的品规")
- # return
- # key = product_name
- # try:
- # mt = MT(key) # 用当前关键字实例化
- # mt.main(device_id,start_page,end_page) # 执行一次完整采集
- # logging.info(f'关键字 {key} 本轮采集完成')
- # except Exception as e:
- # # 发生异常直接跳过该关键字,继续下一轮
- # logging.exception(f'关键字 {key} 采集异常:{e}')
- # finally:
- # # 关闭当前 MT 实例资源(如有需要)
- # if hasattr(mt, 'close'):
- # mt.close()
class TimeoutException(Exception):
    """Exception type for timeouts; adds nothing to ``Exception``.

    NOTE(review): not raised or caught anywhere in the visible part of this file.
    """
- # 如果需要并行处理(提高效率),可以使用线程池:
def process_tasks_in_parallel(max_workers=12):
    """Load pending collection tasks from the database and run them in a thread pool.

    Tasks are the rows of ``retrieve_collect_task_allocate`` with ``status = 1`` and
    ``platform = 4``. A row becomes a task only when it has a non-zero equipment id,
    a non-blank product name and its equipment resolves to a device id (equipment
    row with ``status = 0``). Each task is executed by a fresh ``MT`` instance whose
    ``main`` receives the row's ``duration`` (minutes, 30 when NULL) as time limit.

    Args:
        max_workers: Size of the thread pool.

    Returns:
        None. A summary of succeeded and failed tasks is printed.
    """
    from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError, as_completed
    from contextlib import closing

    default_duration_minutes = 30
    task_query = (
        "SELECT id, collect_equipment_id, product_name, start_page, end_page, duration, product_specs "
        "FROM retrieve_collect_task_allocate "
        "WHERE status = 1 AND platform = 4"
    )
    device_query = "SELECT device_id FROM retrieve_collect_equipment WHERE id = %s AND status = 0"

    tasks = []
    device_map = {}  # equipment id -> device id (None when no usable device exists)
    # closing() guarantees that cursor and connection are released on every path,
    # including the early return and any database error.
    with closing(get_retrieve_mysql()) as retrieve_conn, closing(retrieve_conn.cursor()) as cursor:
        cursor.execute(task_query)
        results = cursor.fetchall()
        print(f"获取到的任务结果={results}")
        if not results:
            print("MT 没有要采集的品规")
            return

        for row in results:
            task_id, collect_equipment_id, product_name, start_page, end_page, duration, product_specs = row
            if collect_equipment_id == 0 or not product_name or not product_name.strip():
                continue
            # Look every equipment id up only once.
            if collect_equipment_id not in device_map:
                cursor.execute(device_query, (collect_equipment_id,))
                device_result = cursor.fetchone()
                device_map[collect_equipment_id] = device_result[0] if device_result else None
            device_id = device_map[collect_equipment_id]
            if not device_id:
                continue
            # A NULL specification is treated as "no specification".
            product_specs = product_specs or ''
            tasks.append({
                'task_id': task_id,
                'device_id': device_id,
                'key': product_name.strip() + product_specs.strip(),  # search keyword
                'start_page': start_page,
                'end_page': end_page,
                # Time limit in minutes; the database value wins over the default.
                'duration_minutes': duration if duration is not None else default_duration_minutes,
                'product_specs': product_specs,
                'product_name': product_name,
            })

    if not tasks:
        print("没有有效的采集任务")
        return
    print(f"准备并行处理 {len(tasks)} 个任务")

    def process_single_task(task):
        """Run one task and return a result dict instead of raising."""
        mt = None
        try:
            mt = MT(task['key'])
            mt.main(
                device_id=task['device_id'],
                start_page=task['start_page'],
                end_page=task['end_page'],
                task_id=task['task_id'],
                product_name=task['product_name'],
                product_specs=task['product_specs'],
                max_duration_minutes=task['duration_minutes']
            )
            return {
                'task_id': task['task_id'],
                'success': True,
                # getattr: a finished run must not be turned into a failure just
                # because one of these attributes was never assigned.
                'collected_count': getattr(mt, 'collected_count', 0),
                'final_page': getattr(mt, 'current_page', None)
            }
        except Exception as e:
            print(f"任务 {task['task_id']} 执行异常: {e}")
            return {
                'task_id': task['task_id'],
                'success': False,
                'error': str(e)
            }
        finally:
            # Best-effort cleanup; a failing close() must not mask the task result.
            if mt is not None and hasattr(mt, 'close'):
                try:
                    mt.close()
                except Exception as e:
                    print(f"任务 {task['task_id']} 关闭资源失败: {e}")

    successful_tasks = 0
    failed_tasks = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(process_single_task, task): task
            for task in tasks
        }
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                # as_completed() only yields finished futures, so result() returns
                # at once; the per-task time limit itself is enforced in MT.main.
                task_timeout = (task['duration_minutes'] + 5) * 60
                result = future.result(timeout=task_timeout)
                if result['success']:
                    successful_tasks += 1
                    print(f"任务 {result['task_id']}: 完成,采集 {result['collected_count']} 条数据")
                else:
                    failed_tasks += 1
                    print(f"任务 {result['task_id']}: 失败,错误: {result['error']}")
            except FutureTimeoutError:
                failed_tasks += 1
                print(f"任务 {task['task_id']}: 超时(限制 {task['duration_minutes']} 分钟)")
            except Exception as e:
                failed_tasks += 1
                print(f"任务 {task['task_id']}: 执行异常 {e}")

    print(f"\n并行采集完成:")
    print(f"成功: {successful_tasks} 个")
    print(f"失败: {failed_tasks} 个")
if __name__ == '__main__':
    # Interval between scheduled collection runs. The schedule and the start-up
    # message both use this value so that they cannot disagree.
    COLLECTION_INTERVAL_MINUTES = 10

    def run_collection():
        """Run one collection round; errors are printed so the scheduler keeps running."""
        try:
            print(f"【定时任务开始】时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            process_tasks_in_parallel(max_workers=12)
            print(f"【定时任务结束】时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        except Exception as e:
            print(f"【定时任务异常】: {e}")

    # Register the recurring job, then run once immediately.
    schedule.every(COLLECTION_INTERVAL_MINUTES).minutes.do(run_collection)
    run_collection()
    print(f"定时任务已设置,每{COLLECTION_INTERVAL_MINUTES}分钟执行一次采集")

    # Check for due jobs once per minute.
    while True:
        schedule.run_pending()
        time.sleep(60)
- # process_tasks_in_parallel(max_workers=12) # 可以同时处理12个任务
- # scheduler = BlockingScheduler()
- # scheduler.add_job(main, 'cron', hour=21, minute=30, misfire_grace_time=120)
- # try:
- # scheduler.start()
- # except (KeyboardInterrupt, SystemExit):
- # pass
|