| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789179017911792179317941795 |
- import requests
- import base64
- from concurrent.futures import ThreadPoolExecutor
- import uiautomator2 as u2
- import time
- import sys
- import subprocess
- import re
- import random
- import json
- from aip import AipOcr
- import numpy as np
- import cv2
- import os
- from pdd_config import Config
- import logging
- from logger import setup_logger
- import pymysql
- from 拼多多盒数处理脚本.main import extract_box_number
- import datetime
- import threading
- setup_logger("pdd_spider") # 初始化日志
def get_mysql():
    """Open a new PyMySQL connection to the scrape database.

    Each setting can be overridden with an environment variable
    (``PDD_MYSQL_HOST``, ``PDD_MYSQL_PORT``, ``PDD_MYSQL_USER``,
    ``PDD_MYSQL_PASSWORD``, ``PDD_MYSQL_DB``) so the server or credentials
    can be changed without editing this file. When a variable is unset the
    previously hard-coded value is used, so existing deployments keep working.

    Returns:
        The connection object returned by ``pymysql.connect``. The caller is
        responsible for closing it.
    """
    env = os.environ
    # NOTE(review): the fallback password is still stored in source; once the
    # environment variables are deployed these defaults should be removed.
    return pymysql.connect(
        host=env.get("PDD_MYSQL_HOST", '120.24.49.2'),
        port=int(env.get("PDD_MYSQL_PORT", 3306)),
        user=env.get("PDD_MYSQL_USER", 'drug_retrieve'),
        password=env.get("PDD_MYSQL_PASSWORD", 'ksCt3xm6chzdkafj'),
        db=env.get("PDD_MYSQL_DB", 'drug_retrieve'),
        charset='utf8mb4',
    )
# Delay, in seconds, used when re-arming the scheduler timer.
SCHEDULER_INTERVAL_SECONDS = 600
# Value matched against the `platform` column when selecting pending tasks.
PLATFORM_PDD = 3
# Value matched against the task `status` column when selecting pending tasks.
TASK_STATUS_PENDING = 1
# Value matched against the equipment `status` column when looking for a device.
DEVICE_STATUS_IDLE = 0
# Fallback for a task's max-count column when it is missing or not an integer.
DEFAULT_MAX_COUNTS_LIMIT = 300
# True: read and schedule tasks from the database; False: read tasks directly from device_list
USE_DB_TASK_SOURCE = False
# Guards the three in-process registries below.
dispatch_lock = threading.Lock()
# Task ids currently reserved or running in this process.
running_task_ids = set()
# Device ids currently reserved or running in this process.
running_device_ids = set()
# device id -> worker thread started for that device.
worker_threads = {}
# When set, schedule_dispatch() stops arming new timers.
scheduler_stop_event = threading.Event()
# Most recently armed scheduler timer (None until the first schedule_dispatch call).
scheduler_timer = None
def parse_optional_int(value, default=None):
    """Convert *value* to ``int``, falling back to *default*.

    Args:
        value: Anything accepted by ``int()``; ``None`` and the empty string
            count as "not provided".
        default: Returned when *value* is not provided or cannot be converted.

    Returns:
        The converted integer, or *default*.
    """
    not_provided = (None, "")
    if value in not_provided:
        return default
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = default
    return parsed
def fetch_pending_tasks():
    """Read all pending PDD rows from ``retrieve_collect_task_allocate``.

    Returns:
        The result of ``cursor.fetchall()`` for the query (ordered by
        ascending id), or an empty list when anything fails; the failure is
        logged with its traceback.
    """
    sql = """
        SELECT *
        FROM retrieve_collect_task_allocate
        WHERE platform = %s AND status = %s
        ORDER BY id ASC
        """
    query_args = (PLATFORM_PDD, TASK_STATUS_PENDING)
    connection = None
    try:
        connection = get_mysql()
        with connection.cursor() as cur:
            cur.execute(sql, query_args)
            return cur.fetchall()
    except Exception as exc:
        logging.exception(f"读取待执行任务失败: {exc}")
        return []
    finally:
        # Always release the connection, whether the query worked or not.
        if connection:
            connection.close()
def fetch_idle_device_by_equipment_id(equipment_id):
    """Look up one idle equipment row whose name contains "pdd".

    Args:
        equipment_id: Value matched against the ``id`` column of
            ``retrieve_collect_equipment``.

    Returns:
        The result of ``cursor.fetchone()`` (``None`` when no row matches),
        or ``None`` when the lookup fails; the failure is logged.
    """
    sql = """
        SELECT *
        FROM retrieve_collect_equipment
        WHERE name LIKE %s AND id = %s AND status = %s
        LIMIT 1
        """
    query_args = ('%pdd%', equipment_id, DEVICE_STATUS_IDLE)
    connection = None
    try:
        connection = get_mysql()
        with connection.cursor() as cur:
            cur.execute(sql, query_args)
            return cur.fetchone()
    except Exception as exc:
        logging.exception(f"读取空闲设备失败 equipment_id={equipment_id}: {exc}")
        return None
    finally:
        if connection:
            connection.close()
def build_task_payload(task_row, device_row):
    """Turn a task row and its device row into the dict a worker consumes.

    Columns are read by position: from ``task_row`` index 0 (task id),
    2 (equipment id), 3 (enterprise id), 4 (platform), 5 (title key),
    6 (spec list), 7 (brand) and the optional 9 / 10 / 11 (start page, end
    page, max count limit); from ``device_row`` index 2 (device id).

    Args:
        task_row: Sequence with at least 8 items.
        device_row: Sequence with at least 3 items.

    Returns:
        dict with the task fields, the derived ``search_key`` /
        ``save_search_key`` (brand followed by title key), paging limits,
        a fixed ``"sort"`` of "升序", the device id and the raw ``task_row``.
    """

    def optional_column(index):
        # Trailing columns may be absent on shorter rows; treat them as unset.
        return task_row[index] if len(task_row) > index else None

    title_key = task_row[5]
    brand = task_row[7]
    # A NULL brand/title must not end up in the keyword as the text "None".
    search_key = (
        f"{'' if brand is None else brand}"
        f"{'' if title_key is None else title_key}"
    )
    return {
        "task_id": task_row[0],
        "equipment_id": task_row[2],
        "enterprise_id": task_row[3],
        "platform": task_row[4],
        "title_key": title_key,
        "spec_list": task_row[6],
        "brand": brand,
        "search_key": search_key,
        "save_search_key": search_key,
        "start_page": parse_optional_int(optional_column(9), 0),
        "end_page": parse_optional_int(optional_column(10), None),
        "max_counts_limit": parse_optional_int(
            optional_column(11), DEFAULT_MAX_COUNTS_LIMIT
        ),
        "sort": "升序",
        "device_id": device_row[2],
        "task_row": task_row,
    }
def fetch_runnable_task_payloads():
    """Select pending tasks that can start now and reserve them.

    A task is skipped when it is already running in this process, when
    another task in the same round already took its equipment, when the
    equipment is not idle in the database, or when the device is already
    busy in this process. Each selected task and device is added to
    ``running_task_ids`` / ``running_device_ids`` under ``dispatch_lock``.

    Returns:
        list of payload dicts built by ``build_task_payload``; empty when
        nothing can run.

    Raises:
        Exception: whatever escapes while processing a row. Reservations made
            earlier in the same call are released first, so those devices
            can be scheduled again in a later round.
    """
    tasks = fetch_pending_tasks()
    if not tasks:
        logging.info("当前没有待执行任务")
        return []
    payloads = []
    reserved_equipment_ids = set()
    try:
        for task_row in tasks:
            task_id = task_row[0]
            equipment_id = task_row[2]
            with dispatch_lock:
                if task_id in running_task_ids:
                    continue
                if equipment_id in reserved_equipment_ids:
                    continue
            device_row = fetch_idle_device_by_equipment_id(equipment_id)
            if not device_row:
                logging.info(f"任务 {task_id} 对应设备 {equipment_id} 当前不空闲,跳过本轮")
                continue
            device_id = device_row[2]
            # Build the payload before reserving: if this raises, the
            # current task has not been marked as running yet.
            payload = build_task_payload(task_row, device_row)
            with dispatch_lock:
                if device_id in running_device_ids:
                    logging.info(f"设备 {device_id} 已在本进程执行任务,跳过任务 {task_id}")
                    continue
                running_task_ids.add(task_id)
                running_device_ids.add(device_id)
            reserved_equipment_ids.add(equipment_id)
            payloads.append(payload)
    except Exception:
        # The caller never receives these payloads, so no worker would ever
        # release them; undo the reservations before propagating.
        with dispatch_lock:
            for reserved in payloads:
                running_task_ids.discard(reserved["task_id"])
                running_device_ids.discard(reserved["device_id"])
        raise
    return payloads
def cleanup_finished_workers():
    """Drop entries of ``worker_threads`` whose thread is no longer alive."""
    with dispatch_lock:
        # Collect first: the dict must not change size while being iterated.
        finished = [
            device_id
            for device_id, worker in worker_threads.items()
            if not worker.is_alive()
        ]
        for device_id in finished:
            worker_threads.pop(device_id, None)
def run_task_worker(task_payload):
    """Run one task on its device; intended as a worker-thread target.

    Builds a ``PDD`` from the payload and calls ``pdd.main(device_id, 1, 0)``.
    On any exception the failure is logged first and then reported as an
    abnormal finish (through the ``PDD`` instance when it exists, otherwise
    directly through ``report_api``). A failure of that report is logged too
    instead of hiding the original error. The task and device are always
    removed from the in-process registries at the end.

    Args:
        task_payload: dict with at least ``"task_id"``, ``"device_id"`` and
            ``"search_key"``; the remaining keys are read with ``.get``.
    """
    task_id = task_payload["task_id"]
    device_id = task_payload["device_id"]
    pdd = None
    try:
        logging.info(f"[任务 {task_id}] 开始执行,设备: {device_id}")
        print(task_payload)
        pdd = PDD(
            task_payload["search_key"],
            device_id,
            title_key=task_payload.get("title_key"),
            spec_list=task_payload.get("spec_list"),
            brand=task_payload.get("brand", ""),
            save_search_key=task_payload.get("save_search_key"),
            start_page=task_payload.get("start_page"),
            end_page=task_payload.get("end_page"),
            max_counts_limit=task_payload.get("max_counts_limit"),
            direct_shop_lookup=task_payload.get("direct_shop_lookup", False),
            sort=task_payload.get("sort"),
            platform=task_payload.get("platform"),
            task_id=task_payload.get("task_id"),
            enterprise_id=task_payload.get("enterprise_id"),
        )
        completed_normally = pdd.main(device_id, 1, 0)
        if completed_normally:
            logging.info(f"[任务 {task_id}] 执行完成,设备: {device_id}")
        else:
            logging.info(f"[任务 {task_id}] 已结束,设备: {device_id}")
    except Exception as e:
        # Log before reporting so the root cause is recorded even when the
        # report call itself fails (e.g. the reporting endpoint is down).
        logging.exception(f"[任务 {task_id}] 执行异常,设备: {device_id},错误: {e}")
        end_page = task_payload.get("start_page")
        try:
            if pdd is not None:
                end_page = getattr(pdd, "page", end_page)
                pdd.finish_task_abnormally(end_page, f"任务执行异常: {e}")
            else:
                report_api(task_id, end_page=end_page, start=4, end_time=int(time.time()), finish_status=0)
        except Exception as report_error:
            logging.exception(
                f"[任务 {task_id}] 上报异常结束状态失败,设备: {device_id},错误: {report_error}"
            )
    finally:
        with dispatch_lock:
            running_task_ids.discard(task_id)
            running_device_ids.discard(device_id)
            worker_threads.pop(device_id, None)
def dispatch_pending_tasks():
    """Start one daemon worker thread per runnable task.

    Prunes finished workers, asks ``fetch_runnable_task_payloads`` for the
    tasks reserved for this round and starts a thread named
    ``pdd-<device_id>`` for each of them.

    Raises:
        Exception: re-raised when creating or starting a thread fails. Before
            re-raising, the reservation of the failing payload and of every
            payload not yet started is released, so those tasks and devices
            are not blocked for the lifetime of the process.
    """
    cleanup_finished_workers()
    task_payloads = fetch_runnable_task_payloads()
    if not task_payloads:
        return
    for position, task_payload in enumerate(task_payloads):
        device_id = task_payload["device_id"]
        try:
            thread = threading.Thread(
                target=run_task_worker,
                args=(task_payload,),
                daemon=True,
                name=f"pdd-{device_id}",
            )
            with dispatch_lock:
                worker_threads[device_id] = thread
            thread.start()
            logging.info(f"[任务 {task_payload['task_id']}] 已分发到设备 {device_id}")
        except Exception:
            with dispatch_lock:
                # Payloads from this position onwards have no worker that
                # would release them in its own ``finally`` block.
                for unstarted in task_payloads[position:]:
                    running_task_ids.discard(unstarted["task_id"])
                    running_device_ids.discard(unstarted["device_id"])
                worker_threads.pop(device_id, None)
            raise
def schedule_dispatch(delay_seconds=SCHEDULER_INTERVAL_SECONDS):
    """Arm a non-daemon timer that runs ``scheduled_dispatch_job`` later.

    Does nothing once ``scheduler_stop_event`` is set. The new timer is
    stored in the module-level ``scheduler_timer``.

    Args:
        delay_seconds: Seconds to wait before the job runs.
    """
    global scheduler_timer
    if scheduler_stop_event.is_set():
        return
    timer = threading.Timer(delay_seconds, scheduled_dispatch_job)
    timer.daemon = False
    timer.name = "pdd-scheduler"
    scheduler_timer = timer
    timer.start()
def scheduled_dispatch_job():
    """Run one dispatch round, then re-arm the scheduler.

    Errors from the round are logged and swallowed so the next round is
    always scheduled.
    """
    try:
        try:
            dispatch_pending_tasks()
        except Exception as exc:
            logging.exception(f"PDD 定时调度异常: {exc}")
    finally:
        schedule_dispatch(SCHEDULER_INTERVAL_SECONDS)
def report_api(task_id,page=None,start=None,end_page=None,end_time=None,finish_status=None):
    """Send a task status report to the scheduling service via HTTP GET.

    Args:
        task_id: Sent as ``collect_task_allocate_id``.
        page: Start page; sent as an empty string when ``None``.
        start: Sent as ``status``.
        end_page: End page; sent as an empty string when ``None``.
        end_time: End timestamp; sent as an empty string when ``None``.
        finish_status: Sent as ``finish_status``; 0 when ``None``.

    The request parameters and the response text are printed.
    """

    def blank_if_none(value):
        return '' if value is None else value

    # NOTE(review): the key is spelled "statr_page"; presumably the endpoint
    # expects exactly this spelling -- confirm before "fixing" it.
    params = {
        "collect_task_allocate_id": task_id,
        "statr_page": blank_if_none(page),
        "end_page": blank_if_none(end_page),
        "status": start,
        "finish_status": 0 if finish_status is None else finish_status,
        "start_time": int(time.time()),
        "end_time": blank_if_none(end_time),
    }
    print(params)
    endpoint = "http://schedule.dfwy.tech/api/collect_equipment_execute/result_report"
    response = requests.get(endpoint, params=params, timeout=20)
    print(response.text)
# Get the distance the slider has to move in a slide captcha
def slide_verify(img_path):
    """Send a captcha image to the recognition service and return its answer.

    The image is read from *img_path*, base64-encoded and POSTed as JSON.

    Args:
        img_path: Path of the image file to upload.

    Returns:
        The nested ``data.data`` value of the response when the service
        answers with ``msg == "识别成功"``; ``None`` when recognition failed.
        (Previously the failure path raised ``UnboundLocalError`` because
        ``result`` was never assigned.)
    """
    with open(img_path, 'rb') as f:
        b = base64.b64encode(f.read()).decode()  # image bytes as a base64 string
    url = "http://api.jfbym.com/api/YmServer/customApi"
    data = {
        # The parameter set depends on the recognition type id; ask the
        # service's support for the exact names.
        "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
        "type": "22222",
        "images": b,
    }
    _headers = {
        "Content-Type": "application/json"
    }
    response = requests.request("POST", url, headers=_headers, json=data).json()
    print(response)
    result = None
    if response.get("msg") == "识别成功":
        # Read the nested "data" field; tolerate a null outer "data".
        result = (response.get("data") or {}).get("data")
        if result:
            print(result)
        else:
            print("无法获取数据")
    else:
        print("识别未成功")
    return result
- class PDD:
def __init__(
    self,
    search_key,
    device_id,
    title_key=None,
    spec_list=None,
    brand="",
    save_search_key=None,
    start_page=0,
    end_page=None,
    max_counts_limit=None,
    direct_shop_lookup=False,
    sort=None,
    platform=None,
    task_id=None,
    enterprise_id=None,
):
    """Store the task configuration and reset all per-run counters.

    Args:
        search_key: Keyword typed into the app's search.
        device_id: Identifier of the device this instance drives.
        title_key: Keyword a product title must contain; defaults to
            *search_key* when ``None``.
        spec_list: Specification keyword(s); normalised to a list of strings.
        brand: Brand keyword.
        save_search_key: Falls back to *search_key* when falsy.
        start_page: First page; non-integers become 0, negatives clamp to 0.
        end_page: Last page or ``None``; swapped with *start_page* when it
            is the smaller of the two.
        max_counts_limit: Stored as given.
        direct_shop_lookup: Stored as given.
        sort: Sort label, stored as given.
        platform: Stored as given.
        task_id: Stored as given.
        enterprise_id: Stored as given.
    """
    # --- app and OCR client ---------------------------------------------
    self.package_name = 'com.xunmeng.pinduoduo'
    self.APP_ID = '116857964'
    self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh'
    self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3'
    self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY)

    # --- storage and logging ----------------------------------------------
    self.table_name = "retrieve_scrape_data_999"  # "pdd_drug"
    self.shop_table_name = "retrieve_scrape_data"  # "pdd_shop_info"
    self.loggerPdd = logging.getLogger()
    self.clipboard = ""  # clipboard content starts out empty

    # --- task identity and matching rules ---------------------------------
    self.enterprise_id = enterprise_id
    self.task_id = task_id
    self.platform = platform
    self.sort = sort
    self.sort_key = 0
    self.search_key = search_key
    self.title_key = search_key if title_key is None else title_key
    self.spec_list = self._normalize_rule_list(spec_list)
    self.brand = brand
    self.save_search_key = save_search_key or search_key

    # --- page range ---------------------------------------------------------
    first_page = parse_optional_int(start_page, 0)
    last_page = parse_optional_int(end_page, None)
    if last_page is not None and last_page < first_page:
        logging.warning(
            f"检测到页码配置可能反了,自动交换: start_page={first_page}, end_page={last_page}"
        )
        first_page, last_page = last_page, first_page
    self.start_page = max(first_page, 0)
    self.end_page = last_page

    self.max_counts_limit = max_counts_limit
    self.direct_shop_lookup = direct_shop_lookup
    self.unrelated_data = 0  # number of unrelated records
    self.device_id = device_id
    self.page = self.start_page
    # Clamping start_page to 0 can leave a negative end_page behind it.
    if self.end_page is not None and self.end_page < self.start_page:
        self.end_page = self.start_page

    # --- counters -------------------------------------------------------------
    self.sold_out_counts = 0  # sold-out items seen
    self.program_start_time = self.app_start_time()  # time this run started
    self.max_counts = 0  # records collected so far (compared with max_counts_limit)
    self.click_counts = 0  # product clicks
    self.search_key_loc = 0  # position of the product in the list
    self.finish_reported = False

    # --- OSS configuration ----------------------------------------------------
    self.oss_config = {
        "access_key_id": Config.access_key_id,
        "access_key_secret": Config.access_key_secret,
        "endpoint": Config.endpoint,  # e.g. oss-cn-beijing.aliyuncs.com
        "bucket_name": Config.bucket_name,
        "oss_prefix": Config.oss_prefix,  # key prefix (virtual folder) for screenshots
    }
# Exception handling: save / restore / delete the crawl progress file
def wr_re(self, mod, device_id, sort=None, page=None):
    """Write, read or delete ``./ycwj/<device_id>_<title_key>.txt``.

    Args:
        mod: "写" (write), "读" (read) or "删" (delete); anything else is a
            no-op.
        device_id: Used in the file name.
        sort: Sort label stored when writing (empty string when falsy).
        page: Page number stored when writing (empty string when falsy).

    Returns:
        In read mode, the loaded dict after applying the configured sort
        (only while ``self.sort_key`` is 0) and scrolling to the saved page;
        ``None`` when the file is missing, has no page, or reading fails.
        ``None`` in every other mode.
    """
    file_path = f'./ycwj/{device_id}_{self.title_key}.txt'

    if mod == "写":
        try:
            progress = {"page": page or "", "sort": sort or ""}
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as handle:
                json.dump(progress, handle, ensure_ascii=False, indent=2)
            print(f"进度保存成功:{sort},{page}页")
        except Exception:
            print("保存进度失败")
        return None

    if mod == "读":
        try:
            if not os.path.exists(file_path):
                return None
            with open(file_path, 'r', encoding='utf-8') as handle:
                progress = json.load(handle)
            print(self.sort)
            if self.sort and self.sort_key == 0:
                self.li_or_lo(self.sort)
            if progress['page'] == '':
                return None
            # Never resume before the configured start page.
            self.page = max(int(progress['page']), self.start_page)
            self.scroll_to_target_page(self.page)
            return progress
        except Exception as e:
            print("读取进度失败", e)
            return None

    if mod == "删":
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                print(f"进度文件已删除:{file_path}")
        except Exception as e:
            print(f"删除进度文件失败:{e}")
    return None
def clear_progress_file(self):
    """Do nothing; deleting the progress file is currently disabled."""
    # Disabled call kept for reference: self.wr_re("删", self.device_id, self.sort)
    return None
def is_max_count_reached(self):
    """Return True when a limit is set and ``max_counts`` has reached it."""
    limit = self.max_counts_limit
    if not limit:
        # No limit (None or 0) means the cap can never be reached.
        return False
    return bool(self.max_counts >= limit)
def scroll_to_target_page(self, target_page):
    """Swipe the list upwards once per page to reach *target_page*.

    Args:
        target_page: Number of swipes to perform; ``None``, 0 and negative
            values do nothing.
    """
    swipes_needed = int(target_page or 0)
    if swipes_needed <= 0:
        return
    done = 0
    while done < swipes_needed:
        self.d.swipe(200, 1400, 200, 300, 0.4)
        time.sleep(self.get_sleep_time())
        done += 1
def finish_task_normally(self, end_page, reason):
    """Report a normal finish once, print *reason* and return True.

    Args:
        end_page: Page number forwarded to ``report_api``.
        reason: Text printed on every call.

    Returns:
        Always ``True``.
    """
    first_call = not self.finish_reported
    if first_call:
        if self.task_id:
            report_api(
                self.task_id,
                end_page=end_page,
                start=3,
                end_time=int(time.time()),
                finish_status=1,
            )
        self.finish_reported = True
    print(reason)
    return True
def finish_task_abnormally(self, end_page, reason, finish_status=0):
    """Report an abnormal finish once, print *reason* and return False.

    Args:
        end_page: Page number forwarded to ``report_api``.
        reason: Text printed on every call.
        finish_status: Forwarded to ``report_api``.

    Returns:
        Always ``False``.
    """
    first_call = not self.finish_reported
    if first_call:
        if self.task_id:
            report_kwargs = {
                "end_page": end_page,
                "start": 4,
                "end_time": int(time.time()),
                "finish_status": finish_status,
            }
            report_api(self.task_id, **report_kwargs)
        self.finish_reported = True
    print(reason)
    return False
def finish_task_with_max_count(self, end_page):
    """Finish the task normally because the collection cap was reached.

    Returns:
        Whatever ``finish_task_normally`` returns.
    """
    message = f"达到最大采集数量 {self.max_counts_limit},当前已采集 {self.max_counts} 条,停止任务"
    return self.finish_task_normally(end_page, message)
# Sorting
def li_or_lo(self, key):
    """Apply a price sort to the current result list through the UI.

    For "升序" this taps "价格" and then "总价低到高" if that option exists.
    For "降序" it taps "价格" and then "单粒价格低到高", otherwise "价格" again.
    ``self.sort_key`` is incremented for either key; ``wr_re`` only calls
    this method while ``sort_key`` is still 0.

    :param key: "升序" or "降序"; any other value does nothing.
    :return: None
    """
    # NOTE(review): block nesting was reconstructed from a paste that lost
    # its indentation -- confirm the position of the sleep below.
    if key == "升序":
        self.sort_key += 1
        self.d.xpath('//*[@text="价格"]').click()
        n = self.d.xpath('//*[@text="总价低到高"]')
        if n.exists:
            n.click()
        time.sleep(self.get_sleep_time())
    if key == "降序":
        self.sort_key += 1
        self.d.xpath('//*[@text="价格"]').click()
        # NOTE(review): the option text reads "low to high" although the key
        # is "降序" -- confirm this is the intended menu entry.
        n = self.d.xpath('//*[@text="单粒价格低到高"]')
        # NOTE(review): this tests the selector object itself, unlike the
        # `.exists` check above; confirm its truthiness reflects presence.
        if n:
            n.click()
        else:
            self.d.xpath('//*[@text="价格"]').click()
# Return to the list page
def back_to_list_page(self):
    """Go back until ``distinct_target()`` reports the list page.

    Makes at most 10 attempts, each followed by a one-second pause.

    Returns:
        ``True`` once the list page is detected, ``False`` after all
        attempts were used up.
    """
    attempt = 0
    while attempt < 10:
        if self.distinct_target():
            return True
        print(f'第{attempt}次尝试退回到列表页')
        self.swipe_back(1)
        time.sleep(1)
        attempt += 1
    print('页面出错,没有退回到列表页')
    return False
def get_drug_lis(self, idx):
    """Return the product item elements of the result list.

    Args:
        idx: 0 uses the path rooted at ``android:id/content``; any other
            value probes the absolute hierarchy path with the top-level
            FrameLayout index running from 1 to 5.

    Returns:
        The list from ``.all()``; for a non-zero *idx* the first non-empty
        result, or the result of the last probe when all are empty.
    """
    if idx == 0:
        return self.d.xpath(
            '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all()
    for layer in range(1, 6):
        drug_lis = self.d.xpath(
            f'/hierarchy/android.widget.FrameLayout[{layer}]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all()
        if drug_lis:
            return drug_lis
    return drug_lis
# Time at the moment this code runs
def app_current_time(self):
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.datetime.now()
    return now.strftime('%Y-%m-%d %H:%M:%S')
def slide_link(self):
    """Swipe the share row to the left, anchored on the first visible label.

    The labels "微信", "朋友圈" and "QQ好友" are tried in that order. The
    swipe runs horizontally from x=400 to x=100 at the ``top`` coordinate
    of the first label that exists. Nothing happens when none is present.
    """
    for label in ("微信", "朋友圈", "QQ好友"):
        locator = f'//*[@text="{label}"]'
        if not self.d.xpath(locator).exists:
            continue
        bounds = self.d.xpath(locator).info['bounds']
        self.d.swipe(400, bounds['top'], 100, bounds['top'], 0.3)
        return
def app_start_time(self):
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``.

    ``__init__`` stores the result as the program start time.
    """
    started = datetime.datetime.now()
    return started.strftime('%Y-%m-%d %H:%M:%S')
def stop_app(self):
    """Stop the target app on the device, then pause for five seconds."""
    package = self.package_name
    self.d.app_stop(package)
    time.sleep(5)
def start_app(self):
    """Start the target app on the device, then pause for five seconds."""
    package = self.package_name
    self.d.app_start(package)
    time.sleep(5)
def restart_app(self):
    """Restart the app: stop it, then start it again."""
    for step in (self.stop_app, self.start_app):
        step()
def enter_target_or_restart(self):
    """Enter the target page, restarting the app only when necessary.

    Presses "back" twice, then checks the foreground package. When it is the
    target app, the current session is reused; otherwise the app is
    restarted first. A failure to read the foreground app is logged as a
    warning and handled like "not in the app".
    """
    for _ in range(2):
        self.d.press('back')
        time.sleep(self.get_sleep_time())

    foreground_package = ""
    try:
        foreground = self.d.app_current()
        if isinstance(foreground, dict):
            foreground_package = foreground.get("package", "")
    except Exception as e:
        self.loggerPdd.warning(f"获取当前前台应用失败: {e}")

    if foreground_package != self.package_name:
        print("当前不在拼多多APP内,按原逻辑重启APP后进入目标页面")
        self.restart_app()
    else:
        print("当前在拼多多APP内,直接进入目标页面,不重启")
    self.enter_target_page()
@staticmethod
def get_sleep_time():
    """Return a random pause length in whole seconds: 1 or 2."""
    shortest, longest = 1, 2
    # Slower alternative kept from the original: random.randint(5, 8)
    return random.randint(shortest, longest)
@staticmethod
def get_current_date():
    """Return today's local date formatted as ``YYYY/MM/DD``."""
    today = datetime.datetime.now()
    return today.strftime('%Y/%m/%d')
@staticmethod
def _normalize_rule_list(value):
    """Normalise a keyword rule into a list of non-empty, stripped strings.

    Args:
        value: ``None``, a single value, or a list/tuple/set of values.

    Returns:
        list of ``str``; every item is ``str()``-converted and stripped, and
        items that end up empty are dropped. ``None`` yields ``[]``.
    """
    if value is None:
        return []
    candidates = value if isinstance(value, (list, tuple, set)) else [value]
    stripped = (str(candidate).strip() for candidate in candidates)
    return [text for text in stripped if text]
@staticmethod
def _normalize_match_text(value):
    """Return *value* as lower-case text with all whitespace removed.

    Falsy values (``None``, ``""``, ``0``) become the empty string.
    """
    text = str(value or '')
    without_spaces = re.sub(r'\s+', '', text)
    return without_spaces.lower()
def _match_any_keyword(self, text, keywords):
    """Tell whether *text* contains at least one of *keywords*.

    Both sides are compared after whitespace removal and lower-casing.

    Returns:
        ``True`` when there are no usable keywords or when one of them is a
        substring of *text*; ``False`` otherwise.
    """
    wanted = self._normalize_rule_list(keywords)
    if not wanted:
        return True
    haystack = self._normalize_match_text(text)
    for keyword in wanted:
        if self._normalize_match_text(keyword) in haystack:
            return True
    return False
def is_link_spec_useful(self, product_title, specifications=''):
    """Check the product against the configured specification keywords.

    Args:
        product_title: Product title text.
        specifications: Specification text of the product.

    Returns:
        ``True`` when ``self.spec_list`` is empty or when any of its entries
        (normalised) occurs in the normalised title or specification text.
    """
    if not self.spec_list:
        return True
    title_text = self._normalize_match_text(product_title)
    spec_text = self._normalize_match_text(specifications)
    return any(
        wanted in title_text or wanted in spec_text
        for wanted in map(self._normalize_match_text, self.spec_list)
    )
def is_link_useful(self, product_title, specifications=''):
    """Decide whether a product matches the title, brand and spec rules.

    The checks run in that order and stop at the first failure, which is
    printed.

    Returns:
        ``True`` when all three checks pass, ``False`` otherwise.
    """
    # (check, message) pairs; both are callables so nothing is evaluated
    # past the first failing rule.
    rules = (
        (
            lambda: self._match_any_keyword(product_title, self.title_key),
            lambda: f"当前商品名称:{product_title} 不包含{self.title_key}关键字",
        ),
        (
            lambda: self._match_any_keyword(product_title, self.brand),
            lambda: f"当前商品名称:{product_title} 不包含{self.brand}品牌",
        ),
        (
            lambda: self.is_link_spec_useful(product_title, specifications),
            lambda: f"当前商品名称:{product_title} 不包含{self.spec_list}品规",
        ),
    )
    for passes, describe_failure in rules:
        if not passes():
            print(describe_failure())
            return False
    return True
def remove_watermark(self, img_path):
    """Lighten an image so its watermark fades, then re-encode it.

    Every pixel value is mapped through ``1.4057577998008846 * v -
    38.33089999653017`` and clipped to 0..255.

    :param img_path: path of the image file
    :return: the encoded buffer returned by ``cv2.imencode``, using the
        file's own extension as the format
    """
    raw_bytes = np.fromfile(img_path, dtype=np.uint8)
    img = cv2.imdecode(raw_bytes, -1)
    extension = os.path.splitext(img_path)[1]
    rescaled = 1.4057577998008846 * img - 38.33089999653017
    cleaned = np.clip(rescaled, 0, 255).astype(np.uint8)
    _, img_binary = cv2.imencode(extension, cleaned)
    return img_binary
def get_shop_name(self):
    """
    Get the shop name of the product currently shown.

    First reads the text next to the "进店" button. If that element is
    missing, clicks the shop entry, reads the name from the shop page and
    goes back one step.

    :return: the shop name, '' when it cannot be located, or None when an
        exception occurs (the error is printed and logged)
    """
    # NOTE(review): block nesting was reconstructed from a paste that lost
    # its indentation -- confirm where swipe_back(1) sits.
    try:
        xpath = '//*[@text="进店"]/preceding-sibling::android.view.ViewGroup/android.widget.LinearLayout/android.widget.TextView'
        if self.d.xpath(xpath).exists:
            shop_name = self.d.xpath(xpath).text
            self.loggerPdd.info(f'1-获取到店铺名:{shop_name}')
        else:
            # Open the shop's own page
            shop_btn_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]'
            if self.d.xpath(shop_btn_xpath).exists:
                self.d.xpath(shop_btn_xpath).click()
                time.sleep(1)
                # self.d.xpath('//*[@text="店铺"]').click()
                xpath_shop_name = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.TextView[1]'
                if self.d.xpath(xpath_shop_name).exists:
                    shop_name = self.d.xpath(xpath_shop_name).text
                    self.loggerPdd.info(f'2-获取到店铺名:{shop_name}')
                else:
                    shop_name = ''
                    self.loggerPdd.info(f'3-获取到店铺名:{shop_name}')
                self.swipe_back(1)  # leave the shop page again
            else:
                shop_name = ''
                self.loggerPdd.info('4-因为shop_btn_xpath不存在,获取到店铺名为空')
        # time.sleep(10000)
        return shop_name
    except Exception as e:
        print(f'获取店铺名出错:{e}')
        self.loggerPdd.error(f'获取店铺名出错:{e}')
        return None
def save_to_database(self, data):
    """Insert one scraped record into ``retrieve_scrape_data_999``.

    Up to 5 attempts are made, each on a fresh connection, with a
    two-second pause between failed attempts. ``self.max_counts`` is
    incremented after a successful commit.

    Compared with the earlier version, the connection is now closed after
    every attempt (it used to stay open on success), and a failing rollback
    no longer aborts the retry loop.

    Args:
        data: Mapping holding one value per column in ``columns`` below.

    Returns:
        ``True`` when the row was committed, ``False`` after all attempts
        failed.
    """
    print(f'保存数据到数据库:{data}')
    # Single source of truth for the column list, the placeholders and the
    # parameter order, so the three can never drift apart.
    columns = (
        'enterprise_id', 'platform_id', 'platform_item_id', 'province_id', 'city_id',
        'province_name', 'city_name', 'area_info', 'product_name', 'product_specs',
        'one_box_price', 'manufacture_date', 'expiry_date', 'manufacturer', 'approval_number',
        'is_sold_out', 'online_posting_count', 'continuous_listing_count', 'link_url',
        'store_name', 'store_url', 'shipment_province_id', 'shipment_province_name',
        'shipment_city_id', 'shipment_city_name', 'company_name', 'qualification_number',
        'scrape_date', 'min_price', 'number', 'sales', 'inventory', 'snapshot_url',
    )
    placeholders = ', '.join(['%s'] * len(columns))
    add_sql = (
        f"INSERT INTO retrieve_scrape_data_999 ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )
    max_retries = 5
    for attempt in range(max_retries):
        conn = None
        try:
            conn = get_mysql()
            with conn.cursor() as cur:
                cur.execute(add_sql, tuple(data[name] for name in columns))
            conn.commit()
            self.max_counts += 1
            print(f"存入数据库成功,当前已采集 {self.max_counts} 条")
            return True
        except Exception as e:
            print(f'保存数据库异常 (尝试 {attempt + 1}/{max_retries}): {e}')
            if conn:
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    # A dead connection cannot roll back; keep retrying.
                    print(f'回滚失败: {rollback_error}')
            if attempt == max_retries - 1:
                print("达到最大重试次数,保存失败")
                return False
        finally:
            # Runs on success and on failure, so no connection is leaked.
            if conn:
                try:
                    conn.close()
                except Exception as close_error:
                    print(f'关闭数据库连接失败: {close_error}')
        time.sleep(2)
def click_target_product_by_search_key(self, fuzzy_match=False, timeout=10):
    """
    Find the element whose text matches ``self.search_key`` and click it.

    :param fuzzy_match: when true, match elements whose text contains the
        keyword; otherwise the text must equal the keyword
    :param timeout: seconds to wait for the element to appear
    :return: True when the element was clicked; False when it did not
        appear in time or any exception occurred
    """
    try:
        keyword = self.search_key
        # 1. Build the selector for the requested match mode.
        if not fuzzy_match:
            locator = self.d(text=keyword)
            print(f"🔍 精确匹配商品:「{keyword}」")
        else:
            locator = self.d(textContains=keyword)
            print(f"🔍 模糊匹配商品:包含「{keyword}」的元素")
        # 2. Wait for the element so the click does not hit a missing node.
        if not locator.wait(timeout=timeout):
            print(f"❌ 滑动后仍未找到「{keyword}」对应的商品")
            return False
        print("✅ 找到匹配的商品,准备点击")
        locator.click()
        print(f"✅ 成功点击「{keyword}」对应的商品")
        # Give the next page time to load.
        time.sleep(self.get_sleep_time())
        return True
    except Exception as e:
        print(f"❌ 点击「{self.search_key}」对应商品时异常:{e}")
        return False
def swipe_down(self):
    """
    Swipe downwards: the gesture starts near the top of the screen and
    ends near the bottom.

    Coordinates are derived from the reported display size (defaults
    1080x2400 when a key is missing) and the duration is random between 0.1
    and 0.3 seconds. If anything fails, the error is printed and one retry
    with fixed coordinates is made.

    :return: None
    """
    try:
        # Screen size, with defaults in case the device omits a key.
        width = self.d.info.get('displayWidth', 1080)
        height = self.d.info.get('displayHeight', 2400)
        # Random duration so consecutive swipes are not identical.
        swipe_seconds = random.uniform(0.1, 0.3)
        # Horizontally centred; from 20% to 80% of the screen height.
        center_x = width // 2
        upper_y = height * 0.2
        lower_y = height * 0.8
        # Guarantee start < end so the gesture always points downwards.
        upper_y, lower_y = min(upper_y, lower_y - 10), max(lower_y, upper_y + 10)
        self.d.swipe(center_x, upper_y, center_x, lower_y, duration=swipe_seconds)
        # Let the page settle before the caller looks for elements.
        time.sleep(self.get_sleep_time())
    except Exception as e:
        print(f"向下滑动失败:{e}")
        # Fallback: fixed coordinates for a 1080x2400 screen.
        self.d.swipe(540, 480, 540, 1920, duration=0.2)
        time.sleep(self.get_sleep_time())
def swipe_up(self):
    """
    Swipe upwards from 100 px above the bottom edge to 100 px below the top.

    The duration is random between 0 and 0.3 seconds. In roughly 15% of the
    calls a small extra upward nudge follows, for lists that get stuck.

    :return: None
    """
    width = self.d.info['displayWidth']
    height = self.d.info['displayHeight']
    swipe_seconds = random.uniform(0, 0.3)
    center_x = width // 2
    self.d.swipe(center_x, height - 100, center_x, 100, duration=swipe_seconds)
    if random.uniform(0, 1) > 0.85:
        # Sometimes the list sticks; push it up a little further.
        self.d.swipe_ext("up", 0.1)
    time.sleep(self.get_sleep_time())
def swipe_back(self, no):
    """
    Press "back" *no* times, unless the target page is already showing.

    :param no: number of back presses
    :return: None
    """
    if self.distinct_target():
        return
    presses_left = no
    while presses_left > 0:
        self.d.press('back')
        time.sleep(self.get_sleep_time())
        presses_left -= 1
def drug_price(self):
    """
    Read the product price from the text element following the "¥" sign.

    :return: the first run of digits and dots in that text as a float, or
        None when the element or a number cannot be found (the error is
        printed)
    """
    price_xpath = '//*[@text="¥"]/following-sibling::android.widget.TextView[1]'
    try:
        raw_text = self.d.xpath(price_xpath).text
        number = re.search(r'[\d\.]+', raw_text)
        price = float(number.group())
        print(f'获取到价格:{price}')
        return float(price)
    except Exception as e:
        print(f'提取价格出错-->{e}')
        return None
def drug_price_ex(self):
    """
    Open the spec picker of the current product page and read the price
    and the "已选…" (selected spec) label shown in it.

    Two candidate entry buttons are tried; when neither exists the method
    returns immediately without pressing back. Otherwise three known
    picker layouts are probed in order. If the label still says "请选择"
    (nothing chosen) one option is tapped and the label/price are read
    again after a 2 s pause. Finally ``swipe_back(1)`` is called.

    :return: ``(price, ext)`` where ``price`` is a float, or '' when no
        "¥<number>" could be read, and ``ext`` is the selected-spec label
        text, or '' when none was found
    """
    price_str = ''  # raw price text as displayed
    ext = ''  # selected-spec label
    price = ''

    def text_of(xp):
        # A node without text yields None; normalise so the `in` tests below cannot raise.
        return self.d.xpath(xp).text or ''

    # Buttons that open the spec picker (two layout variants).
    button_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]'
    button_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]'
    if self.d.xpath(button_xpath_1).exists:
        self.d.xpath(button_xpath_1).click()
    elif self.d.xpath(button_xpath_2).exists:
        self.d.xpath(button_xpath_2).click()
    else:
        print("button1 and button_2 all not exist")
        return price, ext

    select_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]'
    select_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]'
    select_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()]'
    select_xpath_3_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()-1]'
    price_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]'
    price_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]'
    price_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]//android.widget.TextView[1]'

    if self.d.xpath(select_xpath_1).exists:
        text1 = text_of(select_xpath_1)
        print(f"select_xpath_1--text1={text1}")
        if '已选' in text1:
            if self.d.xpath(price_xpath_1).exists:
                price_str = text_of(price_xpath_1)
                print(f"select_xpath_1--price_str-1={price_str}")
            else:
                print("select_xpath_1--price_xpath_1-1 not exist")
            ext = text1
        elif '请选择' in text1:
            # Nothing chosen yet: tap an option further down the sheet.
            scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]'
            if self.d.xpath(scroll_xpath_1).exists:
                self.d.xpath(scroll_xpath_1).click()
                time.sleep(2)  # the price refreshes after a choice is made
                if self.d.xpath(select_xpath_1).exists:
                    text2 = text_of(select_xpath_1)
                    if '已选' in text2:
                        print(f"select_xpath_1--已选择2:text2={text2}")
                        if self.d.xpath(price_xpath_1).exists:
                            price_str = text_of(price_xpath_1)
                            print(f"select_xpath_1--price_str-2={price_str}")
                        else:
                            print("select_xpath_1--price_xpath_1-2 not exist")
                        ext = text2
            else:
                print("select_xpath_1--scroll_xpath_1 not exist")
    elif self.d.xpath(select_xpath_2).exists:
        text1 = text_of(select_xpath_2)
        print(f"xpath2--text1={text1}")
        if '已选' in text1:
            ext = text1
            if self.d.xpath(price_xpath_2).exists:
                price_str = text_of(price_xpath_2)
                print(f"select_xpath_2--price_str-2={price_str}")
            else:
                print("select_xpath_2--price_xpath_2-1 not exist")
        elif '请选择' in text1:
            print('come in here')
            # Nothing chosen yet: tap an option further down the sheet.
            scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]'
            if self.d.xpath(scroll_xpath_1).exists:
                print("scroll_xpath_1 exists")
                self.d.xpath(scroll_xpath_1).click()
                time.sleep(2)  # the price may refresh after a choice is made
                if self.d.xpath(select_xpath_2).exists:
                    text2 = text_of(select_xpath_2)
                    if '已选' in text2:
                        ext = text2
                        print(f"select_xpath_2--已选择2:text2={text2}")
                        if self.d.xpath(price_xpath_2).exists:
                            price_str = text_of(price_xpath_2)
                            print(f"select_xpath_2--price_str-2={price_str}")
                        else:
                            print("select_xpath_2--price_xpath_2-2 not exist")
            else:
                print("scroll_xpath_1 not exists")
        else:
            print("not exist 请选择 or 已选")
    elif self.d.xpath(select_xpath_3).exists:
        text1 = text_of(select_xpath_3)
        print(f"xpath3--text1-1={text1}")
        if ('请选择' not in text1) and ('已选' not in text1):
            # In this layout the label is sometimes the second-to-last text view.
            text1 = text_of(select_xpath_3_2)
            print(f"xpath3--text1-2={text1}")
        if '已选' in text1:
            ext = text1
            if self.d.xpath(price_xpath_3).exists:
                price_str = text_of(price_xpath_3)
                print(f"select_xpath_3--price_str-3-3-1={price_str}")
            else:
                print("select_xpath_3--price_xpath_3-3-1 not exist")
        elif '请选择' in text1:
            print('come in here')
            # Nothing chosen yet: tap an option (two possible containers).
            scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]'
            recycler_view_xpath = '//*[@resource-id="android:id/content"]//android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]'
            if self.d.xpath(scroll_xpath_1).exists:
                print("scroll_xpath_1 exists")
                self.d.xpath(scroll_xpath_1).click()
                time.sleep(2)  # the price may refresh after a choice is made
                if self.d.xpath(select_xpath_3).exists:
                    text2 = text_of(select_xpath_3)
                    if '已选' in text2:
                        ext = text2
                        print(f"select_xpath_3--已选择2:text2={text2}")
                        if self.d.xpath(price_xpath_3).exists:
                            price_str = text_of(price_xpath_3)
                            print(f"select_xpath_3--price_str-3-2={price_str}")
                        else:
                            print("select_xpath_3--price_xpath_3-3-2 not exist")
            elif self.d.xpath(recycler_view_xpath).exists:
                self.d.xpath(recycler_view_xpath).click()
                time.sleep(2)  # the price may refresh after a choice is made
                if self.d.xpath(select_xpath_3).exists:
                    text2 = text_of(select_xpath_3)
                    if '已选' in text2:
                        ext = text2
                        print(f"select_xpath_3--已选择2:text2={text2}")
                        if self.d.xpath(price_xpath_3).exists:
                            price_str = text_of(price_xpath_3)
                            print(f"select_xpath_3--price_str-3-3={price_str}")
                        else:
                            print("select_xpath_3--price_xpath_3-3-3 not exist")
            else:
                print("scroll_xpath_1 not exists")
        else:
            print(f"xpath3--text1-不包含请选择和已选择")
    else:
        print("select_xpath_1 and select_xpath_2 and select_xpath_3 all not exist")

    if price_str:
        match = re.search(r'¥([\d\.]+)', price_str)
        if match:
            try:
                price = float(match.group(1))
            except ValueError:
                # "¥." or "¥1.2.3" match the pattern but are not numbers
                price = ''
        else:
            price = ''
    print(f'获取到价格:{price}')
    print(f"ext={ext}")
    self.swipe_back(1)  # close the spec picker
    return price, ext
def restart_uiautomator_services(self, device_id):
    """
    Restart the atx-agent (uiautomator) server on a device through adb.

    The commands are passed to ``subprocess.run`` as argument lists, so the
    device id is never interpreted by a local shell. Output is captured and
    discarded; a non-zero exit status is not treated as an error.

    :param device_id: adb serial of the target device
    :return: None
    """
    start_cmd = ['adb', '-s', str(device_id), 'shell',
                 '/data/local/tmp/atx-agent', 'server', '-d']
    stop_cmd = start_cmd + ['--stop']
    subprocess.run(stop_cmd, capture_output=True, text=True)
    time.sleep(self.get_sleep_time())
    subprocess.run(start_cmd, capture_output=True, text=True)
    time.sleep(self.get_sleep_time())
def connect_devices(self, device_id):
    """
    Attach to a device over USB, store the handle on ``self.d`` and restart
    its uiautomator service.

    :param device_id: adb serial of the device
    :return: None
    :raises Exception: any failure is printed and re-raised wrapped in a
        plain ``Exception``
    """
    try:
        device = u2.connect_usb(device_id)
        self.d = device
        # No implicit wait is configured here.
        self.restart_uiautomator_services(device_id)
        print(f'[{self.program_start_time}]连接到设备:{device_id}')
    except Exception as err:
        print(f'{device_id} 连接错误: {err}')
        raise Exception(err)
def get_ocr_res(self, img):
    """
    Run OCR on an image after watermark removal.

    :param img: image passed to ``self.remove_watermark``; the cleaned
        result is sent to ``self.client.basicGeneral``
    :return: the ``words_result`` entry of the OCR response ('' when the
        key is absent), or None when any step fails
    """
    try:
        image = self.remove_watermark(img)
        res_image = self.client.basicGeneral(image)
        data = res_image.get('words_result', '')
        print(f'百度api返回结果:{data}')
        return data
    except Exception as e:
        # Still best effort, but report the cause and let
        # KeyboardInterrupt/SystemExit propagate.
        print(f'OCR failed: {e}')
        return None
def get_title(self):
    """
    Read the product title from the ``tv_title`` view's content description.

    :return: the stripped title, or None when the view is absent or any
        error occurs while reading it
    """
    try:
        print('开始提取标题')
        time.sleep(self.get_sleep_time())
        title_node = self.d.xpath('//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]')
        if not title_node.exists:
            return None
        title = title_node.info['contentDescription'].strip()
        print(f'提取到标题:{title}')
        return title
    except Exception as err:
        print(f'获取标题出错:{err}')
        return None
- # 从里面匹配出药品名和规格
- # drugs_name
- # specifications
- # match = re.search(r'([^\d]+)([\d\D]+)', title)
- # match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title)
- # if match:
- # drugs_name = match.group(1).strip() + match.group(2).strip()
- # specifications = match.group(3).strip()
- # print("药品名:", drugs_name)
- # print("规格:", specifications)
- # print('完整药名:', drugs_name + specifications)
- # return drugs_name, specifications
- # else:
- # print("没有匹配到预期格式")
def enter_shop(self):
    """
    Tap the "店铺" (shop) entry and pause, so the shop page can be used
    for extracting qualification information.

    :return: None
    """
    shop_entry = self.d.xpath('//*[@text="店铺"]')
    shop_entry.click()
    pause = self.get_sleep_time()
    time.sleep(pause)
def data_is_exists(self, data):
    """
    Check whether a row with the same price, shop, scrape date and
    platform is already stored in ``self.table_name``.

    :param data: mapping that must contain ``min_price``, ``shop``,
        ``scrape_date`` and ``platform``
    :return: True when a matching row exists, False when none does, and
        None when a required key is missing or the query fails
    """
    # 1. Validate the required fields.
    required_keys = ('min_price', 'shop', 'scrape_date', 'platform')
    missing = [key for key in required_keys if key not in data]
    if missing:
        print(f"缺少必要字段: {', '.join(missing)}")
        return None
    conn = None
    try:
        conn = get_mysql()
        with conn.cursor() as cur:
            # Only existence matters, so fetch a constant and stop at the
            # first hit. The table name cannot be a bound parameter; it is
            # taken from self.table_name (assumed not user-controlled).
            query_sql = """
                SELECT 1 FROM {}
                WHERE min_price = %s
                AND store_name = %s
                AND scrape_date = %s
                AND platform_id = %s
                LIMIT 1
            """.format(self.table_name)
            cur.execute(query_sql, (
                data['min_price'],
                data['shop'],
                data['scrape_date'],
                data['platform'],
            ))
            result = cur.fetchone()
            return bool(result)
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return None
    finally:
        if conn:
            conn.close()
def get_instructions_data(self):
    """
    Collect the product-parameter table from the detail section.

    Scrolls (at most 8 times) until a "品牌" or "药品通用名" row is
    visible, taps it to expand the table, then reads the text views under
    the "商品参数" header as alternating label/value pairs. When any of
    the wanted labels is still missing, the page is scrolled once more and
    the pairs are read again.

    :return: dict with keys "有效期", "生产单位", "批准文号" and
        "产品规格"; a value is '' when its label was not found
    """
    wanted_labels = ('有效期', '生产企业', '批准文号', '药品规格')
    brand_xpath = '//*[@text="品牌"]'
    generic_name_xpath = '//*[@text="药品通用名"]'
    params_xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView'

    def collect_into(target):
        # Pair even-indexed labels with odd-indexed values; a trailing
        # label without a value is dropped instead of raising IndexError.
        nodes = self.d.xpath(params_xpath).all()
        for label, value in zip(nodes[0::2], nodes[1::2]):
            target[label.text] = value.text

    for _ in range(8):
        if self.d.xpath(brand_xpath).exists or self.d.xpath(generic_name_xpath).exists:
            self.d.swipe_ext("up", scale=0.1)
            print('开始采集详情数据')
            break
        self.d.swipe_ext("up", scale=0.5)
        time.sleep(self.get_sleep_time())
    # Tap the row to show the full table.
    if self.d.xpath(brand_xpath).exists:
        self.d.xpath(brand_xpath).click()
    else:
        self.d.xpath(generic_name_xpath).click()
    time.sleep(self.get_sleep_time())

    attr = {}
    collect_into(attr)
    # Re-read after one more scroll only when a wanted label is missing.
    # (The previous test checked the collected keys against the wanted
    # list, which is the reverse of what is needed.)
    if not all(label in attr for label in wanted_labels):
        self.d.swipe_ext("up", 0.4)
        time.sleep(self.get_sleep_time())
        collect_into(attr)
    print(f'当前说明书规格参数:{attr}')
    res_data = {
        "有效期": attr.get('有效期', ''),
        "生产单位": attr.get('生产企业', ''),
        "批准文号": attr.get('批准文号', ''),
        "产品规格": attr.get('药品规格', ''),
    }
    print(f'当前规格参数字典数据:{res_data}')
    return res_data
def has_instructions(self, max_attempts=12, poll_interval=0.5):
    """
    Scroll down looking for the "商品详情" (product details) header.

    Each attempt waits ``poll_interval`` seconds, checks for the header
    and, when it is absent, swipes up by 0.3 of the screen.

    :param max_attempts: maximum number of look-then-swipe attempts
    :param poll_interval: seconds to wait before each check
    :return: True as soon as the header is found, False once the attempts
        are used up
    """
    for _ in range(max_attempts):
        time.sleep(poll_interval)
        if self.d.xpath('//*[@text="商品详情"]').exists:
            return True
        self.d.swipe_ext("up", 0.3)
    return False
def distinct_target(self):
    """
    Report whether any known marker of the target page is on screen.

    Four text/content-desc markers and one structural xpath
    (``list_page_xpath``) are probed; all five are always queried and the
    structural result is printed.

    :return: True when at least one probe matches, otherwise False
    """
    marker_xpaths = (
        '//*[@content-desc="拍照搜索"]',
        '//*[@text="年货节大促"]',
        '//*[@text="筛选"]',
        '//*[@text="回头客常拼"]',
    )
    # A list (not a generator) so every probe is evaluated, without short-circuiting.
    marker_hits = [self.d.xpath(marker).exists for marker in marker_xpaths]
    list_page_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]//android.support.v7.widget.RecyclerView[1]'
    is_position_new = self.d.xpath(list_page_xpath).exists
    print(f'is_position_new={is_position_new}')
    return bool(any(marker_hits) or is_position_new)
def enter_target_page(self):
    """
    Search for ``self.search_key`` and move to the configured page.

    Taps the search entry and the edit box, types the keyword, taps
    "搜索", applies the sort through ``li_or_lo`` when ``self.sort`` is
    set and ``self.sort_key`` is 0, and finally calls
    ``scroll_to_target_page`` when ``self.page`` is greater than 0.

    :return: None
    """
    def pause():
        time.sleep(self.get_sleep_time())

    search_entry_xpath = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]'
    self.d.xpath(search_entry_xpath).click()
    pause()
    self.d(className='android.widget.EditText').click()
    pause()
    self.d.send_keys(self.search_key, clear=True)
    pause()
    self.d.xpath('//*[@text="搜索"]').click()
    pause()
    if self.sort and self.sort_key == 0:
        self.li_or_lo(self.sort)
    # Saved progress is not consulted, so only the page number decides.
    if self.page > 0:
        self.scroll_to_target_page(self.page)
def get_clipboard(self):
    """
    Return the device clipboard text with surrounding whitespace removed.

    The clipboard is read once, so the logged value and the returned value
    always come from the same read.

    :return: the stripped clipboard text, or '' when the clipboard is None
    """
    clipboard_content = self.d.clipboard
    self.loggerPdd.info(f"Clipboard content:{clipboard_content}")  # debug output
    if clipboard_content is None:
        return ''
    return clipboard_content.strip()
def get_product_link(self):
    """
    Copy the product's share link through the share menu and return it.

    Up to five attempts are made. Each attempt taps the share ("...")
    button if it exists, calls ``slide_link()``, and taps "复制链接"
    (copy link). The clipboard is read only when that tap actually
    happened; otherwise it may still hold a link copied for an earlier
    product.

    :return: the copied link text, or '' when no link could be copied
    """
    product_link = ''
    print('开始获取商品链接')
    content_root = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]'
    toolbar = content_root + '/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]'
    # Diagnostic probes: print which levels of the toolbar hierarchy exist.
    probe_xpaths = (
        content_root,
        content_root + '/android.widget.RelativeLayout[1]',
        toolbar,
        toolbar + '/android.widget.FrameLayout[2]',
        toolbar + '/android.widget.FrameLayout[2]/android.view.View[1]',
        toolbar + '/android.widget.FrameLayout[3]/android.view.View[1]',
    )
    for probe in probe_xpaths:
        print(self.d.xpath(probe).exists)
    # Candidate locations of the share button (currently only one is enabled).
    dots_xpaths = [
        toolbar + '/android.widget.FrameLayout[last()]/android.view.View[1]',
    ]
    max_retry = 5  # maximum number of attempts
    for idx in range(1, max_retry + 1):
        for xp in dots_xpaths:
            if not self.d.xpath(xp).exists:
                continue
            self.loggerPdd.info(f'{idx}-进入分享点点点')
            self.d.xpath(xp).click()
            time.sleep(1)
            self.loggerPdd.info('开始滑动')
            self.slide_link()
            time.sleep(0.2)
            if self.d.xpath('//*[@text="复制链接"]').click_exists():
                time.sleep(1)
                product_link = self.get_clipboard()
                time.sleep(0.5)
            else:
                self.loggerPdd.info(f'{idx}-copy-link entry not found, clipboard not read')
            self.loggerPdd.info(f'{idx}-商品链接:{product_link}')
            break  # one candidate handled per attempt
        if product_link:
            break
        if idx < max_retry:
            time.sleep(0.5)  # no wait needed after the last attempt
    return product_link
def integrate_data_v2(self):
    """
    Scrape the currently open product page and store one record.

    Title, brand and spec are validated through ``is_link_useful`` (driven
    by the entry configuration) rather than hard-coded branches. The steps
    are: price and selected spec, title, relevance check, fallback price,
    share link, shop name, duplicate check, parameter table, second
    relevance check with the spec, per-box price, save.

    Every early exit returns None after navigating back. Exits caused by
    an irrelevant or unreadable product also increment
    ``self.unrelated_data``; a stored record resets it to 0.

    :return: None
    """
    min_price, ext = self.drug_price_ex()
    title_info = self.get_title()
    if not title_info:
        print('标题获取为空')
        self.swipe_back(1)
        return
    if not self.is_link_useful(title_info):
        self.swipe_back(1)
        self.unrelated_data += 1
        return
    if not min_price:
        # The spec picker gave no price; fall back to the page price.
        min_price = self.drug_price()
    if not min_price:
        print('提取价格出错,回退到列表页')
        self.swipe_back(1)
        self.unrelated_data += 1
        return

    product_link = self.get_product_link()
    time.sleep(2)
    if not self.direct_shop_lookup:
        # Scroll until the "进店" (enter shop) entry comes into view.
        for _ in range(15):
            if self.d(textStartsWith="进店").exists:
                print('开始获取店铺名')
                break
            self.d.swipe_ext("up", scale=0.3)
            time.sleep(self.get_sleep_time())
        if self.d(textStartsWith="进店").exists:
            print('可以开始获取店铺名')
    # Always ask for the name so `shop` is bound on every path.
    shop = self.get_shop_name()
    if not shop:
        print('当前店铺名称为空')
        self.swipe_back(1)
        self.unrelated_data += 1
        return

    scrape_date = self.get_current_date()
    dup_data = {
        'min_price': min_price,
        'shop': shop,
        'scrape_date': scrape_date,
        'platform': '3',
    }
    if self.data_is_exists(dup_data):
        print('存在相同数据不入库')
        self.back_to_list_page()
        return

    is_has_instructions = self.has_instructions()
    self.loggerPdd.info(f'是否有说明书:{is_has_instructions}')
    manufacture_date = ''
    credit_code = ext
    if is_has_instructions:
        try:
            instructions_info = self.get_instructions_data()
            expiry_date = instructions_info['有效期'].strip('。')
            manufacturer = instructions_info['生产单位'].strip('。')
            approval_number = instructions_info['批准文号'].strip('。')
            specifications = instructions_info['产品规格'].strip('。')
        except Exception as e:
            print(f'获取详情页规格参数出错:{e}')
            self.swipe_back(2)
            return
    else:
        expiry_date = ''
        manufacturer = ''
        approval_number = ''
        specifications = ''
    if not self.is_link_useful(title_info, specifications):
        self.swipe_back(1)
        self.unrelated_data += 1
        return
    self.unrelated_data = 0

    # Parse the box count once and reuse it for the test and the division.
    box_count = extract_box_number(credit_code)
    if box_count:
        one_box_price = min_price / box_count
    else:
        print("单瓶药品价格没处理成功")
        one_box_price = 0
    save_data = {
        'enterprise_id': 6,
        'platform_id': 3,
        'platform_item_id': '',
        'province_id': 0,
        'city_id': 0,
        'province_name': '',
        'city_name': '',
        'area_info': "",
        'product_name': title_info,
        'product_specs': specifications,
        'one_box_price': one_box_price,
        'manufacture_date': manufacture_date,
        'expiry_date': expiry_date,
        'manufacturer': manufacturer,
        'approval_number': approval_number,
        'is_sold_out': 0,
        'online_posting_count': 1,
        'continuous_listing_count': 1,
        'link_url': product_link,
        'store_name': shop,
        'store_url': '',
        'shipment_province_id': 0,
        'shipment_province_name': "",
        'shipment_city_id': 0,
        'shipment_city_name': "",
        'company_name': "",
        'qualification_number': "",
        'scrape_date': scrape_date,
        'min_price': min_price,
        'number': 0,
        'sales': "",
        'inventory': "",
        'snapshot_url': "",
        'insert_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    self.save_to_database(save_data)
def main(self, device_id, search_key_length, keyword_idx):
    """
    Run the collection loop for ``self.search_key`` on one device.

    After connecting and running the search, up to 300 list pages are
    walked. On each page every product card lying fully inside the tap
    band is opened and scraped through ``integrate_data_v2``. The loop
    stops on too many unrelated items, on the max-count limit, on repeated
    "sold out" pages, at ``self.end_page``, or at the end of the list.

    :param device_id: adb serial passed to ``connect_devices``
    :param search_key_length: total number of keywords in this run
    :param keyword_idx: zero-based index of the current keyword; 0 starts
        from ``enter_target_or_restart()``, anything else types the
        keyword into the already open search box
    :return: the completion flag (the value returned by
        ``finish_task_normally`` / ``finish_task_with_max_count``, or True
        for a non-final keyword that stopped on unrelated data); False on
        every abort path
    """
    # Vertical band (px) a card must lie fully inside to be tapped.
    card_top_min = 258
    card_bottom_max = 1524

    def run_summary():
        return (f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]"
                f"----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品")

    completed_normally = False
    stop_by_max_count = False
    spider_no = 0
    current_page = self.page
    self.connect_devices(device_id)
    time.sleep(self.get_sleep_time())
    if keyword_idx == 0:
        print("搜索前先尝试回退两次并判断当前应用")
        self.enter_target_or_restart()
    else:
        print("清空前面的文字,再输入关键词")
        self.d.send_keys(self.search_key, clear=True)
        time.sleep(1)
        print("点击搜索")
        self.d.xpath('//*[@text="搜索"]').click()
        time.sleep(1)
    # Report the task status.
    if self.task_id:
        report_api(self.task_id, self.page, 2, finish_status=0)

    for idx in range(300):
        print(f'第{current_page}页')
        if spider_no > 30:
            # Long pause after every 30-odd scraped products.
            time.sleep(300)
            spider_no = 0

        if self.unrelated_data > 30:
            print(f'[{self.program_start_time}]----{self.search_key}----连续超过30个不达标的数据则停止采集')
            print(run_summary())
            self.swipe_down()
            time.sleep(self.get_sleep_time())  # let the page settle after swiping down
            click_success = self.click_target_product_by_search_key(fuzzy_match=False)
            if not click_success:
                self.finish_task_abnormally(
                    current_page,
                    f"连续超过30个不达标的数据后,关键词「{self.search_key}」商品点击失败",
                    finish_status=1
                )
                return False
            print("点击搜索框")
            self.d(className='android.widget.EditText').click()
            time.sleep(self.get_sleep_time())
            if keyword_idx == search_key_length - 1:
                print("程序最后一个品规采集完毕,返回主屏幕")
                completed_normally = self.finish_task_normally(
                    current_page,
                    '连续超过30个不达标的数据,结束采集'
                )
            else:
                completed_normally = True
            break

        if self.is_max_count_reached():
            completed_normally = self.finish_task_with_max_count(current_page)
            # Swipe down, then put the focus back on the search box.
            self.swipe_down()
            time.sleep(self.get_sleep_time())
            click_success = self.click_target_product_by_search_key(fuzzy_match=False)
            if not click_success:
                print(f"关键词「{self.search_key}」商品点击失败")
                return False
            print("点击搜索框")
            self.d(className='android.widget.EditText').click()
            time.sleep(self.get_sleep_time())
            break

        # More than 4 sold-out pages in a row usually means the account is
        # unusable; the counter is reset whenever a page is not sold out.
        if self.sold_out_counts > 4:
            self.finish_task_abnormally(
                current_page,
                "====商品已售罄4次,结束采集(号不能用)",
                finish_status=1
            )
            print(run_summary())
            break

        drug_lis = self.get_drug_lis(idx)
        print('数量', len(drug_lis))
        for idd, drug_one in enumerate(drug_lis):
            print(idd + 1, drug_one.info)
            time.sleep(self.get_sleep_time())
            bounds = drug_one.info['bounds']
            top = bounds['top']
            bottom = bounds['bottom']
            if not (bottom <= card_bottom_max and top >= card_top_min):
                continue  # card is partly off the usable band
            drug_one.click()
            self.click_counts += 1
            time.sleep(self.get_sleep_time())
            # Check the sold-out streak before looking at this page.
            if self.sold_out_counts >= 4:
                print(run_summary())
                self.finish_task_abnormally(
                    current_page,
                    "====这是在第一页有两个,商品已售罄4次,结束采集(号不能用)====",
                    finish_status=1
                )
                time.sleep(self.get_sleep_time())
                self.d.press('home')
                return False
            if self.d.xpath('//*[contains(@text, "商品已售罄")]').wait(timeout=5):
                print("======商品已售罄======")
                self.sold_out_counts += 1
                if self.back_to_list_page():
                    continue
            # Scrape the product.
            try:
                # Not sold out: reset the streak.
                self.sold_out_counts = 0
                self.integrate_data_v2()
                # Make sure we are back on the list page.
                if self.back_to_list_page():
                    print('回退到列表页', True)
                else:
                    print(f'[{self.app_current_time()}] 回退到列表页失败')
                    print(
                        f"[程序启动时间:{self.program_start_time}-----结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品")
                    self.finish_task_abnormally(current_page, "回退到列表页失败,结束采集")
                    return False
                time.sleep(self.get_sleep_time())
                spider_no += 1
                if self.is_max_count_reached():
                    completed_normally = self.finish_task_with_max_count(current_page)
                    stop_by_max_count = True
                    break
            except Exception as e:
                self.loggerPdd.error(f'采集药品详情数据出错:{e}')
                if not self.back_to_list_page():
                    self.finish_task_abnormally(current_page, '采集药品详情数据出错且无法回到列表页,结束采集')
                    return False

        if stop_by_max_count:
            break
        if self.end_page is not None and current_page >= self.end_page:
            completed_normally = self.finish_task_normally(
                current_page,
                f"已采集到结束页 {self.end_page},结束任务"
            )
            break
        if self.d(textStartsWith="抱歉,没有更多商品啦~").exists:
            completed_normally = self.finish_task_normally(current_page, '已经到达列表页最底部')
            break
        print('开始滑入下一页')
        end_y = 300
        self.d.swipe(200, 1400, 200, end_y, 0.4)
        time.sleep(self.get_sleep_time())
        current_page += 1
        self.page = current_page

    if completed_normally:
        self.clear_progress_file()
    elif not self.finish_reported:
        self.finish_task_abnormally(current_page, "采集流程异常结束")
    return completed_normally
def run_device(device_id, retry_delay_seconds=5, max_rounds=None):
    """
    Run every configured task of one device (intended to run in its own thread).

    A task is retried in a new round whenever constructing or running its
    ``PDD`` instance raises. Rounds are separated by ``retry_delay_seconds``
    so that a persistent failure (for example an unplugged device) does not
    spin in a tight loop.

    :param device_id: key into ``device_list``
    :param retry_delay_seconds: pause before a retry round
    :param max_rounds: give up on a task after this many rounds; None
        keeps retrying until a round succeeds
    :return: None
    """
    if device_id not in device_list:
        logging.error(f"设备id没有配置: {device_id}")
        return
    tasks = device_list[device_id]
    logging.info(f"[设备 {device_id}] 开始执行,共 {len(tasks)} 个任务")
    for task in tasks:
        cycle_no = 0
        while True:
            cycle_no += 1
            pdd = None
            logging.info(f'[设备 {device_id}] ========== {task.get("search_key")} 第 {cycle_no} 轮采集开始 ==========')
            try:
                pdd = PDD(
                    task["search_key"],
                    device_id,
                    title_key=task.get("title_key"),
                    spec_list=task.get("spec_list"),
                    brand=task.get("brand", ""),
                    save_search_key=task.get("save_search_key"),
                    start_page=task.get("start_page", 1),
                    end_page=task.get("end_page"),
                    max_counts_limit=task.get("max_counts_limit", DEFAULT_MAX_COUNTS_LIMIT),
                    direct_shop_lookup=task.get("direct_shop_lookup", False),
                    sort=task.get("sort", "升序"),
                    platform=task.get("platform"),
                    task_id=task.get("task_id"),
                    enterprise_id=task.get("enterprise_id"),
                )
                pdd.main(device_id, 1, 0)
                logging.info(f'[设备 {device_id}] 关键字 {task.get("search_key")} 本轮采集完成')
                break
            except Exception as e:
                logging.exception(f'[设备 {device_id}] 关键字 {task.get("search_key")} 采集异常:{e}')
            finally:
                if pdd and hasattr(pdd, 'close'):
                    pdd.close()
            # Only reached after a failed round.
            if max_rounds is not None and cycle_no >= max_rounds:
                logging.error(f'[设备 {device_id}] giving up on {task.get("search_key")} after {cycle_no} rounds')
                break
            time.sleep(retry_delay_seconds)
    logging.info(f"[设备 {device_id}] 所有任务执行完毕")
def run_device_list_tasks(device_ids=None):
    """
    Run ``run_device`` for each requested device, one worker thread per device.

    :param device_ids: iterable of device ids; None or empty means every
        key of ``device_list``. Repeated ids are collapsed so that a device
        is never driven by two workers at once.
    :return: None. Nothing is started when an id is not configured or when
        there is no device to run.
    """
    # Drop duplicates while keeping the caller's order.
    device_ids = list(dict.fromkeys(device_ids or []))
    if not device_ids:
        device_ids = list(device_list.keys())
        logging.info("未指定设备ID,默认运行 device_list 中全部设备")
    invalid_ids = [did for did in device_ids if did not in device_list]
    if invalid_ids:
        logging.error(f"以下设备ID未配置: {invalid_ids}")
        return
    if not device_ids:
        logging.warning("device_list 为空,没有可运行的设备")
        return
    logging.info(f"将运行指定的设备: {device_ids}")
    with ThreadPoolExecutor(max_workers=len(device_ids)) as executor:
        futures = [executor.submit(run_device, did) for did in device_ids]
        for future in futures:
            # Surfaces any exception that escaped run_device.
            future.result()
- # pdd
def main(use_db_task_source=USE_DB_TASK_SOURCE):
    """
    Program entry point.

    With ``use_db_task_source`` falsy, the device ids given on the command
    line (``sys.argv[1:]``) are run through ``run_device_list_tasks``.
    Otherwise pending tasks are dispatched once, periodic dispatching is
    scheduled, and the call blocks on ``scheduler_stop_event``.

    :param use_db_task_source: choose scheduler mode over the static device list
    :return: None
    """
    if not use_db_task_source:
        run_device_list_tasks(sys.argv[1:])
        return
    logging.info(f"PDD 调度器启动,轮询间隔 {SCHEDULER_INTERVAL_SECONDS} 秒")
    dispatch_pending_tasks()
    schedule_dispatch(SCHEDULER_INTERVAL_SECONDS)
    scheduler_stop_event.wait()
# Static task configuration: device serial -> list of task dicts.
# Each task is read by run_device(); "search_key" is the only required key.
device_list = {
    "ZDQWUSSWBEDI896T": [
        {
            "search_key": "999小儿感冒颗粒",  # required
            "title_key": "小儿感冒颗粒",
            "spec_list": ["6g*24"],  # a list is fine; it is normalised by the code
            "brand": "999",
            "save_search_key": "小儿感冒颗粒",
            "start_page": 0,
            "end_page": 200,
            "max_counts_limit": 300,
            "sort": "升序",
        },
    ],
    # --- Disabled sample configurations ---
    # "fcb3c749": [
    #     {
    #         "search_key": "999小儿感冒颗粒",  # required
    #         "title_key": "小儿感冒颗粒",  # used as the filter criterion
    #         "spec_list": ["6g*24"],  # a list is fine; it is normalised by the code
    #         "brand": "999",
    #         "save_search_key": "小儿感冒颗粒",
    #         "start_page": 0,
    #         "end_page": 200,
    #         "max_counts_limit": 300,
    #         "sort": "升序",
    #     },
    # ],
    # "2e58510": [
    #     {
    #         "search_key": "天和追风膏",  # required
    #         "title_key": "天和追风膏",
    #         "spec_list": ["7cm*10cm*8"],  # a list is fine; it is normalised by the code
    #         "brand": "天和",
    #         "save_search_key": "天和追风膏",
    #         "start_page": 0,
    #         "end_page": 200,
    #         "max_counts_limit": 300,
    #         "sort": "升序",
    #     },
    # ],
    # "ZDQWUSSWBEDI896T": [
    #     {
    #         "search_key": "维生素D滴剂",
    #         "title_key": "维生素D滴剂",
    #         "spec_list": ["40粒"],
    #         "brand": "澳诺",
    #         "save_search_key": "维生素D滴剂",
    #         "start_page": 0,
    #         "end_page": 200,
    #         "max_counts_limit": 300,
    #         "sort": "升序",
    #     },
    # ],
    # "U47HZDRG8XJBBURW": [
    #     {
    #         "search_key": "维生素D滴剂",
    #         "title_key": "维生素D滴剂",
    #         "spec_list": ["10粒"],
    #         "brand": "澳诺",
    #         "save_search_key": "维生素D滴剂",
    #         "start_page": 0,
    #         "end_page": 200,
    #         "max_counts_limit": 300,
    #         "sort": "升序",
    #     },
    # ],
}

if __name__ == '__main__':
    main()
|