import requests import base64 import uiautomator2 as u2 import time import sys import subprocess import re import random import json from aip import AipOcr import numpy as np import cv2 import os from pdd_config import Config import logging from logger import setup_logger import pymysql from 拼多多盒数处理脚本.main import extract_box_number import datetime import threading setup_logger("pdd_spider") # 初始化日志 def get_mysql(): return pymysql.connect( host='120.24.49.2', # 修改后的主机 port=3306, # 添加端口号 user='drug_retrieve', # 修改后的用户名 password='ksCt3xm6chzdkafj', # 修改后的密码 db='drug_retrieve', # 修改后的数据库名 charset='utf8mb4' ) SCHEDULER_INTERVAL_SECONDS = 600 PLATFORM_PDD = 3 TASK_STATUS_PENDING = 1 DEVICE_STATUS_IDLE = 0 DEFAULT_MAX_COUNTS_LIMIT = 300 dispatch_lock = threading.Lock() running_task_ids = set() running_device_ids = set() worker_threads = {} scheduler_stop_event = threading.Event() scheduler_timer = None def parse_optional_int(value, default=None): if value in (None, ""): return default try: return int(value) except (TypeError, ValueError): return default def fetch_pending_tasks(): conn = None try: conn = get_mysql() with conn.cursor() as cursor: sql = """ SELECT * FROM retrieve_collect_task_allocate WHERE platform = %s AND status = %s ORDER BY id ASC """ cursor.execute(sql, (PLATFORM_PDD, TASK_STATUS_PENDING)) return cursor.fetchall() except Exception as e: logging.exception(f"读取待执行任务失败: {e}") return [] finally: if conn: conn.close() def fetch_idle_device_by_equipment_id(equipment_id): conn = None try: conn = get_mysql() with conn.cursor() as cursor: sql = """ SELECT * FROM retrieve_collect_equipment WHERE name LIKE %s AND id = %s AND status = %s LIMIT 1 """ cursor.execute(sql, ('%pdd%', equipment_id, DEVICE_STATUS_IDLE)) return cursor.fetchone() except Exception as e: logging.exception(f"读取空闲设备失败 equipment_id={equipment_id}: {e}") return None finally: if conn: conn.close() def build_task_payload(task_row, device_row): start_page = parse_optional_int(task_row[9] if len(task_row) > 9 else None, 0) end_page = parse_optional_int(task_row[10] if len(task_row) > 10 else None, None) max_counts_limit = parse_optional_int( task_row[11] if len(task_row) > 11 else None, DEFAULT_MAX_COUNTS_LIMIT ) return { "task_id": task_row[0], "equipment_id": task_row[2], "enterprise_id": task_row[3], "platform": task_row[4], "title_key": task_row[5], "spec_list": task_row[6], "brand": task_row[7], "search_key": f"{task_row[7]}{task_row[5]}", "save_search_key": f"{task_row[7]}{task_row[5]}", "start_page": start_page, "end_page": end_page, "max_counts_limit": max_counts_limit, "sort": "升序", "device_id": device_row[2], "task_row": task_row, } def fetch_runnable_task_payloads(): tasks = fetch_pending_tasks() if not tasks: logging.info("当前没有待执行任务") return [] payloads = [] reserved_equipment_ids = set() for task_row in tasks: task_id = task_row[0] equipment_id = task_row[2] with dispatch_lock: if task_id in running_task_ids: continue if equipment_id in reserved_equipment_ids: continue device_row = fetch_idle_device_by_equipment_id(equipment_id) if not device_row: logging.info(f"任务 {task_id} 对应设备 {equipment_id} 当前不空闲,跳过本轮") continue device_id = device_row[2] with dispatch_lock: if device_id in running_device_ids: logging.info(f"设备 {device_id} 已在本进程执行任务,跳过任务 {task_id}") continue running_task_ids.add(task_id) running_device_ids.add(device_id) reserved_equipment_ids.add(equipment_id) payloads.append(build_task_payload(task_row, device_row)) return payloads def cleanup_finished_workers(): dead_threads = [] with dispatch_lock: for device_id, thread in worker_threads.items(): if not thread.is_alive(): dead_threads.append(device_id) for device_id in dead_threads: worker_threads.pop(device_id, None) def run_task_worker(task_payload): task_id = task_payload["task_id"] device_id = task_payload["device_id"] pdd = None try: logging.info(f"[任务 {task_id}] 开始执行,设备: {device_id}") print(task_payload) pdd = PDD( task_payload["search_key"], device_id, title_key=task_payload.get("title_key"), spec_list=task_payload.get("spec_list"), brand=task_payload.get("brand", ""), save_search_key=task_payload.get("save_search_key"), start_page=task_payload.get("start_page"), end_page=task_payload.get("end_page"), max_counts_limit=task_payload.get("max_counts_limit"), direct_shop_lookup=task_payload.get("direct_shop_lookup", False), sort=task_payload.get("sort"), platform=task_payload.get("platform"), task_id=task_payload.get("task_id"), enterprise_id=task_payload.get("enterprise_id"), ) completed_normally = pdd.main(device_id, 1, 0) if completed_normally: logging.info(f"[任务 {task_id}] 执行完成,设备: {device_id}") else: logging.info(f"[任务 {task_id}] 已结束,设备: {device_id}") except Exception as e: end_page = task_payload.get("start_page") if pdd is not None: end_page = getattr(pdd, "page", end_page) pdd.finish_task_abnormally(end_page, f"任务执行异常: {e}") else: report_api(task_id, end_page=end_page, start=4, end_time=int(time.time()),finish_status=0) logging.exception(f"[任务 {task_id}] 执行异常,设备: {device_id},错误: {e}") finally: with dispatch_lock: running_task_ids.discard(task_id) running_device_ids.discard(device_id) worker_threads.pop(device_id, None) def dispatch_pending_tasks(): cleanup_finished_workers() task_payloads = fetch_runnable_task_payloads() if not task_payloads: return for task_payload in task_payloads: device_id = task_payload["device_id"] try: thread = threading.Thread( target=run_task_worker, args=(task_payload,), daemon=True, name=f"pdd-{device_id}", ) with dispatch_lock: worker_threads[device_id] = thread thread.start() logging.info(f"[任务 {task_payload['task_id']}] 已分发到设备 {device_id}") except Exception: with dispatch_lock: running_task_ids.discard(task_payload["task_id"]) running_device_ids.discard(device_id) worker_threads.pop(device_id, None) raise def schedule_dispatch(delay_seconds=SCHEDULER_INTERVAL_SECONDS): global scheduler_timer if scheduler_stop_event.is_set(): return scheduler_timer = threading.Timer(delay_seconds, scheduled_dispatch_job) scheduler_timer.daemon = False scheduler_timer.name = "pdd-scheduler" scheduler_timer.start() def scheduled_dispatch_job(): try: dispatch_pending_tasks() except Exception as e: logging.exception(f"PDD 定时调度异常: {e}") finally: schedule_dispatch(SCHEDULER_INTERVAL_SECONDS) def report_api(task_id,page=None,start=None,end_page=None,end_time=None,finish_status=None): params = { "collect_task_allocate_id": task_id, "statr_page":page if page is not None else '', "end_page": end_page if end_page is not None else '', "status": start, "finish_status": finish_status if finish_status is not None else 0, "start_time": int(time.time()), "end_time": end_time if end_time is not None else '', } print(params) url = "http://schedule.dfwy.tech/api/collect_equipment_execute/result_report" res = requests.get(url, params=params, timeout=20) print(res.text) # 获取滑块验证中滑块需要移动的距离 def slide_verify(img_path): with open(img_path, 'rb') as f: b = base64.b64encode(f.read()).decode() ## 图片二进制流base64字符串 url = "http://api.jfbym.com/api/YmServer/customApi" data = { ## 关于参数,一般来说有3个;不同类型id可能有不同的参数个数和参数名,找客服获取 "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk", "type": "22222", "image": b, } _headers = { "Content-Type": "application/json" } response = requests.request("POST", url, headers=_headers, json=data).json() print(response) if response.get("msg") == "识别成功": # 获取 data 中的 data 字段 result = response.get("data", {}).get("data") if result: print(result) # 输出结果 else: print("无法获取数据") else: print("识别未成功") return result class PDD: def __init__( self, search_key, device_id, title_key=None, spec_list=None, brand="", save_search_key=None, start_page=0, end_page=None, max_counts_limit=None, direct_shop_lookup=False, sort=None, platform = None, task_id = None, enterprise_id=None, ): self.package_name = 'com.xunmeng.pinduoduo' self.APP_ID = '116857964' self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh' self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3' self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY) self.table_name = "retrieve_scrape_data" # "pdd_drug" self.shop_table_name = "retrieve_scrape_data" # "pdd_shop_info" self.loggerPdd = logging.getLogger() self.clipboard = "" # 初始化剪切板的内容为空 self.enterprise_id = enterprise_id self.task_id = task_id self.platform = platform self.sort = sort self.sort_key = 0 self.search_key = search_key # 参苓健脾胃颗粒 香砂平胃颗粒 舒肝颗粒 清肺化痰丸 self.title_key = title_key if title_key is not None else search_key self.spec_list = self._normalize_rule_list(spec_list) self.brand = brand self.save_search_key = save_search_key or search_key self.start_page = max(parse_optional_int(start_page, 0), 0) self.end_page = parse_optional_int(end_page, None) self.max_counts_limit = max_counts_limit self.direct_shop_lookup = direct_shop_lookup self.unrelated_data = 0 # 无关数据数量 self.device_id = device_id self.page = self.start_page if self.end_page is not None and self.end_page < self.start_page: self.end_page = self.start_page # 统计售罄数量 self.sold_out_counts = 0 # 程序启动时间 self.program_start_time = self.app_start_time() # 统计商品数量 # 最大量数据阈值 self.max_counts = 0 # 统计点击商品的次数 self.click_counts = 0 # 商品在列表的位置 self.search_key_loc = 0 self.finish_reported = False # oss配置 self.oss_config = { "access_key_id": Config.access_key_id, "access_key_secret": Config.access_key_secret, "endpoint": Config.endpoint, # 例: oss-cn-beijing.aliyuncs.com "bucket_name": Config.bucket_name, "oss_prefix": Config.oss_prefix # OSS中存放截图的前缀(虚拟文件夹) } # 异常处理 def wr_re(self, mod, device_id, sort=None, page=None): file_path = f'./ycwj/{device_id}_{self.title_key}.txt' if mod == "写": try: data = { "page": page if page else "", "sort": sort if sort else "", } os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"进度保存成功:{sort},{page}页") except Exception as e: print("保存进度失败") elif mod == "读": try: if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) print(self.sort) if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) if data['page'] != '': progress_page = int(data['page']) self.page = max(progress_page, self.start_page) self.scroll_to_target_page(self.page) else: return None return data except Exception as e: print(f"读取进度失败", e) return None elif mod == "删": try: if os.path.exists(file_path): os.remove(file_path) print(f"进度文件已删除:{file_path}") except Exception as e: print(f"删除进度文件失败:{e}") return None def clear_progress_file(self): # self.wr_re("删", self.device_id, self.sort) pass def is_max_count_reached(self): return bool(self.max_counts_limit and self.max_counts >= self.max_counts_limit) def scroll_to_target_page(self, target_page): target_page = int(target_page or 0) if target_page <= 0: return for _ in range(target_page): end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) time.sleep(self.get_sleep_time()) def finish_task_normally(self, end_page, reason): if not self.finish_reported: report_api(self.task_id, end_page=end_page, start=3, end_time=int(time.time()),finish_status=1) self.finish_reported = True print(reason) return True def finish_task_abnormally(self, end_page, reason, finish_status=0): if not self.finish_reported: report_api( self.task_id, end_page=end_page, start=4, end_time=int(time.time()), finish_status=finish_status ) self.finish_reported = True print(reason) return False def finish_task_with_max_count(self, end_page): return self.finish_task_normally( end_page, f"达到最大采集数量 {self.max_counts_limit},当前已采集 {self.max_counts} 条,停止任务" ) # 排序 def li_or_lo(self, key): if key == "升序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="总价低到高"]') if n.exists: n.click() time.sleep(self.get_sleep_time()) if key == "降序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="单粒价格低到高"]') if n: n.click() else: self.d.xpath('//*[@text="价格"]').click() # 返回列表页 def back_to_list_page(self): for i in range(10): if self.distinct_target(): return True print(f'第{i}次尝试退回到列表页') self.swipe_back(1) time.sleep(1) print('页面出错,没有退回到列表页') return False def get_drug_lis(self, idx): if idx == 0: drug_lis = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() else: for i in range(1, 6): drug_lis = self.d.xpath( f'/hierarchy/android.widget.FrameLayout[{i}]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() if drug_lis: break return drug_lis # 代码运行那时候的时间 def app_current_time(self): return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def slide_link(self): value_tag = None if self.d.xpath('//*[@text="微信"]').exists: value_tag = self.d.xpath('//*[@text="微信"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="朋友圈"]').exists: value_tag = self.d.xpath('//*[@text="朋友圈"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="QQ好友"]').exists: value_tag = self.d.xpath('//*[@text="QQ好友"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return def app_start_time(self): """ 获取app启动时间 :return: """ return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def stop_app(self): self.d.app_stop(self.package_name) time.sleep(5) def start_app(self): self.d.app_start(self.package_name) time.sleep(5) def restart_app(self): """ 重启app :return: """ self.stop_app() self.start_app() @staticmethod def get_sleep_time(): return random.randint(1, 2) # return random.randint(5, 8) @staticmethod def get_current_date(): return datetime.datetime.now().strftime('%Y/%m/%d') @staticmethod def _normalize_rule_list(value): if value is None: return [] if isinstance(value, (list, tuple, set)): raw_values = value else: raw_values = [value] result = [] for item in raw_values: item_str = str(item).strip() if item_str: result.append(item_str) return result @staticmethod def _normalize_match_text(value): return re.sub(r'\s+', '', str(value or '')).lower() def _match_any_keyword(self, text, keywords): keyword_list = self._normalize_rule_list(keywords) if not keyword_list: return True normalized_text = self._normalize_match_text(text) return any(self._normalize_match_text(keyword) in normalized_text for keyword in keyword_list) def is_link_spec_useful(self, product_title, specifications=''): if not self.spec_list: return True title_text = self._normalize_match_text(product_title) spec_text = self._normalize_match_text(specifications) for spec in self.spec_list: normalized_spec = self._normalize_match_text(spec) if normalized_spec in title_text or normalized_spec in spec_text: return True return False def is_link_useful(self, product_title, specifications=''): if not self._match_any_keyword(product_title, self.title_key): print(f"当前商品名称:{product_title} 不包含{self.title_key}关键字") return False if not self._match_any_keyword(product_title, self.brand): print(f"当前商品名称:{product_title} 不包含{self.brand}品牌") return False if not self.is_link_spec_useful(product_title, specifications): print(f"当前商品名称:{product_title} 不包含{self.spec_list}品规") return False return True def remove_watermark(self, img_path): """ 图片去水印(将水印部分变成白色背景)并将数据转化为二进制数据 :param img_path: 图片路径 :return: 二进制图片数据 """ img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1) endswith = os.path.splitext(img_path)[1] new = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8) _, img_binary = cv2.imencode(endswith, new) return img_binary def get_shop_name(self): """ 获取店铺名 :return: """ try: xpath = '//*[@text="进店"]/preceding-sibling::android.view.ViewGroup/android.widget.LinearLayout/android.widget.TextView' if self.d.xpath(xpath).exists: shop_name = self.d.xpath(xpath).text self.loggerPdd.info(f'1-获取到店铺名:{shop_name}') else: # 进入店铺新页面 shop_btn_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]' if self.d.xpath(shop_btn_xpath).exists: self.d.xpath(shop_btn_xpath).click() time.sleep(1) # self.d.xpath('//*[@text="店铺"]').click() xpath_shop_name = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.TextView[1]' if self.d.xpath(xpath_shop_name).exists: shop_name = self.d.xpath(xpath_shop_name).text self.loggerPdd.info(f'2-获取到店铺名:{shop_name}') else: shop_name = '' self.loggerPdd.info(f'3-获取到店铺名:{shop_name}') self.swipe_back(1) # else: shop_name = '' self.loggerPdd.info('4-因为shop_btn_xpath不存在,获取到店铺名为空') # time.sleep(10000) return shop_name except Exception as e: print(f'获取店铺名出错:{e}') self.loggerPdd.error(f'获取店铺名出错:{e}') return None def save_to_database(self, data): print(f'保存数据到数据库:{data}') max_retries = 5 for attempt in range(max_retries): conn = None try: conn = get_mysql() with conn.cursor() as cur: add_sql = """ INSERT INTO retrieve_scrape_data ( enterprise_id, platform_id, platform_item_id, province_id, city_id, province_name, city_name, area_info, product_name, product_specs, one_box_price, manufacture_date, expiry_date, manufacturer, approval_number, is_sold_out, online_posting_count, continuous_listing_count, link_url, store_name, store_url, shipment_province_id, shipment_province_name, shipment_city_id, shipment_city_name, company_name, qualification_number, scrape_date, min_price, number, sales, inventory, snapshot_url ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ cur.execute(add_sql, ( data['enterprise_id'], data['platform_id'], data['platform_item_id'], data['province_id'], data['city_id'], data['province_name'], data['city_name'], data['area_info'], data['product_name'], data['product_specs'], data['one_box_price'], data['manufacture_date'], data['expiry_date'], data['manufacturer'], data['approval_number'], data['is_sold_out'], data['online_posting_count'], data['continuous_listing_count'], data['link_url'], data['store_name'], data['store_url'], data['shipment_province_id'], data['shipment_province_name'], data['shipment_city_id'], data['shipment_city_name'], data['company_name'], data['qualification_number'], data['scrape_date'], data['min_price'], data['number'], data['sales'], data['inventory'], data['snapshot_url'], )) conn.commit() self.max_counts += 1 print(f"存入数据库成功,当前已采集 {self.max_counts} 条") return True except Exception as e: print(f'保存数据库异常 (尝试 {attempt + 1}/{max_retries}): {e}') if conn: conn.rollback() conn.close() if attempt == max_retries - 1: print("达到最大重试次数,保存失败") return False time.sleep(2) def click_target_product_by_search_key(self, fuzzy_match=False, timeout=10): """ 动态匹配self.search_key对应的商品并点击 :param fuzzy_match: 是否模糊匹配(应对商品名带额外后缀/前缀的情况) 不模糊匹配 :param timeout: 等待元素出现的超时时间(秒) :return: 点击是否成功(bool) """ try: # 1. 定义定位条件(动态使用self.search_key) if fuzzy_match: # 模糊匹配:包含search_key即可(推荐,适配搜索结果商品名略有差异) locator = self.d(textContains=self.search_key) print(f"🔍 模糊匹配商品:包含「{self.search_key}」的元素") else: # 精确匹配:商品名与search_key完全一致 locator = self.d(text=self.search_key) print(f"🔍 精确匹配商品:「{self.search_key}」") # 2. 等待元素出现(核心:避免元素未加载就点击) if locator.wait(timeout=timeout): print(f"✅ 找到匹配的商品,准备点击") # 执行点击(优先点击可点击的元素) locator.click() print(f"✅ 成功点击「{self.search_key}」对应的商品") # 点击后等待页面加载 time.sleep(self.get_sleep_time()) return True else: print(f"❌ 滑动后仍未找到「{self.search_key}」对应的商品") return False except Exception as e: print(f"❌ 点击「{self.search_key}」对应商品时异常:{e}") return False def swipe_down(self): """ 下滑(模拟真人操作,抗风控+设备适配+容错) 核心:起点在屏幕上方,终点在屏幕下方(和上滑相反) :return: None """ try: # 1. 获取屏幕尺寸(兼容不同设备,给默认值避免获取失败) screen_width = self.d.info.get('displayWidth', 1080) # 默认1080px宽度 screen_height = self.d.info.get('displayHeight', 2400) # 默认2400px高度 # 2. 随机滑动时长(0.1~0.3秒,避免固定值被风控,且不设0秒) duration_rate = random.uniform(0.1, 0.3) # 3. 计算滑动坐标(用屏幕比例,适配所有设备) start_x = screen_width // 2 # 水平居中(和上滑一致,符合真人操作习惯) start_y = screen_height * 0.2 # 起点:屏幕20%高度(上方偏下) end_y = screen_height * 0.8 # 终点:屏幕80%高度(下方偏上) # 强制确保起点y < 终点y(必为向下滑,避免逻辑错误) start_y, end_y = min(start_y, end_y - 10), max(end_y, start_y + 10) # 4. 核心向下滑动操作 self.d.swipe(start_x, start_y, start_x, end_y, duration=duration_rate) # 滑动后全局等待(确保页面加载,避免元素定位失败) time.sleep(self.get_sleep_time()) except Exception as e: # 异常捕获:避免设备断开/滑动失败导致程序崩溃 print(f"向下滑动失败:{e}") # 兜底方案:用固定坐标重试(适配主流1080x2400设备) self.d.swipe(540, 480, 540, 1920, duration=0.2) time.sleep(self.get_sleep_time()) def swipe_up(self): """ 上滑 :return: """ screen_width = self.d.info['displayWidth'] screen_height = self.d.info['displayHeight'] duration_rate = random.uniform(0, 0.3) self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate) no = random.uniform(0, 1) if no > 0.85: # 有的时候卡着 再稍微往上滑一点点 self.d.swipe_ext("up", 0.1) time.sleep(self.get_sleep_time()) def swipe_back(self, no): """ 返回 :param no: 回退次数 :return: """ if not self.distinct_target(): for idx in range(no): self.d.press('back') time.sleep(self.get_sleep_time()) def drug_price(self): """ 获取药品价格 :return: """ try: xpath = '//*[@text="¥"]/following-sibling::android.widget.TextView[1]' price_str = self.d.xpath(xpath).text price = float(re.search(r'[\d\.]+', price_str).group()) print(f'获取到价格:{price}') return float(price) except Exception as e: print(f'提取价格出错-->{e}') return None def drug_price_ex(self): price_str = '' # 价格初始化 ext = '' # 初始化已选择的信息 price = '' # 这是点击进入品规的按钮 button_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' button_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' # 调试 # test_button = self.d.xpath(button_xpath_1).exists # print(test_button) # test_button_2 = self.d.xpath(button_xpath_2).exists # print(test_button_2) # time.sleep(1000) # if self.d.xpath('//*[@text="发起拼单"]').exists: # self.d.xpath('//*[@text="发起拼单"]').click() # elif self.d.xpath('//*[@text="去复诊开药"]').exists: # self.d.xpath('//*[@text="去复诊开药"]').click() if self.d.xpath(button_xpath_1).exists: self.d.xpath(button_xpath_1).click() elif self.d.xpath(button_xpath_2).exists: self.d.xpath(button_xpath_2).click() else: print("button1 and button_2 all not exist") return price, ext select_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()]' select_xpath_3_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()-1]' price_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' price_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' price_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]//android.widget.TextView[1]' if self.d.xpath(select_xpath_1).exists: text1 = self.d.xpath(select_xpath_1).text print(f"select_xpath_1--text1={text1}") if '已选' in text1: if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-1={price_str}") else: print("select_xpath_1--price_xpath_1-1 not exist") ext = text1 elif '请选择' in text1: # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' scroll_xpath_2 = '' if self.d.xpath(scroll_xpath_1).exists: self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格会刷新 if self.d.xpath(select_xpath_1).exists: text2 = self.d.xpath(select_xpath_1).text if '已选' in text2: print(f"select_xpath_1--已选择2:text2={text2}") if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-2={price_str}") else: print("select_xpath_1--price_xpath_1-2 not exist") ext = text2 else: print("select_xpath_1--scroll_xpath_1 not exist") elif self.d.xpath(select_xpath_2).exists: text1 = self.d.xpath(select_xpath_2).text print(f"xpath2--text1={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-1 not exist") elif '请选择' in text1: print('come in here') # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_2).exists: text2 = self.d.xpath(select_xpath_2).text if '已选' in text2: ext = text2 print(f"select_xpath_2--已选择2:text2={text2}") if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-2 not exist") else: print("scroll_xpath_1 not exists") else: print("not exist 请选择 or 已选") elif self.d.xpath(select_xpath_3).exists: text1 = self.d.xpath(select_xpath_3).text print(f"xpath3--text1-1={text1}") if ('请选择' not in text1) and ('已选' not in text1): text1 = self.d.xpath(select_xpath_3_2).text print(f"xpath3--text1-2={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3-1={price_str}") else: print("select_xpath_3--price_xpath_3-3-1 not exist") elif '请选择' in text1: print('come in here') # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' recycler_view_xpath = '//*[@resource-id="android:id/content"]//android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-2={price_str}") else: print("select_xpath_3--price_xpath_3-3-2 not exist") elif self.d.xpath(recycler_view_xpath).exists: self.d.xpath(recycler_view_xpath).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3={price_str}") else: print("select_xpath_3--price_xpath_3-3-3 not exist") else: print("scroll_xpath_1 not exists") else: print(f"xpath3--text1-不包含请选择和已选择") else: print("select_xpath_1 and select_xpath_2 and select_xpath_3 all not exist") if price_str: # price = float(re.search('[\d\.]+', price_str).group()) match = re.search(r'¥([\d\.]+)', price_str) if match: price = float(match.group(1)) else: price = '' # price = float(re.search(r'¥([\d\.]+)', price_str).group(1)) print(f'获取到价格:{price}') print(f"ext={ext}") self.swipe_back(1) # return price, ext def restart_uiautomator_services(self, device_id): """ 重启atx的uiautomator 服务 :param device_id: :return: """ stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop' start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d' subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) def connect_devices(self, device_id): """ 连接设备 :return: """ try: self.d = u2.connect_usb(device_id) # 设置隐形等待时间 # self.d.implicitly_wait(5) self.restart_uiautomator_services(device_id) print(f'[{self.program_start_time}]连接到设备:{device_id}') except Exception as e: print(f'{device_id} 连接错误: {e}') raise Exception(e) def get_ocr_res(self, img): try: image = self.remove_watermark(img) res_image = self.client.basicGeneral(image) data = res_image.get('words_result', '') print(f'百度api返回结果:{data}') return data except: return None def get_title(self): try: print('开始提取标题') time.sleep(self.get_sleep_time()) title_xpath = '//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]' if self.d.xpath(title_xpath).exists: title = self.d.xpath(title_xpath).info['contentDescription'].strip() else: return None # title = self.d.xpath('//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]').info['contentDescription'].strip() print(f'提取到标题:{title}') return title except Exception as e: print(f'获取标题出错:{e}') return None # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) # match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) # if match: # drugs_name = match.group(1).strip() + match.group(2).strip() # specifications = match.group(3).strip() # print("药品名:", drugs_name) # print("规格:", specifications) # print('完整药名:', drugs_name + specifications) # return drugs_name, specifications # else: # print("没有匹配到预期格式") def enter_shop(self): """ 进店,方便提取资质环境 :return: """ # self.d.xpath('//*[@text="进店"]').click() self.d.xpath('//*[@text="店铺"]').click() time.sleep(self.get_sleep_time()) def data_is_exists(self, data): # 1. 验证必要字段 required_keys = ['min_price', 'shop', 'scrape_date', 'platform'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] print(f"缺少必要字段: {', '.join(missing)}") return None conn = None try: conn = get_mysql() with conn.cursor() as cur: query_sql = """ SELECT * FROM {} WHERE min_price = %s AND store_name = %s AND scrape_date = %s AND platform_id = %s """.format(self.table_name) cur.execute(query_sql, ( data['min_price'], data['shop'], data['scrape_date'], data['platform'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") finally: if conn: conn.close() def get_instructions_data(self): """ 确定有详情页之后之后,提取所有的详情页数据 :return: """ for i in range(8): if self.d.xpath('//*[@text="品牌"]').exists or self.d.xpath('//*[@text="药品通用名"]').exists: self.d.swipe_ext("up", scale=0.1) print('开始采集详情数据') break self.d.swipe_ext("up", scale=0.5) time.sleep(self.get_sleep_time()) # 点击查看全部 if self.d.xpath('//*[@text="品牌"]').exists: self.d.xpath('//*[@text="品牌"]').click() else: self.d.xpath('//*[@text="药品通用名"]').click() time.sleep(self.get_sleep_time()) attr = dict() # # 获取详情页信息 xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text # 截图获取未获取到的数据 # if not all(i in ['有效期', '生产企业', '批准文号', '药品规格', '产品规格'] for i in attr.keys()): if not all(i in ['有效期', '生产企业', '批准文号', '药品规格'] for i in attr.keys()): self.d.swipe_ext("up", 0.4) time.sleep(self.get_sleep_time()) xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text print(f'当前说明书规格参数:{attr}') res_data = { # "有效期": attr.get('有效期',''), # "生产单位": attr['生产企业'], # "批准文号": attr['批准文号'], # "产品规格": attr.get('药品规格') if attr.get('药品规格', '') else attr.get('药品规格') "有效期": attr.get('有效期', ''), "生产单位": attr.get('生产企业', ''), "批准文号": attr.get('批准文号', ''), "产品规格": attr.get('药品规格', '') } print(f'当前规格参数字典数据:{res_data}') return res_data def has_instructions(self): """ 是否有详情页 :return:如果有详情页返回True,否则返回False """ # 没有说明书的无法采集具体数据 max_attempts = 12 # 最大尝试次数 attempt = 0 # 当前尝试次数 while attempt < max_attempts: time.sleep(0.5) xpath = '//*[@text="商品详情"]' is_has_instructions = self.d.xpath(xpath).exists if is_has_instructions: return True # 如果找到“商品详情”,则返回True self.d.swipe_ext("up", 0.3) attempt += 1 return False # 如果尝试次数达到最大次数,则返回False def distinct_target(self): result = False is_position = self.d.xpath('//*[@content-desc="拍照搜索"]').exists is_position2 = self.d.xpath('//*[@text="年货节大促"]').exists is_position3 = self.d.xpath('//*[@text="筛选"]').exists is_position4 = self.d.xpath('//*[@text="回头客常拼"]').exists list_page_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]//android.support.v7.widget.RecyclerView[1]' is_position_new = self.d.xpath(list_page_xpath).exists print(f'is_position_new={is_position_new}') if is_position or is_position2 or is_position3 or is_position4 or is_position_new: result = True return result def enter_target_page(self): self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]').click() time.sleep(self.get_sleep_time()) self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) self.d.send_keys(self.search_key, clear=True) time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="搜索"]').click() time.sleep(self.get_sleep_time()) if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) # progress = self.wr_re("读", self.device_id) progress = None if not progress and self.page > 0: self.scroll_to_target_page(self.page) def get_clipboard(self): self.loggerPdd.info(f"Clipboard content:{self.d.clipboard}") # 打印调试信息 clipboard_content = self.d.clipboard if clipboard_content is None: return '' return clipboard_content.strip() def get_product_link(self): product_link = '' print('开始获取商品链接') content_frame = self.d.xpath('//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]').exists print(content_frame) relative_layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout) relative_layout2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout2) Frame_Layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]').exists print(Frame_Layout) ImageView = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]').exists print(ImageView) ImageView2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[3]/android.view.View[1]').exists print(ImageView2) # 多种可能的“分享”按钮 dots_xpaths = [ # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[last()]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[3]/android.widget.ImageView[1]', ] max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): if product_link: # 已经拿到则退出 break for xp in dots_xpaths: if self.d.xpath(xp).exists: # print(f'{idx}-进入分享点点点') self.loggerPdd.info(f'{idx}-进入分享点点点') self.d.xpath(xp).click() time.sleep(1) self.loggerPdd.info('开始滑动') self.slide_link() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) product_link = self.get_clipboard() time.sleep(0.5) self.loggerPdd.info(f'{idx}-商品链接:{product_link}') break # 找到并执行后跳出内层循环 if not product_link and idx < max_retry: time.sleep(0.5) # 最后一次不需要再等待 # time.sleep(100000) return product_link def integrate_data_v2(self): """ 基于入口配置统一校验标题、品牌和品规,替代内部大量硬编码分支。 """ min_price, ext = self.drug_price_ex() title_info = self.get_title() if not title_info: print('标题获取为空') self.swipe_back(1) return if not self.is_link_useful(title_info): self.swipe_back(1) self.unrelated_data += 1 return if not min_price: min_price = self.drug_price() if not min_price: print('提取价格出错,回退到列表页') self.swipe_back(1) self.unrelated_data += 1 return product_link = self.get_product_link() time.sleep(2) if self.direct_shop_lookup: shop = self.get_shop_name() else: for _ in range(15): if self.d(textStartsWith="进店").exists: print('开始获取店铺名') break self.d.swipe_ext("up", scale=0.3) time.sleep(self.get_sleep_time()) if self.d(textStartsWith="进店").exists: print('可以开始获取店铺名') shop = self.get_shop_name() if not shop: print('当前店铺名称为空') self.swipe_back(1) self.unrelated_data += 1 return scrape_date = self.get_current_date() dup_data = { 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '3' } if self.data_is_exists(dup_data): print('存在相同数据不入库') self.back_to_list_page() return is_has_instructions = self.has_instructions() self.loggerPdd.info(f'是否有说明书:{is_has_instructions}') manufacture_date = '' credit_code = ext if is_has_instructions: try: instructions_info = self.get_instructions_data() expiry_date = instructions_info['有效期'].strip('。') manufacturer = instructions_info['生产单位'].strip('。') approval_number = instructions_info['批准文号'].strip('。') specifications = instructions_info['产品规格'].strip('。') except Exception as e: print(f'获取详情页规格参数出错:{e}') self.swipe_back(2) return else: expiry_date = '' manufacturer = '' approval_number = '' specifications = '' if not self.is_link_useful(title_info, specifications): self.swipe_back(1) self.unrelated_data += 1 return self.unrelated_data = 0 if extract_box_number(credit_code): one_box_price = min_price / extract_box_number(credit_code) else: print("单瓶药品价格没处理成功") one_box_price = 0 save_data = { 'enterprise_id': self.enterprise_id, 'platform_id': self.platform, 'platform_item_id': '', 'province_id': 0, 'city_id': 0, 'province_name': '', 'city_name': '', 'area_info': "", 'product_name': title_info, 'product_specs': specifications, 'one_box_price': one_box_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'manufacturer': manufacturer, 'approval_number': approval_number, 'is_sold_out': 0, 'online_posting_count': 1, 'continuous_listing_count': 1, 'link_url': product_link, 'store_name': shop, 'store_url': '', 'shipment_province_id': 0, 'shipment_province_name': "", 'shipment_city_id': 0, 'shipment_city_name': "", 'company_name': "", 'qualification_number': "", 'scrape_date': scrape_date, 'min_price': min_price, 'number': 0, 'sales': "", 'inventory': "", 'snapshot_url': "", 'insert_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } self.save_to_database(save_data) def main(self, device_id, search_key_length, keyword_idx): completed_normally = False stop_by_max_count = False spider_no = 0 current_page = self.page self.connect_devices(device_id) time.sleep(self.get_sleep_time()) if keyword_idx == 0: print("搜索前,先重启APP") self.restart_app() # 搜索关键字 self.enter_target_page() else: print("清空前面的文字,再输入关键词") self.d.send_keys(self.search_key, clear=True) time.sleep(1) print("点击搜索") self.d.xpath('//*[@text="搜索"]').click() time.sleep(1) # 上报状态 report_api(self.task_id, self.page, 2,finish_status=0) for idx in range(300): print(f'第{current_page}页') # self.wr_re("写", self.device_id, self.sort, current_page) if spider_no > 30: time.sleep(300) spider_no = 0 if self.unrelated_data > 30: print(f'[{self.program_start_time}]----{self.search_key}----连续超过30个不达标的数据则停止采集') print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.swipe_down() time.sleep(self.get_sleep_time()) # 下滑后等待页面稳定 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: self.finish_task_abnormally( current_page, f"连续超过30个不达标的数据后,关键词「{self.search_key}」商品点击失败", finish_status=1 ) return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) if keyword_idx == search_key_length - 1: print("程序最后一个品规采集完毕,返回主屏幕") completed_normally = self.finish_task_normally( current_page, '连续超过30个不达标的数据,结束采集' ) else: completed_normally = True break if self.is_max_count_reached(): completed_normally = self.finish_task_with_max_count(current_page) # 向下滑 self.swipe_down() time.sleep(self.get_sleep_time()) # 点击搜索框 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: print(f"关键词「{self.search_key}」商品点击失败") return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) break # 售罄次数大于4基本就是号废了但是如果下次点击不会出现这种情况就要重置为0 if self.sold_out_counts > 4: self.finish_task_abnormally( current_page, "====商品已售罄4次,结束采集(号不能用)", finish_status=1 ) print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") break drug_lis = self.get_drug_lis(idx) print('数量', len(drug_lis)) for idd, drug_one in enumerate(drug_lis): print(idd + 1, drug_one.info) time.sleep(self.get_sleep_time()) top = drug_one.info['bounds']['top'] bottom = drug_one.info['bounds']['bottom'] if bottom <= 1524 and top >= 258: drug_one.click() self.click_counts += 1 time.sleep(self.get_sleep_time()) # 先判断是否售罄次数是否大于4 if self.sold_out_counts >= 4: print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.finish_task_abnormally( current_page, "====这是在第一页有两个,商品已售罄4次,结束采集(号不能用)====", finish_status=1 ) time.sleep(self.get_sleep_time()) self.d.press('home') return if self.d.xpath('//*[contains(@text, "商品已售罄")]').wait(timeout=5): print("======商品已售罄======") self.sold_out_counts += 1 if self.back_to_list_page(): continue # 采集药品信息 try: # 重置商品售罄次数 self.sold_out_counts = 0 self.integrate_data_v2() # 检测下是否回退到列表页 if self.back_to_list_page(): print('回退到列表页', True) else: print(f'[{self.app_current_time()}] 回退到列表页失败') print( f"[程序启动时间:{self.program_start_time}-----结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.finish_task_abnormally(current_page, "回退到列表页失败,结束采集") return time.sleep(self.get_sleep_time()) spider_no += 1 if self.is_max_count_reached(): completed_normally = self.finish_task_with_max_count(current_page) stop_by_max_count = True break except Exception as e: self.loggerPdd.error(f'采集药品详情数据出错:{e}') if not self.back_to_list_page(): self.finish_task_abnormally(current_page, '采集药品详情数据出错且无法回到列表页,结束采集') return else: continue if stop_by_max_count: break if self.end_page is not None and current_page >= self.end_page: completed_normally = self.finish_task_normally( current_page, f"已采集到结束页 {self.end_page},结束任务" ) break if self.d(textStartsWith="抱歉,没有更多商品啦~").exists: completed_normally = self.finish_task_normally(current_page, '已经到达列表页最底部') break print('开始滑入下一页') end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) time.sleep(self.get_sleep_time()) if completed_normally: self.clear_progress_file() elif not self.finish_reported: self.finish_task_abnormally(current_page, "采集流程异常结束") return completed_normally # pdd def main(): logging.info(f"PDD 调度器启动,轮询间隔 {SCHEDULER_INTERVAL_SECONDS} 秒") dispatch_pending_tasks() schedule_dispatch(SCHEDULER_INTERVAL_SECONDS) scheduler_stop_event.wait() if __name__ == '__main__': main()