import requests import base64 import uiautomator2 as u2 import time import sys import subprocess import re import random import json from aip import AipOcr import numpy as np import cv2 import os from pdd_config import Config import logging from logger import setup_logger import pymysql from 拼多多盒数处理脚本.main import extract_box_number import datetime import threading setup_logger("pdd_spider") # 初始化日志 # 功能:这个模块负责从数据库拉取拼多多待执行任务,把任务分发到空闲设备, # 然后在单设备线程中驱动 App 完成搜索、采集、校验、去重和落库。 # 边界:这里主要做调度和采集流程编排,不负责数据库表结构定义,也不负责 # OCR 服务、滑块识别服务或盒数提取逻辑本身的实现,它们都通过外部依赖完成。 # 功能:创建新的 MySQL 连接,供调度查询和采集落库逻辑复用。 # 返回:返回可直接用于 cursor/commit/rollback 的连接对象。 # 副作用/失败:连接关闭由调用方负责;如果连接失败会由上层调用点捕获异常。 def get_mysql(): return pymysql.connect( host='120.24.49.2', # 修改后的主机 port=3306, # 添加端口号 user='drug_retrieve', # 修改后的用户名 password='ksCt3xm6chzdkafj', # 修改后的密码 db='drug_retrieve', # 修改后的数据库名 charset='utf8mb4' ) def get_shop_mysql(): return pymysql.connect( host='120.24.49.2', # 修改后的主机 port=3306, # 添加端口号 user='drug_retrieve', # 修改后的用户名 password='ksCt3xm6chzdkafj', # 修改后的密码 db='drug_retrieve', # 修改后的数据库名 charset='utf8mb4' ) SCHEDULER_INTERVAL_SECONDS = 600 PLATFORM_PDD = 3 TASK_STATUS_PENDING = 1 DEVICE_STATUS_IDLE = 0 DEFAULT_MAX_COUNTS_LIMIT = 300 # 这些集合只表示“当前进程里的占用状态”。 # 数据库里设备仍可能显示空闲,因此调度前后都要结合这几份内存状态做去重。 dispatch_lock = threading.Lock() running_task_ids = set() running_device_ids = set() worker_threads = {} scheduler_stop_event = threading.Event() scheduler_timer = None # 功能:把外部输入安全转成整数;当值为空或格式不合法时回退到默认值。 # 输入约束:允许传入 None、空字符串、数字字符串或整数。 # 返回:成功时返回 int,失败时返回 default。 def parse_optional_int(value, default=None): if value in (None, ""): return default try: return int(value) except (TypeError, ValueError): return default def fetch_pending_tasks(): # 功能:查询数据库里当前仍处于待执行状态的拼多多任务。 # 返回:按任务 id 升序排列的任务列表,便于旧任务优先执行。 # 调用 get_mysql() 的目的是先建立查询连接,后续 cursor.execute() 依赖这个连接对象可用。 conn = None try: conn = get_mysql() with conn.cursor() as cursor: sql = """ SELECT * FROM retrieve_collect_task_allocate WHERE platform = %s AND status = %s ORDER BY id ASC """ cursor.execute(sql, (PLATFORM_PDD, TASK_STATUS_PENDING)) return cursor.fetchall() except Exception as e: logging.exception(f"读取待执行任务失败: {e}") return [] finally: if conn: conn.close() def fetch_idle_device_by_equipment_id(equipment_id): # 功能:按设备 id 查询指定终端是否空闲,避免任务和设备错配。 # 调用 get_mysql() 的目的是查询指定设备是否空闲,避免把任务错误派发到其他终端。 conn = None try: conn = get_mysql() with conn.cursor() as cursor: sql = """ SELECT * FROM retrieve_collect_equipment WHERE name LIKE %s AND id = %s AND status = %s LIMIT 1 """ cursor.execute(sql, ('%pdd%', equipment_id, DEVICE_STATUS_IDLE)) return cursor.fetchone() except Exception as e: logging.exception(f"读取空闲设备失败 equipment_id={equipment_id}: {e}") return None finally: if conn: conn.close() def build_task_payload(task_row, device_row): # 功能:把数据库原始任务行和设备行整理成线程入口可直接消费的任务上下文。 # 返回:统一字段名的字典,避免后续线程逻辑继续依赖固定列下标。 start_page = parse_optional_int(task_row[9] if len(task_row) > 9 else None, 0) end_page = parse_optional_int(task_row[10] if len(task_row) > 10 else None, None) max_counts_limit = parse_optional_int( task_row[11] if len(task_row) > 11 else None, DEFAULT_MAX_COUNTS_LIMIT ) return { "task_id": task_row[0], "equipment_id": task_row[2], "enterprise_id": task_row[3], "platform": task_row[4], "title_key": task_row[5], "spec_list": task_row[6], "brand": task_row[7], "search_key": f"{task_row[7]}{task_row[5]}", "save_search_key": f"{task_row[7]}{task_row[5]}", "start_page": start_page, "end_page": end_page, "max_counts_limit": max_counts_limit, "sort": "升序", "device_id": device_row[2], "task_row": task_row, } def fetch_runnable_task_payloads(): # 功能:从待执行任务中筛出“数据库层面空闲 + 当前进程未占用”的任务集合。 # 返回:可以直接交给 worker 线程执行的 payload 列表。 tasks = fetch_pending_tasks() if not tasks: logging.info("当前没有待执行任务") return [] payloads = [] # 单次轮询内也要避免一个设备被多个任务重复命中。 reserved_equipment_ids = set() for task_row in tasks: task_id = task_row[0] equipment_id = task_row[2] with dispatch_lock: if task_id in running_task_ids: continue if equipment_id in reserved_equipment_ids: continue device_row = fetch_idle_device_by_equipment_id(equipment_id) if not device_row: logging.info(f"任务 {task_id} 对应设备 {equipment_id} 当前不空闲,跳过本轮") continue device_id = device_row[2] with dispatch_lock: if device_id in running_device_ids: logging.info(f"设备 {device_id} 已在本进程执行任务,跳过任务 {task_id}") continue # 线程真正启动前先登记占用,缩小轮询竞争窗口。 running_task_ids.add(task_id) running_device_ids.add(device_id) reserved_equipment_ids.add(equipment_id) payloads.append(build_task_payload(task_row, device_row)) return payloads def cleanup_finished_workers(): # 功能:同步 worker_threads 字典,只保留仍然存活的线程引用,避免失效线程长期占位。 dead_threads = [] with dispatch_lock: for device_id, thread in worker_threads.items(): if not thread.is_alive(): dead_threads.append(device_id) for device_id in dead_threads: worker_threads.pop(device_id, None) def run_task_worker(task_payload): # 功能:单个任务线程的主入口,负责构建 PDD 实例、执行采集并在异常时兜底收尾。 task_id = task_payload["task_id"] device_id = task_payload["device_id"] pdd = None try: logging.info(f"[任务 {task_id}] 开始执行,设备: {device_id}") print(task_payload) # 调用 PDD(...) 的目的是把本次任务涉及的筛选条件、页码边界和设备信息固化到实例状态里, # 后续 main()、详情页解析和落库逻辑都会依赖这些实例属性。 pdd = PDD( task_payload["search_key"], device_id, title_key=task_payload.get("title_key"), spec_list=task_payload.get("spec_list"), brand=task_payload.get("brand", ""), save_search_key=task_payload.get("save_search_key"), start_page=task_payload.get("start_page"), end_page=task_payload.get("end_page"), max_counts_limit=task_payload.get("max_counts_limit"), direct_shop_lookup=task_payload.get("direct_shop_lookup", False), sort=task_payload.get("sort"), platform=task_payload.get("platform"), task_id=task_payload.get("task_id"), enterprise_id=task_payload.get("enterprise_id"), ) # 这里调用 pdd.main(),是为了把调度层切换到具体采集流程; # 调用结果会决定当前任务是按“正常完成”还是“已结束/异常”记录日志。 completed_normally = pdd.main(device_id, 1, 0) if completed_normally: logging.info(f"[任务 {task_id}] 执行完成,设备: {device_id}") else: logging.info(f"[任务 {task_id}] 已结束,设备: {device_id}") except Exception as e: end_page = task_payload.get("start_page") if pdd is not None: end_page = getattr(pdd, "page", end_page) pdd.finish_task_abnormally(end_page, f"任务执行异常: {e}") else: report_api(task_id, end_page=end_page, start=4, end_time=int(time.time()),finish_status=0) logging.exception(f"[任务 {task_id}] 执行异常,设备: {device_id},错误: {e}") finally: # 无论任务正常还是异常结束,都必须释放进程内占用标记。 with dispatch_lock: running_task_ids.discard(task_id) running_device_ids.discard(device_id) worker_threads.pop(device_id, None) def dispatch_pending_tasks(): # 功能:执行一轮派单,把每个可运行任务绑定到对应设备线程。 # 阶段 1:先清理已经结束的线程引用,避免占用状态过期。 cleanup_finished_workers() # 阶段 2:重新计算本轮真正可运行的任务集合。 task_payloads = fetch_runnable_task_payloads() if not task_payloads: return for task_payload in task_payloads: device_id = task_payload["device_id"] try: # 阶段 3:为每个任务创建独立线程,让不同设备可以并发执行采集。 thread = threading.Thread( target=run_task_worker, args=(task_payload,), daemon=True, name=f"pdd-{device_id}", ) with dispatch_lock: worker_threads[device_id] = thread thread.start() logging.info(f"[任务 {task_payload['task_id']}] 已分发到设备 {device_id}") except Exception: with dispatch_lock: # 线程创建失败时同步回滚占用状态,避免任务被“卡住”。 running_task_ids.discard(task_payload["task_id"]) running_device_ids.discard(device_id) worker_threads.pop(device_id, None) raise def schedule_dispatch(delay_seconds=SCHEDULER_INTERVAL_SECONDS): # 功能:安排下一次轮询触发时间。 # 功能:注册下一轮调度定时器,让调度器按固定间隔持续轮询。 global scheduler_timer if scheduler_stop_event.is_set(): return # 采用递归定时器而不是死循环,便于后续统一停机。 scheduler_timer = threading.Timer(delay_seconds, scheduled_dispatch_job) scheduler_timer.daemon = False scheduler_timer.name = "pdd-scheduler" scheduler_timer.start() def scheduled_dispatch_job(): # 功能:执行一次定时派单,并保证下一轮轮询仍会继续注册。 # 功能:包装一次定时调度执行,确保本轮出错也不会中断后续轮询。 try: dispatch_pending_tasks() except Exception as e: logging.exception(f"PDD 定时调度异常: {e}") finally: schedule_dispatch(SCHEDULER_INTERVAL_SECONDS) def report_api(task_id,page=None,start=None,end_page=None,end_time=None,finish_status=None): # 功能:向外部调度中心上报任务页码、状态和结束信息。 # 外部接口只负责收状态;具体状态值由调用方按当前阶段传入。 # 调用这个接口的目的是把当前任务状态同步给外部调度系统, # 这样任务中心才能感知“开始执行 / 正常结束 / 异常结束”等阶段变化。 params = { "collect_task_allocate_id": task_id, "statr_page":page if page is not None else '', "end_page": end_page if end_page is not None else '', "status": start, "finish_status": finish_status if finish_status is not None else 0, "start_time": int(time.time()), "end_time": end_time if end_time is not None else '', } print(params) url = "http://schedule.dfwy.tech/api/collect_equipment_execute/result_report" res = requests.get(url, params=params, timeout=20) print(res.text) # 获取滑块验证中滑块需要移动的距离 def slide_verify(img_path): # 功能:把本地截图交给第三方打码服务,换回滑块缺口位移。 # 返回:识别成功时返回滑块位移结果,失败时返回服务端给出的空结果。 with open(img_path, 'rb') as f: b = base64.b64encode(f.read()).decode() ## 图片二进制流base64字符串 url = "http://api.jfbym.com/api/YmServer/customApi" data = { ## 关于参数,一般来说有3个;不同类型id可能有不同的参数个数和参数名,找客服获取 "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", "type": "22222", "image": b, } _headers = { "Content-Type": "application/json" } # 识别结果来自第三方服务,当前逻辑只读取它约定的 msg/data 字段。 response = requests.request("POST", url, headers=_headers, json=data).json() print(response) if response.get("msg") == "识别成功": # 获取 data 中的 data 字段 result = response.get("data", {}).get("data") if result: print(result) # 输出结果 else: print("无法获取数据") else: print("识别未成功") return result class PDD: # 功能:这个类负责维护单次拼多多采集任务的运行状态,并封装设备连接、 # 页面导航、数据提取、说明书解析、去重校验和写库等操作。 # 边界:这个类负责“如何跑完整个采集流程”,但不负责外部调度器的轮询策略, # 也不负责 OCR/盒数提取等外部依赖的底层实现。 def __init__( self, search_key, device_id, title_key=None, spec_list=None, brand="", save_search_key=None, start_page=0, end_page=None, max_counts_limit=None, direct_shop_lookup=False, sort=None, platform = None, task_id = None, enterprise_id=None, ): # 阶段 1:初始化与 App、OCR、日志、OSS 相关的基础依赖。 self.package_name = 'com.xunmeng.pinduoduo' self.APP_ID = '116857964' self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh' self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3' self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY) self.table_name = "retrieve_scrape_data" # "pdd_drug" self.shop_table_name = "pdd_shop_info_middle" # "pdd_shop_info" self.loggerPdd = logging.getLogger() self.clipboard = "" # 初始化剪切板的内容为空 # 阶段 2:固化本次任务的筛选条件、页码边界和搜索参数。 self.enterprise_id = enterprise_id self.task_id = task_id self.platform = platform self.sort = sort self.sort_key = 0 self.search_key = search_key # 参苓健脾胃颗粒 香砂平胃颗粒 舒肝颗粒 清肺化痰丸 # title_key 支持把“搜索词”和“标题过滤词”拆开;未单独传入时回退到搜索词。 self.title_key = title_key if title_key is not None else search_key # 规格统一整理成列表,后续匹配逻辑就不用再分辨单值、列表和空值三种输入。 self.spec_list = self._normalize_rule_list(spec_list) self.brand = brand self.save_search_key = save_search_key or search_key # 起止页在入口阶段先做边界修正,主循环只消费规范化后的值。 self.start_page = max(parse_optional_int(start_page, 0), 0) self.end_page = parse_optional_int(end_page, None) self.max_counts_limit = max_counts_limit self.direct_shop_lookup = direct_shop_lookup self.unrelated_data = 0 # 无关数据数量 self.device_id = device_id self.page = self.start_page if self.end_page is not None and self.end_page < self.start_page: self.end_page = self.start_page # 阶段 3:初始化运行时统计状态,这些状态会在主循环中持续更新。 # 统计售罄数量 self.sold_out_counts = 0 # 程序启动时间 self.program_start_time = self.app_start_time() # 统计商品数量 # 最大量数据阈值 self.max_counts = 0 # 统计点击商品的次数 self.click_counts = 0 # 商品在列表的位置 self.search_key_loc = 0 # finish_reported 用来保证无论走哪个退出分支,只向调度系统上报一次结束状态。 self.finish_reported = False # oss配置 self.oss_config = { "access_key_id": Config.access_key_id, "access_key_secret": Config.access_key_secret, "endpoint": Config.endpoint, # 例: oss-cn-beijing.aliyuncs.com "bucket_name": Config.bucket_name, "oss_prefix": Config.oss_prefix # OSS中存放截图的前缀(虚拟文件夹) } # 异常处理 def wr_re(self, mod, device_id, sort=None, page=None): # 功能:读写或删除本地进度文件,给断点续跑保留入口。 file_path = f'./ycwj/{device_id}_{self.title_key}.txt' if mod == "写": try: data = { "page": page if page else "", "sort": sort if sort else "", } os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"进度保存成功:{sort},{page}页") except Exception as e: print("保存进度失败") elif mod == "读": try: if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) print(self.sort) if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) if data['page'] != '': progress_page = int(data['page']) self.page = max(progress_page, self.start_page) self.scroll_to_target_page(self.page) else: return None return data except Exception as e: print(f"读取进度失败", e) return None elif mod == "删": try: if os.path.exists(file_path): os.remove(file_path) print(f"进度文件已删除:{file_path}") except Exception as e: print(f"删除进度文件失败:{e}") return None def clear_progress_file(self): # 功能:统一封装“清理断点进度文件”动作;当前保留空实现以兼容旧调用点。 # self.wr_re("删", self.device_id, self.sort) pass def is_max_count_reached(self): # 功能:判断当前采集数量是否达到任务设定上限。 return bool(self.max_counts_limit and self.max_counts >= self.max_counts_limit) def scroll_to_target_page(self, target_page): # 功能:按目标页数执行固定次数滑动,用于恢复上次采集的大致位置。 target_page = int(target_page or 0) if target_page <= 0: return # 这里按“页数约等于滑动次数”的经验规则恢复列表位置,宁可保守,不做复杂校准。 for _ in range(target_page): end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) time.sleep(self.get_sleep_time()) def finish_task_normally(self, end_page, reason): # 功能:以“正常完成”状态结束任务并保证只上报一次。 # 多个退出分支都会走到这里,因此先判断是否已经上报过结束状态。 if not self.finish_reported: report_api(self.task_id, end_page=end_page, start=3, end_time=int(time.time()),finish_status=1) self.finish_reported = True print(reason) return True def finish_task_abnormally(self, end_page, reason, finish_status=0): # 功能:以“异常结束”状态结束任务并保证只上报一次。 # 异常结束与正常结束共享同一幂等保护,避免重复通知外部调度系统。 if not self.finish_reported: report_api( self.task_id, end_page=end_page, start=4, end_time=int(time.time()), finish_status=finish_status ) self.finish_reported = True print(reason) return False def finish_task_with_max_count(self, end_page): # 功能:达到采集上限时复用正常结束逻辑,只替换结束原因。 return self.finish_task_normally( end_page, f"达到最大采集数量 {self.max_counts_limit},当前已采集 {self.max_counts} 条,停止任务" ) # 排序 def li_or_lo(self, key): # 功能:进入搜索结果页后切换价格排序方式。 if key == "升序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="总价低到高"]') if n.exists: n.click() time.sleep(self.get_sleep_time()) if key == "降序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="单粒价格低到高"]') if n: n.click() else: self.d.xpath('//*[@text="价格"]').click() # 返回列表页 def back_to_list_page(self): # 功能:通过多次尝试返回键,尽量把页面恢复到商品列表页。 for i in range(10): if self.distinct_target(): return True print(f'第{i}次尝试退回到列表页') self.swipe_back(1) time.sleep(1) print('页面出错,没有退回到列表页') return False def get_drug_lis(self, idx): # 功能:根据当前页布局拿到可点击的商品卡片列表。 # 这里区分 idx==0 和后续页面,是因为首屏与翻页后的 RecyclerView 层级不完全一致。 if idx == 0: drug_lis = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() else: for i in range(1, 6): drug_lis = self.d.xpath( f'/hierarchy/android.widget.FrameLayout[{i}]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() if drug_lis: break return drug_lis # 代码运行那时候的时间 def app_current_time(self): # 功能:返回当前时刻的格式化时间字符串,主要用于日志打印。 return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def slide_link(self): # 功能:在分享面板中横向滑动,把“复制链接”附近的目标入口滑到可见区域。 value_tag = None if self.d.xpath('//*[@text="微信"]').exists: value_tag = self.d.xpath('//*[@text="微信"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="朋友圈"]').exists: value_tag = self.d.xpath('//*[@text="朋友圈"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="QQ好友"]').exists: value_tag = self.d.xpath('//*[@text="QQ好友"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return def app_start_time(self): """ 获取app启动时间 :return: """ return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def stop_app(self): # 功能:停止拼多多 App,并等待设备状态稳定下来。 self.d.app_stop(self.package_name) time.sleep(5) def start_app(self): # 功能:启动拼多多 App,并预留加载时间。 self.d.app_start(self.package_name) time.sleep(5) def restart_app(self): """ 重启app :return: """ self.stop_app() # 这里先调用 stop_app(),是为了清掉上一次运行残留的页面状态; # 后续 start_app() 依赖应用已经被完全拉起,搜索流程才能从稳定起点开始。 self.start_app() @staticmethod def get_sleep_time(): # 功能:生成短随机等待时间,减少固定节奏操作带来的页面未加载或风控风险。 return random.randint(1, 2) # return random.randint(5, 8) @staticmethod def get_current_date(): # 功能:返回当前采集日期,作为去重和落库字段使用。 return datetime.datetime.now().strftime('%Y/%m/%d') @staticmethod def _normalize_rule_list(value): # 功能:把单值或集合统一归一成非空字符串列表,供过滤逻辑直接复用。 if value is None: return [] if isinstance(value, (list, tuple, set)): raw_values = value else: raw_values = [value] result = [] for item in raw_values: item_str = str(item).strip() if item_str: result.append(item_str) return result @staticmethod def _normalize_match_text(value): # 功能:把待匹配文本做去空白和小写归一,减少页面文案格式差异带来的误判。 return re.sub(r'\s+', '', str(value or '')).lower() def _match_any_keyword(self, text, keywords): # 功能:判断目标文本是否命中任一过滤词;过滤词为空时直接放行。 keyword_list = self._normalize_rule_list(keywords) if not keyword_list: # 没有配置过滤词时默认放行,让调用方只在需要时开启细筛。 return True normalized_text = self._normalize_match_text(text) return any(self._normalize_match_text(keyword) in normalized_text for keyword in keyword_list) def is_link_spec_useful(self, product_title, specifications=''): # 功能:判断标题或说明书规格里是否包含目标品规。 if not self.spec_list: return True title_text = self._normalize_match_text(product_title) spec_text = self._normalize_match_text(specifications) for spec in self.spec_list: normalized_spec = self._normalize_match_text(spec) if normalized_spec in title_text or normalized_spec in spec_text: return True return False def is_link_useful(self, product_title, specifications=''): # 功能:统一做标题、品牌、规格三层过滤,尽量在早期就排除无关商品。 if not self._match_any_keyword(product_title, self.title_key): print(f"当前商品名称:{product_title} 不包含{self.title_key}关键字") return False if not self._match_any_keyword(product_title, self.brand): print(f"当前商品名称:{product_title} 不包含{self.brand}品牌") return False if not self.is_link_spec_useful(product_title, specifications): print(f"当前商品名称:{product_title} 不包含{self.spec_list}品规") return False return True def remove_watermark(self, img_path): # 功能:弱化截图中的水印或遮罩,提升后续 OCR 识别成功率。 """ 图片去水印(将水印部分变成白色背景)并将数据转化为二进制数据 :param img_path: 图片路径 :return: 二进制图片数据 """ img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1) endswith = os.path.splitext(img_path)[1] new = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8) _, img_binary = cv2.imencode(endswith, new) return img_binary def get_shop_name(self): # 功能:优先从当前详情页直接提取店铺名,失败时再进入店铺页兜底提取。 """ 获取店铺名 :return: """ try: xpath = '//*[@text="进店"]/preceding-sibling::android.view.ViewGroup/android.widget.LinearLayout/android.widget.TextView' # 优先从当前详情页直接读取店铺名,成本最低。 if self.d.xpath(xpath).exists: shop_name = self.d.xpath(xpath).text self.loggerPdd.info(f'1-获取到店铺名:{shop_name}') else: # 进入店铺新页面 # 当前页取不到时,再进入店铺页做兜底提取。 shop_btn_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]' if self.d.xpath(shop_btn_xpath).exists: self.d.xpath(shop_btn_xpath).click() time.sleep(1) # self.d.xpath('//*[@text="店铺"]').click() xpath_shop_name = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.TextView[1]' if self.d.xpath(xpath_shop_name).exists: shop_name = self.d.xpath(xpath_shop_name).text self.loggerPdd.info(f'2-获取到店铺名:{shop_name}') else: shop_name = '' self.loggerPdd.info(f'3-获取到店铺名:{shop_name}') self.swipe_back(1) # else: shop_name = '' self.loggerPdd.info('4-因为shop_btn_xpath不存在,获取到店铺名为空') # time.sleep(10000) return shop_name except Exception as e: print(f'获取店铺名出错:{e}') self.loggerPdd.error(f'获取店铺名出错:{e}') return None def save_to_shop_database(self, data): # 功能:把当前商品采集结果落库;只有 commit 成功后才计入采集数量。 print(f'保存店铺数据到店铺数据库:{data}') max_retries = 5 # 数据库偶发抖动时允许短重试,但只有 commit 成功后才算真正采集到一条数据。 for attempt in range(max_retries): conn = None try: conn = get_mysql() with conn.cursor() as cur: add_sql = """ INSERT INTO pdd_shop_info_middle ( shop, contact_address, qualification_number, business_license_company, business_license_address, store_url, scrape_date, platform, province,city, create_time, update_time ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ cur.execute(add_sql, ( data['shop'], None, None, None, None, data['store_url'], data['scrape_date'], data['platform'], None, None, data['create_time'], data['update_time'], )) conn.commit() self.max_counts += 1 print(f"存入数据库成功") return True except Exception as e: print(f'保存数据库异常 (尝试 {attempt + 1}/{max_retries}): {e}') if conn: conn.rollback() conn.close() if attempt == max_retries - 1: print("达到最大重试次数,保存失败") return False time.sleep(2) def save_to_database(self, data): # 功能:把当前商品采集结果落库;只有 commit 成功后才计入采集数量。 print(f'保存数据到数据库:{data}') max_retries = 5 # 数据库偶发抖动时允许短重试,但只有 commit 成功后才算真正采集到一条数据。 for attempt in range(max_retries): conn = None try: conn = get_mysql() with conn.cursor() as cur: add_sql = """ INSERT INTO retrieve_scrape_data ( enterprise_id, platform_id, platform_item_id, province_id, city_id, province_name, city_name, area_info, product_name, product_specs, one_box_price, manufacture_date, expiry_date, manufacturer, approval_number, is_sold_out, online_posting_count, continuous_listing_count, link_url, store_name, store_url, shipment_province_id, shipment_province_name, shipment_city_id, shipment_city_name, company_name, qualification_number, scrape_date, min_price, number, sales, inventory, snapshot_url ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ cur.execute(add_sql, ( data['enterprise_id'], data['platform_id'], data['platform_item_id'], data['province_id'], data['city_id'], data['province_name'], data['city_name'], data['area_info'], data['product_name'], data['product_specs'], data['one_box_price'], data['manufacture_date'], data['expiry_date'], data['manufacturer'], data['approval_number'], data['is_sold_out'], data['online_posting_count'], data['continuous_listing_count'], data['link_url'], data['store_name'], data['store_url'], data['shipment_province_id'], data['shipment_province_name'], data['shipment_city_id'], data['shipment_city_name'], data['company_name'], data['qualification_number'], data['scrape_date'], data['min_price'], data['number'], data['sales'], data['inventory'], data['snapshot_url'], )) conn.commit() self.max_counts += 1 print(f"存入数据库成功,当前已采集 {self.max_counts} 条") return True except Exception as e: print(f'保存数据库异常 (尝试 {attempt + 1}/{max_retries}): {e}') if conn: conn.rollback() conn.close() if attempt == max_retries - 1: print("达到最大重试次数,保存失败") return False time.sleep(2) def click_target_product_by_search_key(self, fuzzy_match=False, timeout=10): # 功能:在列表页重新定位当前搜索词对应的商品,常用于异常恢复后的重新对焦。 """ 动态匹配self.search_key对应的商品并点击 :param fuzzy_match: 是否模糊匹配(应对商品名带额外后缀/前缀的情况) 不模糊匹配 :param timeout: 等待元素出现的超时时间(秒) :return: 点击是否成功(bool) """ try: # 1. 定义定位条件(动态使用self.search_key) # 异常恢复后需要重新找到“当前任务真正想点的那一个商品”, # 这里支持精确和模糊两种定位策略。 if fuzzy_match: # 模糊匹配:包含search_key即可(推荐,适配搜索结果商品名略有差异) locator = self.d(textContains=self.search_key) print(f"🔍 模糊匹配商品:包含「{self.search_key}」的元素") else: # 精确匹配:商品名与search_key完全一致 locator = self.d(text=self.search_key) print(f"🔍 精确匹配商品:「{self.search_key}」") # 2. 等待元素出现(核心:避免元素未加载就点击) if locator.wait(timeout=timeout): print(f"✅ 找到匹配的商品,准备点击") # 执行点击(优先点击可点击的元素) locator.click() print(f"✅ 成功点击「{self.search_key}」对应的商品") # 点击后等待页面加载 time.sleep(self.get_sleep_time()) return True else: print(f"❌ 滑动后仍未找到「{self.search_key}」对应的商品") return False except Exception as e: print(f"❌ 点击「{self.search_key}」对应商品时异常:{e}") return False def swipe_down(self): # 功能:执行带随机性的向下滑动,兼顾页面恢复、回找搜索框和设备适配。 """ 下滑(模拟真人操作,抗风控+设备适配+容错) 核心:起点在屏幕上方,终点在屏幕下方(和上滑相反) :return: None """ try: # 1. 获取屏幕尺寸(兼容不同设备,给默认值避免获取失败) screen_width = self.d.info.get('displayWidth', 1080) # 默认1080px宽度 screen_height = self.d.info.get('displayHeight', 2400) # 默认2400px高度 # 2. 随机滑动时长(0.1~0.3秒,避免固定值被风控,且不设0秒) duration_rate = random.uniform(0.1, 0.3) # 3. 计算滑动坐标(用屏幕比例,适配所有设备) start_x = screen_width // 2 # 水平居中(和上滑一致,符合真人操作习惯) start_y = screen_height * 0.2 # 起点:屏幕20%高度(上方偏下) end_y = screen_height * 0.8 # 终点:屏幕80%高度(下方偏上) # 强制确保起点y < 终点y(必为向下滑,避免逻辑错误) start_y, end_y = min(start_y, end_y - 10), max(end_y, start_y + 10) # 4. 核心向下滑动操作 self.d.swipe(start_x, start_y, start_x, end_y, duration=duration_rate) # 滑动后全局等待(确保页面加载,避免元素定位失败) time.sleep(self.get_sleep_time()) except Exception as e: # 异常捕获:避免设备断开/滑动失败导致程序崩溃 print(f"向下滑动失败:{e}") # 兜底方案:用固定坐标重试(适配主流1080x2400设备) self.d.swipe(540, 480, 540, 1920, duration=0.2) time.sleep(self.get_sleep_time()) def swipe_up(self): # 功能:执行向上滑动,用于翻页或继续向下浏览详情。 """ 上滑 :return: """ screen_width = self.d.info['displayWidth'] screen_height = self.d.info['displayHeight'] duration_rate = random.uniform(0, 0.3) self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate) no = random.uniform(0, 1) if no > 0.85: # 有的时候卡着 再稍微往上滑一点点 self.d.swipe_ext("up", 0.1) time.sleep(self.get_sleep_time()) def swipe_back(self, no): # 功能:按指定次数执行返回,但只有当前不在列表页时才真正后退。 """ 返回 :param no: 回退次数 :return: """ if not self.distinct_target(): for idx in range(no): self.d.press('back') time.sleep(self.get_sleep_time()) def drug_price(self): # 功能:直接从详情页读取价格,作为规格弹窗取价失败时的兜底方案。 """ 获取药品价格 :return: """ try: xpath = '//*[@text="¥"]/following-sibling::android.widget.TextView[1]' price_str = self.d.xpath(xpath).text price = float(re.search(r'[\d\.]+', price_str).group()) print(f'获取到价格:{price}') return float(price) except Exception as e: print(f'提取价格出错-->{e}') return None def drug_price_ex(self): # 功能:优先从规格选择弹窗里同时提取价格和“已选规格”文本。 price_str = '' # 价格初始化 ext = '' # 初始化已选择的信息 price = '' # 阶段 1:先尝试打开规格/品规弹窗,因为后续价格和规格文本都依赖这个弹窗内容。 # 这是点击进入品规的按钮 button_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' button_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' # 调试 # test_button = self.d.xpath(button_xpath_1).exists # print(test_button) # test_button_2 = self.d.xpath(button_xpath_2).exists # print(test_button_2) # time.sleep(1000) # if self.d.xpath('//*[@text="发起拼单"]').exists: # self.d.xpath('//*[@text="发起拼单"]').click() # elif self.d.xpath('//*[@text="去复诊开药"]').exists: # self.d.xpath('//*[@text="去复诊开药"]').click() if self.d.xpath(button_xpath_1).exists: self.d.xpath(button_xpath_1).click() elif self.d.xpath(button_xpath_2).exists: self.d.xpath(button_xpath_2).click() else: print("button1 and button_2 all not exist") return price, ext # 阶段 2:根据不同弹窗布局选择对应的 XPath 解析策略。 select_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()]' select_xpath_3_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()-1]' price_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' price_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' price_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]//android.widget.TextView[1]' if self.d.xpath(select_xpath_1).exists: text1 = self.d.xpath(select_xpath_1).text print(f"select_xpath_1--text1={text1}") # 这里先判断是否已经有默认规格,是为了减少额外点击; # 如果已经存在“已选”文本,后续可以直接读取价格和规格。 if '已选' in text1: if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-1={price_str}") else: print("select_xpath_1--price_xpath_1-1 not exist") ext = text1 elif '请选择' in text1: # 调用 click() 的目的是补齐一次规格选择动作, # 调用后价格文本和“已选规格”文案才会稳定刷新出来。 # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' scroll_xpath_2 = '' if self.d.xpath(scroll_xpath_1).exists: self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格会刷新 if self.d.xpath(select_xpath_1).exists: text2 = self.d.xpath(select_xpath_1).text if '已选' in text2: print(f"select_xpath_1--已选择2:text2={text2}") if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-2={price_str}") else: print("select_xpath_1--price_xpath_1-2 not exist") ext = text2 else: print("select_xpath_1--scroll_xpath_1 not exist") elif self.d.xpath(select_xpath_2).exists: text1 = self.d.xpath(select_xpath_2).text print(f"xpath2--text1={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-1 not exist") elif '请选择' in text1: # 当前布局下如果不先选择一个规格,后续既拿不到准确价格,也无法计算盒数。 print('come in here') # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_2).exists: text2 = self.d.xpath(select_xpath_2).text if '已选' in text2: ext = text2 print(f"select_xpath_2--已选择2:text2={text2}") if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-2 not exist") else: print("scroll_xpath_1 not exists") else: print("not exist 请选择 or 已选") elif self.d.xpath(select_xpath_3).exists: text1 = self.d.xpath(select_xpath_3).text print(f"xpath3--text1-1={text1}") if ('请选择' not in text1) and ('已选' not in text1): text1 = self.d.xpath(select_xpath_3_2).text print(f"xpath3--text1-2={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3-1={price_str}") else: print("select_xpath_3--price_xpath_3-3-1 not exist") elif '请选择' in text1: # 这一支兼容另一类规格弹窗结构,核心目标仍然是先拿到“已选”文本。 print('come in here') # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' recycler_view_xpath = '//*[@resource-id="android:id/content"]//android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-2={price_str}") else: print("select_xpath_3--price_xpath_3-3-2 not exist") elif self.d.xpath(recycler_view_xpath).exists: self.d.xpath(recycler_view_xpath).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3={price_str}") else: print("select_xpath_3--price_xpath_3-3-3 not exist") else: print("scroll_xpath_1 not exists") else: print(f"xpath3--text1-不包含请选择和已选择") else: print("select_xpath_1 and select_xpath_2 and select_xpath_3 all not exist") # 阶段 3:从界面文案中抽取纯价格值,供后续去重和单盒价格计算。 if price_str: # price = float(re.search('[\d\.]+', price_str).group()) match = re.search(r'¥([\d\.]+)', price_str) if match: price = float(match.group(1)) else: price = '' # price = float(re.search(r'¥([\d\.]+)', price_str).group(1)) print(f'获取到价格:{price}') print(f"ext={ext}") # 调用 swipe_back() 的目的是把页面从规格弹窗恢复回商品详情页, # 后续提取店铺名、链接和说明书都依赖当前仍停留在详情页。 self.swipe_back(1) # return price, ext def restart_uiautomator_services(self, device_id): # 功能:重启设备上的 atx-agent/uiautomator 服务,恢复自动化控制能力。 """ 重启atx的uiautomator 服务 :param device_id: :return: """ stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop' start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d' subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) def connect_devices(self, device_id): # 功能:建立 USB 设备连接,并把自动化服务重置到可用状态。 """ 连接设备 :return: """ try: self.d = u2.connect_usb(device_id) # 设置隐形等待时间 # self.d.implicitly_wait(5) # 连上设备后主动重启 atx-agent,减少长时间运行后的控件失效问题。 self.restart_uiautomator_services(device_id) print(f'[{self.program_start_time}]连接到设备:{device_id}') except Exception as e: print(f'{device_id} 连接错误: {e}') raise Exception(e) def get_ocr_res(self, img): # 功能:对截图做去水印后调用百度 OCR,返回识别出的文字结果列表。 try: image = self.remove_watermark(img) res_image = self.client.basicGeneral(image) data = res_image.get('words_result', '') print(f'百度api返回结果:{data}') return data except: return None def get_title(self): # 功能:从商品详情页提取当前标题,作为第一层匹配和落库名称来源。 try: print('开始提取标题') time.sleep(self.get_sleep_time()) title_xpath = '//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]' if self.d.xpath(title_xpath).exists: title = self.d.xpath(title_xpath).info['contentDescription'].strip() else: return None # title = self.d.xpath('//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]').info['contentDescription'].strip() print(f'提取到标题:{title}') return title except Exception as e: print(f'获取标题出错:{e}') return None # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) # match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) # if match: # drugs_name = match.group(1).strip() + match.group(2).strip() # specifications = match.group(3).strip() # print("药品名:", drugs_name) # print("规格:", specifications) # print('完整药名:', drugs_name + specifications) # return drugs_name, specifications # else: # print("没有匹配到预期格式") def enter_shop(self): # 功能:进入店铺页,供后续读取店铺或资质信息时使用。 """ 进店,方便提取资质环境 :return: """ # self.d.xpath('//*[@text="进店"]').click() self.d.xpath('//*[@text="店铺"]').click() time.sleep(self.get_sleep_time()) #店铺去重 def shop_is_exists(self, data): # 功能:按店铺去重校验,避免同类数据重复入库。 # 1. 验证必要字段 # 先校验去重所需字段是否齐全,避免把不完整的数据带到 SQL 条件里。 required_keys = ['shop'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] print(f"缺少必要字段: {', '.join(missing)}") return None shop_value = data.get('shop') if not shop_value or not str(shop_value).strip(): print("shop 字段为空,无法执行去重查询") return False conn = None try: conn = get_mysql() with conn.cursor() as cur: query_sql = """ SELECT * FROM {} WHERE shop = %s LIMIT 1 """.format(self.shop_table_name) cur.execute(query_sql, ( data['shop'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") finally: if conn: conn.close() def get_province_city(self,data): """ 从 pdd_shop_info_middle 表中查询已存在的 province 和 city, 并赋值给 data['province_name'] 和 data['city_name'] """ print("获取店铺营业公司对应的省份和城市") shop_name = data.get('shop') if not shop_name: print("shop 字段为空,无法执行查询") return conn = None try: conn = get_mysql() with conn.cursor() as cur: # 查询 shop_info_middle 表,获取 province 和 city sql = "SELECT province, city FROM pdd_shop_info_middle WHERE shop = %s LIMIT 1" cur.execute(sql, (shop_name,)) result = cur.fetchone() if result: province, city = result data['province_name'] = province if province else '' data['city_name'] = city if city else '' print(f"店铺 {shop_name} 对应的省份和城市为: {province}, {city}") else: print(f"未在 shop_info_middle 表中找到店铺:{shop_name}") # 可根据业务需求设置默认值或保持原样 data['province_name'] = '' data['city_name'] = '' except Exception as e: print(f"查询省市信息失败: {str(e)}") # 异常时也可设置默认空值,避免后续代码因缺少键而报错 data['province_name'] = '' data['city_name'] = '' finally: if conn: conn.close() def data_is_exists(self, data): # 功能:按价格、店铺、日期、平台做去重校验,避免同类数据重复入库。 # 1. 验证必要字段 # 先校验去重所需字段是否齐全,避免把不完整的数据带到 SQL 条件里。 required_keys = ['min_price', 'shop', 'scrape_date', 'platform'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] print(f"缺少必要字段: {', '.join(missing)}") return None conn = None try: conn = get_mysql() with conn.cursor() as cur: query_sql = """ SELECT * FROM {} WHERE min_price = %s AND store_name = %s AND scrape_date = %s AND platform_id = %s """.format(self.table_name) cur.execute(query_sql, ( data['min_price'], data['shop'], data['scrape_date'], data['platform'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") finally: if conn: conn.close() def get_instructions_data(self): # 功能:在详情页中提取说明书/商品参数区域的关键字段,整理成统一字典。 """ 确定有详情页之后之后,提取所有的详情页数据 :return: """ # 先把页面滚到说明书/参数区域附近,再开始解析键值对。 for i in range(8): if self.d.xpath('//*[@text="品牌"]').exists or self.d.xpath('//*[@text="药品通用名"]').exists: self.d.swipe_ext("up", scale=0.1) print('开始采集详情数据') break self.d.swipe_ext("up", scale=0.5) time.sleep(self.get_sleep_time()) # 阶段 2:进入“查看全部”区域,把折叠的参数信息完整展开。 # 点击查看全部 if self.d.xpath('//*[@text="品牌"]').exists: self.d.xpath('//*[@text="品牌"]').click() else: self.d.xpath('//*[@text="药品通用名"]').click() time.sleep(self.get_sleep_time()) attr = dict() # 阶段 3:批量解析键值对文本,构造说明书字段字典。 # # 获取详情页信息 xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text # 截图获取未获取到的数据 # if not all(i in ['有效期', '生产企业', '批准文号', '药品规格', '产品规格'] for i in attr.keys()): if not all(i in ['有效期', '生产企业', '批准文号', '药品规格'] for i in attr.keys()): # 首轮解析拿不到关键字段时再补一次较短滑动,兼容参数区未完整展示的情况。 self.d.swipe_ext("up", 0.4) time.sleep(self.get_sleep_time()) xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text print(f'当前说明书规格参数:{attr}') res_data = { # "有效期": attr.get('有效期',''), # "生产单位": attr['生产企业'], # "批准文号": attr['批准文号'], # "产品规格": attr.get('药品规格') if attr.get('药品规格', '') else attr.get('药品规格') "有效期": attr.get('有效期', ''), "生产单位": attr.get('生产企业', ''), "批准文号": attr.get('批准文号', ''), "产品规格": attr.get('药品规格', '') } print(f'当前规格参数字典数据:{res_data}') return res_data def has_instructions(self): # 功能:判断当前详情页能否找到说明书/商品详情区域。 """ 是否有详情页 :return:如果有详情页返回True,否则返回False """ # 没有说明书的无法采集具体数据 max_attempts = 12 # 最大尝试次数 attempt = 0 # 当前尝试次数 while attempt < max_attempts: time.sleep(0.5) xpath = '//*[@text="商品详情"]' is_has_instructions = self.d.xpath(xpath).exists if is_has_instructions: return True # 如果找到“商品详情”,则返回True self.d.swipe_ext("up", 0.3) attempt += 1 return False # 如果尝试次数达到最大次数,则返回False def distinct_target(self): # 功能:判断当前页面是否已经回到商品列表页。 # 这里同时检查多个锚点,是为了兼容拼多多不同活动页和不同 UI 版本。 result = False is_position = self.d.xpath('//*[@content-desc="拍照搜索"]').exists is_position2 = self.d.xpath('//*[@text="年货节大促"]').exists is_position3 = self.d.xpath('//*[@text="筛选"]').exists is_position4 = self.d.xpath('//*[@text="回头客常拼"]').exists list_page_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]//android.support.v7.widget.RecyclerView[1]' is_position_new = self.d.xpath(list_page_xpath).exists print(f'is_position_new={is_position_new}') if is_position or is_position2 or is_position3 or is_position4 or is_position_new: result = True return result def enter_target_page(self): # 功能:进入搜索页、输入关键字并恢复排序/页位,为主循环建立起始页面。 # 阶段 1:进入搜索框并提交当前任务的搜索词。 self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]').click() time.sleep(self.get_sleep_time()) self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) self.d.send_keys(self.search_key, clear=True) time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="搜索"]').click() time.sleep(self.get_sleep_time()) # 阶段 2:如果任务要求排序,则在首次进入结果页后先切到目标排序方式。 # 排序只在进入列表后的第一次执行,避免恢复进度时重复切换排序方向。 if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) # progress = self.wr_re("读", self.device_id) progress = None # 阶段 3:如有历史页码,则把列表大致恢复到目标位置。 # 进度恢复逻辑目前停用,但保留按页滑动的入口,便于后续重新启用断点续跑。 if not progress and self.page > 0: self.scroll_to_target_page(self.page) def get_clipboard(self): # 功能:读取设备剪贴板内容,并去掉空值和首尾空白。 self.loggerPdd.info(f"Clipboard content:{self.d.clipboard}") # 打印调试信息 clipboard_content = self.d.clipboard if clipboard_content is None: return '' return clipboard_content.strip() def get_product_link(self): # 功能:通过商品详情页的分享入口复制商品链接。 product_link = '' print('开始获取商品链接') content_frame = self.d.xpath('//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]').exists print(content_frame) relative_layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout) relative_layout2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout2) Frame_Layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]').exists print(Frame_Layout) ImageView = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]').exists print(ImageView) ImageView2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[3]/android.view.View[1]').exists print(ImageView2) # 多种可能的“分享”按钮 # 分享入口在不同商品页布局里位置不稳定,因此保留多套候选 XPath。 dots_xpaths = [ # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[last()]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[3]/android.widget.ImageView[1]', ] # 阶段 1:遍历候选分享入口,找到当前布局下可点击的“更多/分享”按钮。 max_retry = 5 # 最多尝试次数 # 分享面板偶尔会因为动画或按钮未露出而失败,因此允许多次重试。 for idx in range(1, max_retry + 1): if product_link: # 已经拿到则退出 break for xp in dots_xpaths: if self.d.xpath(xp).exists: # print(f'{idx}-进入分享点点点') self.loggerPdd.info(f'{idx}-进入分享点点点') # 调用 click() 的目的是打开分享面板; # 后续 slide_link() 和“复制链接”点击都依赖分享面板已经展开。 self.d.xpath(xp).click() time.sleep(1) self.loggerPdd.info('开始滑动') # 这里先调用 slide_link(),是为了把“复制链接”按钮滑到当前可见区域。 self.slide_link() time.sleep(0.2) # 调用 click_exists() 的目的是直接触发系统复制动作, # 调用后 get_clipboard() 才有机会读到最新商品链接。 self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) product_link = self.get_clipboard() time.sleep(0.5) self.loggerPdd.info(f'{idx}-商品链接:{product_link}') break # 找到并执行后跳出内层循环 if not product_link and idx < max_retry: time.sleep(0.5) # 最后一次不需要再等待 # time.sleep(100000) return product_link def integrate_data_v2(self): # 功能:在单个商品详情页内完成价格、链接、店铺、说明书、去重和落库的完整聚合流程。 """ 基于入口配置统一校验标题、品牌和品规,替代内部大量硬编码分支。 """ # 阶段 1:先拿价格和标题,并在最早阶段过滤无关商品。 # 价格优先走规格弹窗,因为这里还能顺便拿到已选规格文本。 min_price, ext = self.drug_price_ex() title_info = self.get_title() if not title_info: print('标题获取为空') self.swipe_back(1) return # 先只按标题/品牌做一次粗过滤,尽早淘汰无关商品。 if not self.is_link_useful(title_info): self.swipe_back(1) self.unrelated_data += 1 return # 规格弹窗提价失败时,再回退到详情页直接取价。 if not min_price: min_price = self.drug_price() if not min_price: print('提取价格出错,回退到列表页') self.swipe_back(1) self.unrelated_data += 1 return # 阶段 2:补齐商品链接和店铺信息,这两类字段是后续落库和去重的关键上下文。 product_link = self.get_product_link() time.sleep(2) # 有的页面店铺信息不在首屏,这里按配置决定是否直接读取还是先滑动到店铺区域。 if self.direct_shop_lookup: shop = self.get_shop_name() else: for _ in range(15): if self.d(textStartsWith="进店").exists: print('开始获取店铺名') break self.d.swipe_ext("up", scale=0.3) time.sleep(self.get_sleep_time()) if self.d(textStartsWith="进店").exists: print('可以开始获取店铺名') shop = self.get_shop_name() if not shop: print('当前店铺名称为空') self.swipe_back(1) self.unrelated_data += 1 return scrape_date = self.get_current_date() dup_data = { 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '3' } # 同一天同店铺同价格的数据视为重复,避免重复入库。 if self.data_is_exists(dup_data): print('存在相同数据不入库') self.back_to_list_page() return shop_data = { 'shop': shop, 'store_url': product_link, 'scrape_date': scrape_date, 'platform': '拼多多', 'create_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } province_name = '' city_name = '' # 插入店铺数据 if self.shop_is_exists(shop_data): print("店铺数据已存在,进行省市回填") self.get_province_city(shop_data) province_name = shop_data['province_name'] city_name = shop_data['city_name'] else: print(f"店铺数据不存在,插入{self.shop_table_name}店铺表") self.save_to_shop_database(shop_data) # 阶段 3:确认是否存在说明书页,并在有说明书时补提取规格、生产单位和批准文号。 is_has_instructions = self.has_instructions() self.loggerPdd.info(f'是否有说明书:{is_has_instructions}') manufacture_date = '' credit_code = ext # 说明书页不是每个商品都有;没有时允许继续落库,只是相关字段留空。 if is_has_instructions: try: instructions_info = self.get_instructions_data() expiry_date = instructions_info['有效期'].strip('。') manufacturer = instructions_info['生产单位'].strip('。') approval_number = instructions_info['批准文号'].strip('。') specifications = instructions_info['产品规格'].strip('。') except Exception as e: print(f'获取详情页规格参数出错:{e}') self.swipe_back(2) return else: expiry_date = '' manufacturer = '' approval_number = '' specifications = '' # 二次校验把说明书里的规格也纳入判断,避免标题模糊匹配带来误采。 if not self.is_link_useful(title_info, specifications): self.swipe_back(1) self.unrelated_data += 1 return self.unrelated_data = 0 # 箱数提取失败时不阻断主流程,只把单盒价格回退为 0。 if extract_box_number(credit_code): one_box_price = min_price / extract_box_number(credit_code) else: print("单瓶药品价格没处理成功") one_box_price = 0 # 阶段 4:把当前详情页提取结果整理成统一落库结构。 save_data = { 'enterprise_id': self.enterprise_id, 'platform_id': self.platform, 'platform_item_id': '', 'province_id': 0, 'city_id': 0, 'province_name': province_name, 'city_name': city_name, 'area_info': "", 'product_name': title_info, 'product_specs': specifications, 'one_box_price': one_box_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'manufacturer': manufacturer, 'approval_number': approval_number, 'is_sold_out': 0, 'online_posting_count': 1, 'continuous_listing_count': 1, 'link_url': product_link, 'store_name': shop, 'store_url': '', 'shipment_province_id': 0, 'shipment_province_name': "", 'shipment_city_id': 0, 'shipment_city_name': "", 'company_name': "", 'qualification_number': "", 'scrape_date': scrape_date, 'min_price': min_price, 'number': 0, 'sales': "", 'inventory': "", 'snapshot_url': "", 'insert_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } # 调用 save_to_database() 的目的是把当前已经校验通过的数据立即持久化, # 避免后续页面跳转、返回或异常中断导致采集结果丢失。 self.save_to_database(save_data) def main(self, device_id, search_key_length, keyword_idx): # 功能:执行单设备的完整采集主循环,直到达到结束页、采集上限或异常退出条件。 completed_normally = False stop_by_max_count = False spider_no = 0 current_page = self.page # 阶段 1:建立设备连接,准备进入搜索页面。 self.connect_devices(device_id) time.sleep(self.get_sleep_time()) # 第一个关键字会重启 App 并重新进入搜索页,后续关键字只复用当前输入框。 if keyword_idx == 0: print("搜索前,先重启APP") self.restart_app() # 搜索关键字 self.enter_target_page() else: print("清空前面的文字,再输入关键词") self.d.send_keys(self.search_key, clear=True) time.sleep(1) print("点击搜索") self.d.xpath('//*[@text="搜索"]').click() time.sleep(1) # 阶段 2:向调度系统上报“执行中”,然后开始按页扫描商品列表。 # 上报状态 # 进入主循环前先上报“执行中”,让调度系统能看到设备已经开始跑任务。 report_api(self.task_id, self.page, 2,finish_status=0) for idx in range(300): print(f'第{current_page}页') # self.wr_re("写", self.device_id, self.sort, current_page) if spider_no > 30: time.sleep(300) spider_no = 0 # 连续命中太多无关商品时,认为当前搜索结果已偏离目标,主动收尾。 if self.unrelated_data > 30: print(f'[{self.program_start_time}]----{self.search_key}----连续超过30个不达标的数据则停止采集') print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.swipe_down() time.sleep(self.get_sleep_time()) # 下滑后等待页面稳定 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: self.finish_task_abnormally( current_page, f"连续超过30个不达标的数据后,关键词「{self.search_key}」商品点击失败", finish_status=1 ) return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) if keyword_idx == search_key_length - 1: print("程序最后一个品规采集完毕,返回主屏幕") completed_normally = self.finish_task_normally( current_page, '连续超过30个不达标的数据,结束采集' ) else: completed_normally = True break # 达到采集上限后不再继续翻页,直接走正常结束分支。 if self.is_max_count_reached(): completed_normally = self.finish_task_with_max_count(current_page) # 向下滑 self.swipe_down() time.sleep(self.get_sleep_time()) # 点击搜索框 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: print(f"关键词「{self.search_key}」商品点击失败") return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) break # 售罄次数大于4基本就是号废了但是如果下次点击不会出现这种情况就要重置为0 # 连续多次命中售罄商品时,认为当前账号/结果页已失去采集价值,提前退出。 if self.sold_out_counts > 4: self.finish_task_abnormally( current_page, "====商品已售罄4次,结束采集(号不能用)", finish_status=1 ) print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") break # 阶段 3:获取当前页可见商品卡片,并逐个点击进入详情页采集。 drug_lis = self.get_drug_lis(idx) print('数量', len(drug_lis)) for idd, drug_one in enumerate(drug_lis): print(idd + 1, drug_one.info) time.sleep(self.get_sleep_time()) top = drug_one.info['bounds']['top'] bottom = drug_one.info['bounds']['bottom'] if bottom <= 1524 and top >= 258: drug_one.click() self.click_counts += 1 time.sleep(self.get_sleep_time()) # 先判断是否售罄次数是否大于4 if self.sold_out_counts >= 4: print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.finish_task_abnormally( current_page, "====这是在第一页有两个,商品已售罄4次,结束采集(号不能用)====", finish_status=1 ) time.sleep(self.get_sleep_time()) self.d.press('home') return # 这里先判断“商品已售罄”,是为了尽早放弃无效详情页; # 如果不先做这一步,后续详情采集会浪费时间且可能干扰账号状态判断。 if self.d.xpath('//*[contains(@text, "商品已售罄")]').wait(timeout=5): print("======商品已售罄======") self.sold_out_counts += 1 if self.back_to_list_page(): continue # 采集药品信息 # 进入详情页后的采集与回退是最容易卡死的阶段,需要单独兜底。 try: # 重置商品售罄次数 self.sold_out_counts = 0 self.integrate_data_v2() # 检测下是否回退到列表页 if self.back_to_list_page(): print('回退到列表页', True) else: print(f'[{self.app_current_time()}] 回退到列表页失败') print( f"[程序启动时间:{self.program_start_time}-----结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.finish_task_abnormally(current_page, "回退到列表页失败,结束采集") return time.sleep(self.get_sleep_time()) spider_no += 1 if self.is_max_count_reached(): completed_normally = self.finish_task_with_max_count(current_page) stop_by_max_count = True break except Exception as e: self.loggerPdd.error(f'采集药品详情数据出错:{e}') if not self.back_to_list_page(): self.finish_task_abnormally(current_page, '采集药品详情数据出错且无法回到列表页,结束采集') return else: continue # 阶段 4:处理翻页前的收尾条件,包括采集上限、结束页和列表到底。 if stop_by_max_count: break # 配置了结束页时,以调用方传入的页边界作为最高优先级退出条件。 if self.end_page is not None and current_page >= self.end_page: completed_normally = self.finish_task_normally( current_page, f"已采集到结束页 {self.end_page},结束任务" ) break if self.d(textStartsWith="抱歉,没有更多商品啦~").exists: completed_normally = self.finish_task_normally(current_page, '已经到达列表页最底部') break # 阶段 5:当前页还没触发任何结束条件时,继续滑到下一页。 print('开始滑入下一页') end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) time.sleep(self.get_sleep_time()) # 阶段 6:根据最终状态做统一收尾,保证任务一定会走到正常或异常结束分支之一。 if completed_normally: self.clear_progress_file() elif not self.finish_reported: self.finish_task_abnormally(current_page, "采集流程异常结束") return completed_normally # pdd def main(): # 功能:启动调度器入口,先立即执行一轮派单,再注册后续轮询。 logging.info(f"PDD 调度器启动,轮询间隔 {SCHEDULER_INTERVAL_SECONDS} 秒") dispatch_pending_tasks() schedule_dispatch(SCHEDULER_INTERVAL_SECONDS) scheduler_stop_event.wait() if __name__ == '__main__': main()