import base64
import datetime
import json
import logging
import os
import random
import re
import subprocess
import sys
import time

import cv2
import numpy as np
import pymysql
import requests
import uiautomator2 as u2
from aip import AipOcr

from logger import setup_logger
from pdd_config import Config
from 拼多多盒数处理脚本.main import extract_box_number

setup_logger("pdd_spider")  # initialise logging

# NOTE(security): database credentials and the captcha token below are
# hard-coded in source. They are kept verbatim so behaviour is unchanged, but
# they should be moved to configuration / environment variables and rotated.
_DB_SETTINGS = {
    'host': '39.108.116.125',
    'port': 3306,
    'user': 'drug_retrieve',
    'password': 'Pem287cwM58jNpe2',
    'db': 'drug_retrieve',
}

_TASK_SQL = "SELECT * FROM retrieve_collect_task_allocate WHERE platform = 3 AND status = 1"
_DEVICE_SQL = "SELECT * FROM retrieve_collect_equipment WHERE name LIKE %s AND id = %s"
_DEVICE_KEYWORD = "pdd"


def get_mysql():
    """Open and return a new connection to the scrape database.

    The caller owns the connection and must close it. No charset is passed
    here (the original had it commented out), unlike the task helpers below.
    """
    return pymysql.connect(**_DB_SETTINGS)


def _get_task_connection():
    """Open a utf8mb4 connection used by the task/equipment queries."""
    return pymysql.connect(charset='utf8mb4', **_DB_SETTINGS)


def _lookup_device(cursor, equipment_id):
    """Return column 2 of the 'pdd' equipment row with this id, or None."""
    cursor.execute(_DEVICE_SQL, (f'%{_DEVICE_KEYWORD}%', equipment_id))
    device_row = cursor.fetchone()
    return device_row[2] if device_row else None


def get_task_mysql():
    """Fetch one pending task (platform = 3, status = 1) and its device.

    Returns:
        ``{'data': task_row, 'device_id': device_or_None}`` when a task
        exists; ``None`` when there is no pending task or a database error
        occurs (the same value the original produced in both situations).
    """
    task_cnn = _get_task_connection()
    try:
        # The cursor is created inside the try via a context manager, so a
        # failure here can no longer surface as a NameError in cleanup.
        with task_cnn.cursor() as cursor:
            cursor.execute(_TASK_SQL)
            row1 = cursor.fetchone()
            print(row1)
            if not row1:
                # Previously this fell through to row1[2] and relied on the
                # resulting TypeError being swallowed by the except below.
                print("表1没有找到 platform=3 且 status=1 的记录")
                return None

            result = {'data': row1}
            # Column 2 of the task row is used as the equipment id.
            result['device_id'] = _lookup_device(cursor, row1[2])
            if result['device_id'] is None:
                print("设备表没有找到 name 包含 'pdd' 且 status=0 的记录")
        task_cnn.commit()
        return result
    except Exception as e:
        print(f"操作出错: {e}")
        task_cnn.rollback()
        return None
    finally:
        task_cnn.close()


def get_all_tasks():
    """Fetch every pending task (platform = 3, status = 1) with its device.

    Returns:
        A list of ``{'data': task_row, 'device_id': device_or_None}`` dicts;
        an empty list when nothing is pending or a database error occurs.
    """
    task_cnn = _get_task_connection()
    result_list = []
    try:
        with task_cnn.cursor() as cursor:
            cursor.execute(_TASK_SQL)
            rows = cursor.fetchall()
            if not rows:
                print("没有找到 platform=3 且 status=1 的任务记录")
                return result_list
            print(f"找到 {len(rows)} 个待执行任务")

            for row in rows:
                # Column 2 is assumed to hold the equipment id; adjust if the
                # table layout changes.
                dynamic_id = row[2]
                device_id = _lookup_device(cursor, dynamic_id)
                if device_id is not None:
                    print(f"任务 {dynamic_id} 分配到设备: {device_id}")
                else:
                    print(f"警告: 任务 {dynamic_id} 没有找到可用设备")
                result_list.append({'data': row, 'device_id': device_id})
        task_cnn.commit()
        return result_list
    except Exception as e:
        print(f"操作出错: {e}")
        task_cnn.rollback()
        return []
    finally:
        task_cnn.close()


def report_api(task_id, page=None, start=None, end_page=None, end_time=None):
    """Report task progress to the scheduling service via HTTP GET.

    Args:
        task_id: value sent as ``collect_task_allocate_id``.
        page: start page, sent under the key ``statr_page``.
        start: value sent as ``status``.
        end_page: value sent as ``end_page``.
        end_time: value sent as ``end_time``; an empty string is sent when
            omitted (previously this argument was silently ignored).
    """
    params = {
        "collect_task_allocate_id": task_id,
        # NOTE(review): "statr_page" looks like a typo for "start_page" but is
        # kept because the endpoint may expect exactly this key - confirm.
        "statr_page": page,
        "end_page": end_page,
        "status": start,
        "finish_status": 0,
        "start_time": int(time.time()),
        "end_time": end_time if end_time is not None else '',
    }
    print(params)
    url = "http://schedule.dfwy.tech/api/collect_equipment_execute/result_report"
    res = requests.get(url, params=params, timeout=20)
    print(res.text)


def slide_verify(img_path):
    """Ask the captcha service how far the slider must be moved.

    Args:
        img_path: path of the captcha screenshot to upload.

    Returns:
        The ``data.data`` field of the service response, or ``None`` when
        recognition fails or the field is missing (the original raised
        UnboundLocalError in the failure case).
    """
    with open(img_path, 'rb') as f:
        image_b64 = base64.b64encode(f.read()).decode()

    url = "http://api.jfbym.com/api/YmServer/customApi"
    data = {
        "token": "1nDVocTE2mJ0yLEYb2sZJ5uUY2VIEoGTkIpW44X7Kgk",
        "type": "22222",
        "image": image_b64,
    }
    _headers = {"Content-Type": "application/json"}
    # A timeout keeps a stalled captcha service from hanging the scraper.
    response = requests.request("POST", url, headers=_headers, json=data, timeout=30).json()
    print(response)

    result = None
    if response.get("msg") == "识别成功":
        result = response.get("data", {}).get("data")
        if result:
            print(result)
        else:
            print("无法获取数据")
    else:
        print("识别未成功")
    return result
"application/json" } response = requests.request("POST", url, headers=_headers, json=data).json() print(response) if response.get("msg") == "识别成功": # 获取 data 中的 data 字段 result = response.get("data", {}).get("data") if result: print(result) # 输出结果 else: print("无法获取数据") else: print("识别未成功") return result class PDD: def __init__( self, search_key, device_id, title_key=None, spec_list=None, brand="", save_search_key=None, max_counts_limit=None, direct_shop_lookup=False, sort=None, platform = None, task_id = None ): self.package_name = 'com.xunmeng.pinduoduo' self.APP_ID = '116857964' self.API_KEY = '1gAzACJOAr7BeILKqkqPOETh' self.SECRET_KEY = 'ZNArANb9GwJYgLKg4EfYhukKBfPdl1n3' self.client = AipOcr(self.APP_ID, self.API_KEY, self.SECRET_KEY) self.table_name = "retrieve_scrape_data" # "pdd_drug" self.shop_table_name = "retrieve_scrape_data" # "pdd_shop_info" self.loggerPdd = logging.getLogger() self.clipboard = "" # 初始化剪切板的内容为空 self.task_id = task_id self.platform = platform self.sort = sort self.sort_key = 0 self.search_key = search_key # 参苓健脾胃颗粒 香砂平胃颗粒 舒肝颗粒 清肺化痰丸 self.title_key = title_key if title_key is not None else search_key self.spec_list = self._normalize_rule_list(spec_list) self.brand = brand self.save_search_key = save_search_key or search_key self.max_counts_limit = max_counts_limit self.direct_shop_lookup = direct_shop_lookup self.unrelated_data = 0 # 无关数据数量 self.device_id = device_id self.page = 0 # 统计售罄数量 self.sold_out_counts = 0 # 程序启动时间 self.program_start_time = self.app_start_time() # 统计商品数量 # 最大量数据阈值 self.max_counts = 0 # 统计点击商品的次数 self.click_counts = 0 # 商品在列表的位置 self.search_key_loc = 0 # oss配置 self.oss_config = { "access_key_id": Config.access_key_id, "access_key_secret": Config.access_key_secret, "endpoint": Config.endpoint, # 例: oss-cn-beijing.aliyuncs.com "bucket_name": Config.bucket_name, "oss_prefix": Config.oss_prefix # OSS中存放截图的前缀(虚拟文件夹) } # 异常处理 def wr_re(self, mod, device_id, sort=None, page=None): file_path = 
f'./ycwj/{device_id}_{self.title_key}.txt' if mod == "写": try: data = { "page": page if page else "", "sort": sort if sort else "", } os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"进度保存成功:{sort},{page}页") except Exception as e: print("保存进度失败") elif mod == "读": try: if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) print(self.sort) if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) if data['page'] != '': i = 0 while True: if i == data['page']: self.page = data['page'] break else: i += 1 end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) else: return None return data except Exception as e: print(f"读取进度失败", e) return None return None # 排序 def li_or_lo(self, key): if key == "升序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="总价低到高"]') if n.exists: n.click() time.sleep(self.get_sleep_time()) if key == "降序": self.sort_key += 1 self.d.xpath('//*[@text="价格"]').click() n = self.d.xpath('//*[@text="单粒价格低到高"]') if n: n.click() else: self.d.xpath('//*[@text="价格"]').click() # 返回列表页 def back_to_list_page(self): for i in range(10): if self.distinct_target(): return True print(f'第{i}次尝试退回到列表页') self.swipe_back(1) time.sleep(1) print('页面出错,没有退回到列表页') return False def get_drug_lis(self, idx): if idx == 0: drug_lis = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() else: for i in range(1, 6): drug_lis = self.d.xpath( 
f'/hierarchy/android.widget.FrameLayout[{i}]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.FrameLayout').all() if drug_lis: break return drug_lis # 代码运行那时候的时间 def app_current_time(self): return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def slide_link(self): value_tag = None if self.d.xpath('//*[@text="微信"]').exists: value_tag = self.d.xpath('//*[@text="微信"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="朋友圈"]').exists: value_tag = self.d.xpath('//*[@text="朋友圈"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return if self.d.xpath('//*[@text="QQ好友"]').exists: value_tag = self.d.xpath('//*[@text="QQ好友"]').info['bounds'] self.d.swipe(400, value_tag['top'], 100, value_tag['top'], 0.3) return def app_start_time(self): """ 获取app启动时间 :return: """ return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') def stop_app(self): self.d.app_stop(self.package_name) time.sleep(5) def start_app(self): self.d.app_start(self.package_name) time.sleep(5) def restart_app(self): """ 重启app :return: """ self.stop_app() self.start_app() @staticmethod def get_sleep_time(): return random.randint(1, 2) # return random.randint(5, 8) @staticmethod def get_current_date(): return datetime.datetime.now().strftime('%Y/%m/%d') @staticmethod def _normalize_rule_list(value): if value is None: return [] if isinstance(value, (list, tuple, set)): raw_values = value else: raw_values = [value] result = [] for item in raw_values: item_str = str(item).strip() if item_str: result.append(item_str) return result @staticmethod def _normalize_match_text(value): return re.sub(r'\s+', '', str(value or '')).lower() def _match_any_keyword(self, text, 
keywords): keyword_list = self._normalize_rule_list(keywords) if not keyword_list: return True normalized_text = self._normalize_match_text(text) return any(self._normalize_match_text(keyword) in normalized_text for keyword in keyword_list) def is_link_spec_useful(self, product_title, specifications=''): if not self.spec_list: return True title_text = self._normalize_match_text(product_title) spec_text = self._normalize_match_text(specifications) for spec in self.spec_list: normalized_spec = self._normalize_match_text(spec) if normalized_spec in title_text or normalized_spec in spec_text: return True return False def is_link_useful(self, product_title, specifications=''): if not self._match_any_keyword(product_title, self.title_key): print(f"当前商品名称:{product_title} 不包含{self.title_key}关键字") return False if not self._match_any_keyword(product_title, self.brand): print(f"当前商品名称:{product_title} 不包含{self.brand}品牌") return False if not self.is_link_spec_useful(product_title, specifications): print(f"当前商品名称:{product_title} 不包含{self.spec_list}品规") return False return True def remove_watermark(self, img_path): """ 图片去水印(将水印部分变成白色背景)并将数据转化为二进制数据 :param img_path: 图片路径 :return: 二进制图片数据 """ img = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1) endswith = os.path.splitext(img_path)[1] new = np.clip(1.4057577998008846 * img - 38.33089999653017, 0, 255).astype(np.uint8) _, img_binary = cv2.imencode(endswith, new) return img_binary def get_shop_name(self): """ 获取店铺名 :return: """ try: xpath = '//*[@text="进店"]/preceding-sibling::android.view.ViewGroup/android.widget.LinearLayout/android.widget.TextView' if self.d.xpath(xpath).exists: shop_name = self.d.xpath(xpath).text self.loggerPdd.info(f'1-获取到店铺名:{shop_name}') else: # 进入店铺新页面 shop_btn_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]' if self.d.xpath(shop_btn_xpath).exists: 
self.d.xpath(shop_btn_xpath).click() time.sleep(1) # self.d.xpath('//*[@text="店铺"]').click() xpath_shop_name = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.TextView[1]' if self.d.xpath(xpath_shop_name).exists: shop_name = self.d.xpath(xpath_shop_name).text self.loggerPdd.info(f'2-获取到店铺名:{shop_name}') else: shop_name = '' self.loggerPdd.info(f'3-获取到店铺名:{shop_name}') self.swipe_back(1) # else: shop_name = '' self.loggerPdd.info('4-因为shop_btn_xpath不存在,获取到店铺名为空') # time.sleep(10000) return shop_name except Exception as e: print(f'获取店铺名出错:{e}') self.loggerPdd.error(f'获取店铺名出错:{e}') return None def save_to_database(self, data): print(f'保存数据到数据库:{data}') max_retries = 5 for attempt in range(max_retries): conn = None try: conn = get_mysql() with conn.cursor() as cur: add_sql = """ INSERT INTO retrieve_scrape_data ( enterprise_id, platform_id, platform_item_id, province_id, city_id, province_name, city_name, area_info, product_name, product_specs, one_box_price, manufacture_date, expiry_date, manufacturer, approval_number, is_sold_out, online_posting_count, continuous_listing_count, link_url, store_name, store_url, shipment_province_id, shipment_province_name, shipment_city_id, shipment_city_name, company_name, qualification_number, scrape_date, min_price, number, sales, inventory, snapshot_url ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ cur.execute(add_sql, ( data['enterprise_id'], data['platform_id'], data['platform_item_id'], data['province_id'], data['city_id'], data['province_name'], 
data['city_name'], data['area_info'], data['product_name'], data['product_specs'], data['one_box_price'], data['manufacture_date'], data['expiry_date'], data['manufacturer'], data['approval_number'], data['is_sold_out'], data['online_posting_count'], data['continuous_listing_count'], data['link_url'], data['store_name'], data['store_url'], data['shipment_province_id'], data['shipment_province_name'], data['shipment_city_id'], data['shipment_city_name'], data['company_name'], data['qualification_number'], data['scrape_date'], data['min_price'], data['number'], data['sales'], data['inventory'], data['snapshot_url'], )) conn.commit() # 如果你有计数器,可以取消下面注释 self.max_counts += 1 print("存入数据库成功") return True except Exception as e: print(f'保存数据库异常 (尝试 {attempt + 1}/{max_retries}): {e}') if conn: conn.rollback() conn.close() if attempt == max_retries - 1: print("达到最大重试次数,保存失败") return False time.sleep(2) def click_target_product_by_search_key(self, fuzzy_match=False, timeout=10): """ 动态匹配self.search_key对应的商品并点击 :param fuzzy_match: 是否模糊匹配(应对商品名带额外后缀/前缀的情况) 不模糊匹配 :param timeout: 等待元素出现的超时时间(秒) :return: 点击是否成功(bool) """ try: # 1. 定义定位条件(动态使用self.search_key) if fuzzy_match: # 模糊匹配:包含search_key即可(推荐,适配搜索结果商品名略有差异) locator = self.d(textContains=self.search_key) print(f"🔍 模糊匹配商品:包含「{self.search_key}」的元素") else: # 精确匹配:商品名与search_key完全一致 locator = self.d(text=self.search_key) print(f"🔍 精确匹配商品:「{self.search_key}」") # 2. 
等待元素出现(核心:避免元素未加载就点击) if locator.wait(timeout=timeout): print(f"✅ 找到匹配的商品,准备点击") # 执行点击(优先点击可点击的元素) locator.click() print(f"✅ 成功点击「{self.search_key}」对应的商品") # 点击后等待页面加载 time.sleep(self.get_sleep_time()) return True else: # print(f"❌ 超时未找到「{self.search_key}」对应的商品,尝试滑动刷新") # # 容错:向上滑动(加载更多)后重试 # self.swipe_up() # 滑动后再等待5秒重试 # if locator.wait(timeout=5): # locator.click() # print(f"✅ 滑动后找到并点击「{self.search_key}」对应的商品") # return True # else: print(f"❌ 滑动后仍未找到「{self.search_key}」对应的商品") return False except Exception as e: print(f"❌ 点击「{self.search_key}」对应商品时异常:{e}") return False def swipe_down(self): """ 下滑(模拟真人操作,抗风控+设备适配+容错) 核心:起点在屏幕上方,终点在屏幕下方(和上滑相反) :return: None """ try: # 1. 获取屏幕尺寸(兼容不同设备,给默认值避免获取失败) screen_width = self.d.info.get('displayWidth', 1080) # 默认1080px宽度 screen_height = self.d.info.get('displayHeight', 2400) # 默认2400px高度 # 2. 随机滑动时长(0.1~0.3秒,避免固定值被风控,且不设0秒) duration_rate = random.uniform(0.1, 0.3) # 3. 计算滑动坐标(用屏幕比例,适配所有设备) start_x = screen_width // 2 # 水平居中(和上滑一致,符合真人操作习惯) start_y = screen_height * 0.2 # 起点:屏幕20%高度(上方偏下) end_y = screen_height * 0.8 # 终点:屏幕80%高度(下方偏上) # 强制确保起点y < 终点y(必为向下滑,避免逻辑错误) start_y, end_y = min(start_y, end_y - 10), max(end_y, start_y + 10) # 4. 
核心向下滑动操作 self.d.swipe(start_x, start_y, start_x, end_y, duration=duration_rate) # 滑动后全局等待(确保页面加载,避免元素定位失败) time.sleep(self.get_sleep_time()) except Exception as e: # 异常捕获:避免设备断开/滑动失败导致程序崩溃 print(f"向下滑动失败:{e}") # 兜底方案:用固定坐标重试(适配主流1080x2400设备) self.d.swipe(540, 480, 540, 1920, duration=0.2) time.sleep(self.get_sleep_time()) def swipe_up(self): """ 上滑 :return: """ screen_width = self.d.info['displayWidth'] screen_height = self.d.info['displayHeight'] duration_rate = random.uniform(0, 0.3) self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate) no = random.uniform(0, 1) if no > 0.85: # 有的时候卡着 再稍微往上滑一点点 self.d.swipe_ext("up", 0.1) time.sleep(self.get_sleep_time()) def swipe_back(self, no): """ 返回 :param no: 回退次数 :return: """ if not self.distinct_target(): for idx in range(no): self.d.press('back') time.sleep(self.get_sleep_time()) def drug_price(self): """ 获取药品价格 :return: """ try: xpath = '//*[@text="¥"]/following-sibling::android.widget.TextView[1]' price_str = self.d.xpath(xpath).text price = float(re.search(r'[\d\.]+', price_str).group()) print(f'获取到价格:{price}') return float(price) except Exception as e: print(f'提取价格出错-->{e}') return None def drug_price_ex(self): price_str = '' # 价格初始化 ext = '' # 初始化已选择的信息 price = '' # 这是点击进入品规的按钮 button_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' button_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[last()]' # 调试 # test_button = self.d.xpath(button_xpath_1).exists # print(test_button) # test_button_2 = 
self.d.xpath(button_xpath_2).exists # print(test_button_2) # time.sleep(1000) # if self.d.xpath('//*[@text="发起拼单"]').exists: # self.d.xpath('//*[@text="发起拼单"]').click() # elif self.d.xpath('//*[@text="去复诊开药"]').exists: # self.d.xpath('//*[@text="去复诊开药"]').click() if self.d.xpath(button_xpath_1).exists: self.d.xpath(button_xpath_1).click() elif self.d.xpath(button_xpath_2).exists: self.d.xpath(button_xpath_2).click() else: print("button1 and button_2 all not exist") return price, ext select_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[last()]' select_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()]' select_xpath_3_2 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[last()-1]' price_xpath_1 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' price_xpath_2 = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.TextView[1]' 
price_xpath_3 = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[2]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.view.ViewGroup[1]//android.widget.TextView[1]' if self.d.xpath(select_xpath_1).exists: text1 = self.d.xpath(select_xpath_1).text print(f"select_xpath_1--text1={text1}") if '已选' in text1: if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-1={price_str}") else: print("select_xpath_1--price_xpath_1-1 not exist") ext = text1 elif '请选择' in text1: # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]' scroll_xpath_2 = '' if self.d.xpath(scroll_xpath_1).exists: self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格会刷新 if self.d.xpath(select_xpath_1).exists: text2 = self.d.xpath(select_xpath_1).text if '已选' in text2: print(f"select_xpath_1--已选择2:text2={text2}") if self.d.xpath(price_xpath_1).exists: price_str = self.d.xpath(price_xpath_1).text print(f"select_xpath_1--price_str-2={price_str}") else: print("select_xpath_1--price_xpath_1-2 not exist") ext = text2 else: print("select_xpath_1--scroll_xpath_1 not exist") elif self.d.xpath(select_xpath_2).exists: text1 = self.d.xpath(select_xpath_2).text print(f"xpath2--text1={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-1 not exist") elif '请选择' in text1: print('come in here') # 需要再下面点击选择 scroll_xpath_1 = 
'//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_2).exists: text2 = self.d.xpath(select_xpath_2).text if '已选' in text2: ext = text2 print(f"select_xpath_2--已选择2:text2={text2}") if self.d.xpath(price_xpath_2).exists: price_str = self.d.xpath(price_xpath_2).text print(f"select_xpath_2--price_str-2={price_str}") else: print("select_xpath_2--price_xpath_2-2 not exist") else: print("scroll_xpath_1 not exists") else: print("not exist 请选择 or 已选") elif self.d.xpath(select_xpath_3).exists: text1 = self.d.xpath(select_xpath_3).text print(f"xpath3--text1-1={text1}") if ('请选择' not in text1) and ('已选' not in text1): text1 = self.d.xpath(select_xpath_3_2).text print(f"xpath3--text1-2={text1}") if '已选' in text1: ext = text1 if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3-1={price_str}") else: print("select_xpath_3--price_xpath_3-3-1 not exist") elif '请选择' in text1: print('come in here') # 需要再下面点击选择 scroll_xpath_1 = '//*[@resource-id="android:id/content"]//android.widget.ScrollView[1]/android.widget.LinearLayout[1]/android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' recycler_view_xpath = '//*[@resource-id="android:id/content"]//android.support.v7.widget.RecyclerView[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]' if self.d.xpath(scroll_xpath_1).exists: print("scroll_xpath_1 exists") self.d.xpath(scroll_xpath_1).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = 
self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-2={price_str}") else: print("select_xpath_3--price_xpath_3-3-2 not exist") elif self.d.xpath(recycler_view_xpath).exists: self.d.xpath(recycler_view_xpath).click() time.sleep(2) # 延时2秒钟,选择了之后价格可能会刷新 if self.d.xpath(select_xpath_3).exists: text2 = self.d.xpath(select_xpath_3).text if '已选' in text2: ext = text2 print(f"select_xpath_3--已选择2:text2={text2}") if self.d.xpath(price_xpath_3).exists: price_str = self.d.xpath(price_xpath_3).text print(f"select_xpath_3--price_str-3-3={price_str}") else: print("select_xpath_3--price_xpath_3-3-3 not exist") else: print("scroll_xpath_1 not exists") else: print(f"xpath3--text1-不包含请选择和已选择") else: print("select_xpath_1 and select_xpath_2 and select_xpath_3 all not exist") if price_str: # price = float(re.search('[\d\.]+', price_str).group()) match = re.search(r'¥([\d\.]+)', price_str) if match: price = float(match.group(1)) else: price = '' # price = float(re.search(r'¥([\d\.]+)', price_str).group(1)) print(f'获取到价格:{price}') print(f"ext={ext}") self.swipe_back(1) # return price, ext def restart_uiautomator_services(self, device_id): """ 重启atx的uiautomator 服务 :param device_id: :return: """ stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop' start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d' # result = subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) # print(result.stdout) subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True) time.sleep(self.get_sleep_time()) def connect_devices(self, device_id): """ 连接设备 :return: """ try: self.d = 
u2.connect_usb(device_id) # 设置隐形等待时间 # self.d.implicitly_wait(5) self.restart_uiautomator_services(device_id) print(f'[{self.program_start_time}]连接到设备:{device_id}') except Exception as e: print(f'{device_id} 连接错误: {e}') raise Exception(e) def get_ocr_res(self, img): try: image = self.remove_watermark(img) res_image = self.client.basicGeneral(image) data = res_image.get('words_result', '') print(f'百度api返回结果:{data}') return data except: return None def get_title(self): try: print('开始提取标题') time.sleep(self.get_sleep_time()) title_xpath = '//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]' if self.d.xpath(title_xpath).exists: title = self.d.xpath(title_xpath).info['contentDescription'].strip() else: return None # title = self.d.xpath('//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]').info['contentDescription'].strip() print(f'提取到标题:{title}') return title except Exception as e: print(f'获取标题出错:{e}') return None # 从里面匹配出药品名和规格 # drugs_name # specifications # match = re.search(r'([^\d]+)([\d\D]+)', title) # match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title) # if match: # drugs_name = match.group(1).strip() + match.group(2).strip() # specifications = match.group(3).strip() # print("药品名:", drugs_name) # print("规格:", specifications) # print('完整药名:', drugs_name + specifications) # return drugs_name, specifications # else: # print("没有匹配到预期格式") def enter_shop(self): """ 进店,方便提取资质环境 :return: """ # self.d.xpath('//*[@text="进店"]').click() self.d.xpath('//*[@text="店铺"]').click() time.sleep(self.get_sleep_time()) def data_is_exists(self, data): # 1. 
验证必要字段 required_keys = ['min_price', 'shop', 'scrape_date', 'platform'] if not all(key in data for key in required_keys): missing = [key for key in required_keys if key not in data] print(f"缺少必要字段: {', '.join(missing)}") return None conn = None try: conn = get_mysql() with conn.cursor() as cur: query_sql = """ SELECT * FROM {} WHERE min_price = %s AND store_name = %s AND scrape_date = %s AND platform_id = %s """.format(self.table_name) cur.execute(query_sql, ( data['min_price'], data['shop'], data['scrape_date'], data['platform'] )) result = cur.fetchone() return bool(result) # 如果存在返回True,否则False except Exception as e: print(f"MySQL 错误: {str(e)}") finally: if conn: conn.close() def get_instructions_data(self): """ 确定有详情页之后之后,提取所有的详情页数据 :return: """ # 下面的for循环已经有滑动的操作了,不要一进来就滑动。 # self.d.swipe_ext("up", scale=0.5) for i in range(8): if self.d.xpath('//*[@text="品牌"]').exists or self.d.xpath('//*[@text="药品通用名"]').exists: self.d.swipe_ext("up", scale=0.1) print('开始采集详情数据') break self.d.swipe_ext("up", scale=0.5) time.sleep(self.get_sleep_time()) # 点击查看全部 if self.d.xpath('//*[@text="品牌"]').exists: self.d.xpath('//*[@text="品牌"]').click() else: self.d.xpath('//*[@text="药品通用名"]').click() time.sleep(self.get_sleep_time()) attr = dict() # # 获取详情页信息 xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text # 截图获取未获取到的数据 # if not all(i in ['有效期', '生产企业', '批准文号', '药品规格', '产品规格'] for i in attr.keys()): if not all(i in ['有效期', '生产企业', '批准文号', '药品规格'] for i in attr.keys()): self.d.swipe_ext("up", 0.4) time.sleep(self.get_sleep_time()) xpath = '//*[starts-with(@text,"商品参数")]/parent::*/parent::*/following-sibling::*/*/*/android.view.ViewGroup//android.widget.TextView' ddd = self.d.xpath(xpath).all() for i in range(0, len(ddd), 2): group = ddd[i:i + 2] attr[group[0].text] = group[1].text 
print(f'当前说明书规格参数:{attr}') res_data = { # "有效期": attr.get('有效期',''), # "生产单位": attr['生产企业'], # "批准文号": attr['批准文号'], # "产品规格": attr.get('药品规格') if attr.get('药品规格', '') else attr.get('药品规格') "有效期": attr.get('有效期', ''), "生产单位": attr.get('生产企业', ''), "批准文号": attr.get('批准文号', ''), "产品规格": attr.get('药品规格', '') } print(f'当前规格参数字典数据:{res_data}') return res_data def has_instructions(self): """ 是否有详情页 :return:如果有详情页返回True,否则返回False """ # 没有说明书的无法采集具体数据 max_attempts = 12 # 最大尝试次数 attempt = 0 # 当前尝试次数 while attempt < max_attempts: time.sleep(0.5) xpath = '//*[@text="商品详情"]' is_has_instructions = self.d.xpath(xpath).exists if is_has_instructions: return True # 如果找到“商品详情”,则返回True self.d.swipe_ext("up", 0.3) attempt += 1 return False # 如果尝试次数达到最大次数,则返回False def distinct_target(self): result = False is_position = self.d.xpath('//*[@content-desc="拍照搜索"]').exists is_position2 = self.d.xpath('//*[@text="年货节大促"]').exists is_position3 = self.d.xpath('//*[@text="筛选"]').exists is_position4 = self.d.xpath('//*[@text="回头客常拼"]').exists list_page_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.view.ViewGroup[1]/android.widget.LinearLayout[1]//android.support.v7.widget.RecyclerView[1]' is_position_new = self.d.xpath(list_page_xpath).exists print(f'is_position_new={is_position_new}') if is_position or is_position2 or is_position3 or is_position4 or is_position_new: result = True return result def enter_target_page(self): self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]').click() time.sleep(self.get_sleep_time()) self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) 
self.d.send_keys(self.search_key, clear=True) time.sleep(self.get_sleep_time()) self.d.xpath('//*[@text="搜索"]').click() time.sleep(self.get_sleep_time()) if self.sort and self.sort_key == 0: self.li_or_lo(self.sort) self.wr_re("读", self.device_id) def get_clipboard(self): self.loggerPdd.info(f"Clipboard content:{self.d.clipboard}") # 打印调试信息 clipboard_content = self.d.clipboard if clipboard_content is None: return '' return clipboard_content.strip() def get_product_link(self): product_link = '' print('开始获取商品链接') content_frame = self.d.xpath('//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]').exists print(content_frame) relative_layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout) relative_layout2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]').exists print(relative_layout2) Frame_Layout = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]').exists print(Frame_Layout) ImageView = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]').exists print(ImageView) ImageView2 = self.d.xpath( '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[3]/android.view.View[1]').exists print(ImageView2) # 多种可能的“分享”按钮 dots_xpaths = [ # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', 
'//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[last()]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]/android.view.View[1]', # '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[2]/android.widget.FrameLayout[3]/android.widget.ImageView[1]', ] max_retry = 5 # 最多尝试次数 for idx in range(1, max_retry + 1): if product_link: # 已经拿到则退出 break for xp in dots_xpaths: if self.d.xpath(xp).exists: # print(f'{idx}-进入分享点点点') self.loggerPdd.info(f'{idx}-进入分享点点点') self.d.xpath(xp).click() time.sleep(1) self.loggerPdd.info('开始滑动') self.slide_link() time.sleep(0.2) self.d.xpath('//*[@text="复制链接"]').click_exists() time.sleep(1) product_link = self.get_clipboard() time.sleep(0.5) self.loggerPdd.info(f'{idx}-商品链接:{product_link}') break # 找到并执行后跳出内层循环 if not product_link and idx < max_retry: time.sleep(0.5) # 最后一次不需要再等待 # time.sleep(100000) return product_link def integrate_data_v2(self): """ 基于入口配置统一校验标题、品牌和品规,替代内部大量硬编码分支。 """ min_price, ext = self.drug_price_ex() title_info = self.get_title() if not title_info: print('标题获取为空') self.swipe_back(1) return if not self.is_link_useful(title_info): self.swipe_back(1) self.unrelated_data += 1 return if not min_price: min_price = self.drug_price() if not min_price: print('提取价格出错,回退到列表页') self.swipe_back(1) self.unrelated_data += 1 return product_link = self.get_product_link() time.sleep(2) if self.direct_shop_lookup: shop = self.get_shop_name() else: for _ in range(15): if self.d(textStartsWith="进店").exists: print('开始获取店铺名') break self.d.swipe_ext("up", scale=0.3) time.sleep(self.get_sleep_time()) if self.d(textStartsWith="进店").exists: print('可以开始获取店铺名') shop = self.get_shop_name() if not shop: 
print('当前店铺名称为空') self.swipe_back(1) self.unrelated_data += 1 return scrape_date = self.get_current_date() dup_data = { 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date, 'platform': '3' } if self.data_is_exists(dup_data): print('存在相同数据不入库') self.back_to_list_page() return is_has_instructions = self.has_instructions() self.loggerPdd.info(f'是否有说明书:{is_has_instructions}') manufacture_date = '' credit_code = ext if is_has_instructions: try: instructions_info = self.get_instructions_data() expiry_date = instructions_info['有效期'].strip('。') manufacturer = instructions_info['生产单位'].strip('。') approval_number = instructions_info['批准文号'].strip('。') specifications = instructions_info['产品规格'].strip('。') except Exception as e: print(f'获取详情页规格参数出错:{e}') self.swipe_back(2) return else: expiry_date = '' manufacturer = '' approval_number = '' specifications = '' if not self.is_link_useful(title_info, specifications): self.swipe_back(1) self.unrelated_data += 1 return self.unrelated_data = 0 if extract_box_number(credit_code): one_box_price = min_price / extract_box_number(credit_code) else: print("单瓶药品价格没处理成功") one_box_price = 0 save_data = { 'enterprise_id': 0, 'platform_id': self.platform, 'platform_item_id': '', 'province_id': 0, 'city_id': 0, 'province_name': '', 'city_name': '', 'area_info': "", 'product_name': title_info, 'product_specs': specifications, 'one_box_price': one_box_price, 'manufacture_date': manufacture_date, 'expiry_date': expiry_date, 'manufacturer': manufacturer, 'approval_number': approval_number, 'is_sold_out': 0, 'online_posting_count': 1, 'continuous_listing_count': 1, 'link_url': product_link, 'store_name': shop, 'store_url': '', 'shipment_province_id': 0, 'shipment_province_name': "", 'shipment_city_id': 0, 'shipment_city_name': "", 'company_name': "", 'qualification_number': "", 'scrape_date': scrape_date, 'min_price': min_price, 'number': 0, 'sales': "", 'inventory': "", 'snapshot_url': "", 'insert_time': 
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } self.save_to_database(save_data) def main(self, device_id, search_key_length, keyword_idx): spider_no = 0 self.connect_devices(device_id) time.sleep(self.get_sleep_time()) if keyword_idx == 0: print("搜索前,先重启APP") self.restart_app() # 搜索关键字 self.enter_target_page() else: print("清空前面的文字,再输入关键词") self.d.send_keys(self.search_key, clear=True) time.sleep(1) print("点击搜索") self.d.xpath('//*[@text="搜索"]').click() time.sleep(1) # 上报状态 report_api(self.task_id, self.page, 2) for idx in range(300): print(f'第{idx + self.page}页') self.wr_re("写", self.device_id, self.sort, idx + self.page) if spider_no > 30: time.sleep(300) spider_no = 0 if self.unrelated_data > 10: print(f'[{self.program_start_time}]----{self.search_key}----连续超过10个不达标的数据则停止采集') print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") self.swipe_down() time.sleep(self.get_sleep_time()) # 下滑后等待页面稳定 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: print(f"⚠️ 关键词「{self.search_key}」商品点击失败") return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) if keyword_idx == search_key_length - 1: print("程序最后一个品规采集完毕,返回主屏幕") report_api(self.task_id, end_page=idx, start=3, end_time=int(time.time())) # self.d.press("home") # 连续超过10个不达标的数据则停止采集 break if self.max_counts_limit and self.max_counts >= self.max_counts_limit: report_api(self.task_id, end_page=idx, start=3, end_time=int(time.time())) print('enough') # 向下滑 self.swipe_down() time.sleep(self.get_sleep_time()) # 点击搜索框 click_success = self.click_target_product_by_search_key(fuzzy_match=False) if not click_success: print(f"关键词「{self.search_key}」商品点击失败") return print("点击搜索框") self.d(className='android.widget.EditText').click() time.sleep(self.get_sleep_time()) break # 售罄次数大于4基本就是号废了但是如果下次点击不会出现这种情况就要重置为0 if self.sold_out_counts > 4: 
report_api(self.task_id, end_page=idx,start=4,end_time=int(time.time())) print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") print("====商品已售罄4次,结束采集(号不能用)") break drug_lis = self.get_drug_lis(idx) print('数量', len(drug_lis)) for idd, drug_one in enumerate(drug_lis): print(idd + 1, drug_one.info) time.sleep(self.get_sleep_time()) top = drug_one.info['bounds']['top'] bottom = drug_one.info['bounds']['bottom'] if bottom <= 1524 and top >= 258: drug_one.click() self.click_counts += 1 time.sleep(self.get_sleep_time()) # 先判断是否售罄次数是否大于4 if self.sold_out_counts >= 4: print( f"[程序启动时间:{self.program_start_time}-----程序结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") print("====这是在第一页有两个,商品已售罄4次,结束采集(号不能用)====") time.sleep(self.get_sleep_time()) report_api(self.task_id, end_page=idx, start=4, end_time=int(time.time())) self.d.press('home') return if self.d.xpath('//*[contains(@text, "商品已售罄")]').wait(timeout=5): print("======商品已售罄======") self.sold_out_counts += 1 if self.back_to_list_page(): continue # 采集药品信息 try: # 重置商品售罄次数 self.sold_out_counts = 0 self.integrate_data_v2() # 检测下是否回退到列表页 if self.back_to_list_page(): print('回退到列表页', True) else: print(f'[{self.app_current_time()}] 回退到列表页失败') print( f"[程序启动时间:{self.program_start_time}-----结束时间:{self.app_current_time()}]----搜索关键词:{self.search_key}----点击了{self.click_counts}个商品") report_api(self.task_id, end_page=idx, start=4, end_time=int(time.time())) return time.sleep(self.get_sleep_time()) spider_no += 1 except Exception as e: self.loggerPdd.error(f'采集药品详情数据出错:{e}') if not self.back_to_list_page(): print('页面出错,退回列表') else: continue if self.d(textStartsWith="抱歉,没有更多商品啦~").exists: report_api(self.task_id, end_page=idx,start=3,end_time=int(time.time())) print('已经到达列表页最底部') break print('开始滑入下一页') end_y = 300 self.d.swipe(200, 1400, 200, end_y, 0.4) time.sleep(self.get_sleep_time()) # pdd def main(): # task = 
get_task_mysql() task = get_all_tasks() for i in range(len(task)): if task[i]["data"] and task[i]["device_id"]: print("任务开始") device_list = { task[i]["device_id"]: [ { "search_key": task[i]["data"][7] + task[i]["data"][5], "title_key": task[i]["data"][5], "spec_list": task[i]["data"][6], "brand": task[i]["data"][7], "save_search_key": task[i]["data"][7] + task[i]["data"][5], "max_counts_limit": 300, "sort": "升序", "platform": task[i]["data"][4], "task_id": task[i]["data"][0] }, ], } # if len(sys.argv) < 2: # print('需要输入设备id') # return # device_id = sys.argv[1] # print(f"设备id: {device_id}") device_id = task[i]["device_id"] if device_id not in device_list: print(f"设备id没有配置: {device_id}") return tasks = device_list[device_id] task_length = len(tasks) for keyword_idx, task in enumerate(tasks): print(f"正在处理第 {keyword_idx + 1}/{task_length} 个关键词:{task['search_key']}") pdd = PDD( task["search_key"], device_id, title_key=task.get("title_key"), spec_list=task.get("spec_list"), brand=task.get("brand", ""), save_search_key=task.get("save_search_key"), max_counts_limit=task.get("max_counts_limit"), direct_shop_lookup=task.get("direct_shop_lookup", False), sort=task.get("sort"), platform=task.get("platform"), task_id=task.get("task_id"), ) pdd.main(device_id, task_length, keyword_idx) else: if not task["data"]: print("没有任务") if not task["device_id"]: print("设备被占用") if __name__ == '__main__': main()