# mt_spider/spider.py
import base64
import datetime
import functools
import json
import logging
import random
import re
import subprocess
import time

import cv2
import requests
import uiautomator2 as u2

from monitor import SpiderMonitor
from config import Config
from db import get_mysql


# ------------------ Decorator ------------------
def safe_method(func):
    """Route a method call through ``self.safe_exec``.

    The decorated method is invoked as ``self.safe_exec(func, self, ...)``,
    so it waits while the attached monitor reports a pause before running.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        return self.safe_exec(func, self, *args, **kwargs)
    return wrapper


# ------------------ Main class ------------------
class MT:
    """UI-automation spider that collects drug listings from the Meituan app.

    The instance drives one device through ``uiautomator2`` (``self.d``),
    reads product / shop / licence data from the screen and stores it via
    the connection returned by ``get_mysql()``.
    """

    # Token appended to the Baidu OCR request URL.  Nothing in this class
    # assigns it; it has to be set on the class or instance before
    # ``get_ocr_res`` can succeed.
    access_token = None

    # Maximum number of times ``main`` restarts the whole flow after the
    # app falls back to the search page.
    MAX_RETRY = 3

    # Height (px) used to decide whether the instruction sheet scrolled a
    # full page; a shorter box ends the paging loop.
    INSTRUCTIONS_PAGE_HEIGHT = 938

    # Directory prefix for cropped licence screenshots.
    SCREENSHOT_DIR = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\'

    LOAD_MORE_XPATH = '//*[@text="加载更多"]'
    ENTER_SHOP_XPATH = '//*[@text="进店"]'
    SEARCH_XPATH = '//*[@text="搜索"]'

    # Container that holds the sections of the instruction sheet.
    INSTRUCTIONS_BOX_XPATH = (
        '//*[@resource-id="com.sankuai.meituan:id/container"]'
        '/android.widget.FrameLayout[1]'
        '/android.widget.RelativeLayout[1]'
        '/android.widget.FrameLayout[1]'
        '/android.widget.LinearLayout[1]'
        '/android.widget.FrameLayout[1]'
        '/android.widget.FrameLayout[3]'
        '/android.widget.FrameLayout[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[2]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[3]'
        '/android.widget.ScrollView[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
    )

    # Shop-name text view; ``{position}`` is ``last()`` or ``last()-1``.
    SHOP_NAME_XPATH_TEMPLATE = (
        '//android.widget.ScrollView'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[{position}]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[3]'
        '/android.widget.FrameLayout[1]'
        '/android.widget.TextView'
    )

    SHOP_ADDRESS_XPATH = (
        '//*[@resource-id="com.sankuai.meituan:id/'
        'wm_sc_drug_shop_content_mrn_container_id_2"]'
        '/android.widget.FrameLayout[1]'
        '/android.widget.FrameLayout[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.widget.ScrollView[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.view.ViewGroup[1]'
        '/android.widget.TextView'
    )

    QUALIFICATION_XPATH = (
        '//*[@resource-id="com.sankuai.meituan:id/mil_container"]'
        '/android.webkit.WebView[1]'
        '/android.webkit.WebView[1]'
        '/android.view.View[1]'
        '/android.view.View[1]'
        '/android.widget.TextView[2]'
    )

    # Column order of the product table insert; values are read from the
    # data dict under the same keys.
    PRODUCT_COLUMNS = (
        'product', 'min_price', 'manufacture_date', 'expiry_date', 'shop',
        'business_license_company', 'province', 'city', 'manufacturer',
        'specification', 'approval_number', 'product_link', 'scrape_date',
        'scrape_province', 'availability', 'credit_code', 'platform',
    )

    # Column order of the shop table insert.
    SHOP_COLUMNS = (
        'shop', 'contact_address', 'qualification_number',
        'business_license_company', 'business_license_address',
        'scrape_date', 'platform',
    )

    def __init__(self, key: str):
        """Create a spider for the search keyword *key*."""
        self.package_name = Config.PACKAGE_NAME
        self.search_key = key
        self.unrelated_data = 0  # consecutive items not matching the keyword
        self.monitor = None      # SpiderMonitor, created in main()
        self.d = None            # uiautomator2 device, set by connect_devices()

    # ------------------ Common helpers ------------------
    def safe_exec(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` once the monitor is not pausing.

        Polls once per second while ``self.monitor.pausing`` is set; runs
        immediately when no monitor is attached.
        """
        while self.monitor and self.monitor.pausing.is_set():
            time.sleep(1)
        return func(*args, **kwargs)

    @staticmethod
    def get_sleep_time():
        """Return a random whole number of seconds between 5 and 8."""
        return random.randint(5, 8)

    @staticmethod
    def get_current_date():
        """Return today's local date formatted as ``YYYY-MM-DD``."""
        return datetime.datetime.now().strftime('%Y-%m-%d')

    def stop_all(self):
        """Log the stop request and stop the monitor if one is attached."""
        logging.warning("收到停止信号,准备退出")
        if self.monitor:
            self.monitor.stop()

    # ------------------ Device / app ------------------
    def connect_devices(self, device_id):
        """Connect to *device_id* over USB and start atx-agent on it.

        :param device_id: adb serial of the target device.
        """
        self.d = u2.connect_usb(device_id)
        print(f'连接到设备:{device_id}')
        # Argument list instead of str.split(): a serial containing
        # whitespace must stay a single argv entry.
        subprocess.run(
            ['adb', '-s', str(device_id), 'shell',
             '/data/local/tmp/atx-agent', 'server', '-d'],
            capture_output=True,
        )
        time.sleep(3)

    def restart_app(self):
        """Stop the target package, then start it again."""
        self.d.app_stop(self.package_name)
        time.sleep(2)
        self.d.app_start(self.package_name)
        time.sleep(5)

    # ------------------ Page navigation ------------------
    def enter_target_page(self):
        """Open the pharmacy channel and search for ``self.search_key``."""
        self.d.xpath('//*[@content-desc="看病买药"]').click()
        self.d.xpath(
            '//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]'
        ).click()
        self.d.xpath(self.SEARCH_XPATH).click()
        self.d.send_keys(self.search_key, clear=True)
        self.d.xpath(self.SEARCH_XPATH).click()

    # ------------------ Data extraction ------------------
    @safe_method
    def get_title(self):
        """Read the product title and split it into name and specification.

        :return: ``(drugs_name, specifications)``, or ``(None, None)`` when
            the title does not match the ``[brand]name<digits...>`` shape.
        """
        # NOTE: this permanently removes "999" from the keyword; later
        # relevance checks use the shortened keyword.
        if "999" in self.search_key:
            self.search_key = self.search_key.replace("999", "")
        title = self.d.xpath(
            f'//*[contains(@text, "{self.search_key}")]'
        ).text
        print(f'获取到药品标题:{title}')
        # ``title or ""`` keeps re.search from raising on a missing text.
        match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title or "")
        if not match:
            print("没有匹配到预期格式")
            return None, None
        drugs_name = match.group(1).strip() + match.group(2).strip()
        specifications = match.group(3).strip()
        print("药品名:", drugs_name)
        print("规格:", specifications)
        print('完整药名:', drugs_name + specifications)
        return drugs_name, specifications

    @safe_method
    def swipe_up(self):
        """Swipe the screen upward, occasionally nudging a bit further."""
        info = self.d.info
        screen_width = info['displayWidth']
        screen_height = info['displayHeight']
        duration_rate = random.uniform(0, 0.3)
        self.d.swipe(screen_width // 2, screen_height - 100,
                     screen_width // 2, 100, duration=duration_rate)
        # The list sometimes sticks; about 15% of the time swipe a little more.
        if random.uniform(0, 1) > 0.85:
            self.d.swipe_ext("up", 0.1)
        time.sleep(self.get_sleep_time())

    @safe_method
    def swipe_back(self, no):
        """Press the back key *no* times, sleeping after each press.

        :param no: number of back presses.
        """
        for _ in range(no):
            self.d.press('back')
            time.sleep(self.get_sleep_time())

    @safe_method
    def drug_price(self):
        """Read the first text starting with "¥" and parse it as a float.

        :return: the price, or ``None`` when it cannot be read or parsed.
        """
        try:
            price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text
            price = float(re.search(r'[\d.]+', price_str).group())
            print(f'获取到价格:{price}')
            return price
        except Exception as e:
            # Best effort: callers treat None as "no price, skip the item".
            print(f'提取价格出错-->{e}')
            return None

    @safe_method
    def get_shop_name(self):
        """Read the shop name, trying two alternative layout positions.

        :return: the shop name, or ``None`` when neither position yields it.
        """
        last_error = None
        for position in ('last()', 'last()-1'):
            try:
                return self.d.xpath(
                    self.SHOP_NAME_XPATH_TEMPLATE.format(position=position)
                ).text
            except Exception as e:
                last_error = e
        logging.error('获取店铺名出错: %s', last_error)
        return None

    @safe_method
    def get_shop_address(self):
        """Read the shop address text; return ``None`` when unavailable."""
        try:
            shop_address = self.d.xpath(self.SHOP_ADDRESS_XPATH).text
            print(f'获取到店铺地址:{shop_address}')
            return shop_address
        except Exception:
            return None

    @safe_method
    def get_qualification_number(self):
        """Read the qualification number with its label removed.

        :return: the number text, or ``None`` when it cannot be read.
        """
        try:
            raw = self.d.xpath(self.QUALIFICATION_XPATH).text
            # Remove the label as a prefix.  str.strip('资质编号:') would
            # strip that *character set* from both ends instead.
            return re.sub(r'^\s*资质编号\s*[::]?\s*', '', raw).strip()
        except Exception:
            return None

    # ------------------ OCR ------------------
    def get_ocr_res(self, img):
        """Send the image at path *img* to the Baidu business-licence OCR.

        :param img: path of the image file to recognise.
        :return: ``{field: recognised words}``, or ``None`` on any failure
            (missing token, unreadable file, HTTP error, unexpected reply).
        """
        print(f'开始识别图片:{img}')
        access_token = getattr(self, 'access_token', None)
        if not access_token:
            logging.error('OCR access_token is not configured; skipping OCR')
            return None
        request_url = (
            "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
            "?access_token=" + access_token
        )
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        try:
            with open(img, 'rb') as f:
                params = {"image": base64.b64encode(f.read())}
            response = requests.post(
                request_url, data=params, headers=headers, timeout=30
            )
            if not response:
                return None
            words_result = response.json()['words_result']
            new_dic = {
                field: entry['words']
                for field, entry in words_result.items()
            }
        except (OSError, requests.RequestException,
                ValueError, KeyError, TypeError, AttributeError) as e:
            logging.error('OCR request failed: %s', e)
            return None
        print('资质数据信息', new_dic)
        return new_dic

    def screenshot_the_business_license(self, qualification_number: str):
        """Take a screenshot, crop the licence area and save the crop.

        :param qualification_number: used as the file name inside
            ``SCREENSHOT_DIR``; a falsy value saves to
            ``cropped_screenshot.png`` in the working directory.
        :return: path of the cropped image.
        :raises RuntimeError: when the screenshot cannot be read back.
        """
        screenshot_path = 'screenshot1.png'
        self.d.screenshot(screenshot_path)
        img = cv2.imread(screenshot_path)
        if img is None:
            raise RuntimeError(
                f'cannot read screenshot file: {screenshot_path}'
            )
        # Fixed crop region (left, top, right, bottom) in pixels.
        left, top, right, bottom = 0, 480, 720, 1420
        cropped_img = img[top:bottom, left:right]
        if qualification_number:
            cropped_screenshot_path = (
                self.SCREENSHOT_DIR + qualification_number + '.png'
            )
        else:
            cropped_screenshot_path = 'cropped_screenshot.png'
        if not cv2.imwrite(cropped_screenshot_path, cropped_img):
            logging.error(
                'failed to write cropped screenshot: %s',
                cropped_screenshot_path,
            )
        return cropped_screenshot_path

    # ------------------ Instruction sheet ------------------
    @staticmethod
    def _field_after(items, label):
        """Return the entry following *label* in *items*, or ``""``."""
        if label not in items:
            return ""
        pos = items.index(label) + 1
        return items[pos] if pos < len(items) else ""

    @safe_method
    def get_instructions_data(self):
        """Open the instruction sheet, page through it and extract fields.

        :return: dict with the keys ``有效期``, ``生产单位`` and ``批准文号``;
            a field that was not found maps to ``""``.
        """
        self.d.xpath('//*[@text="说明"]').click()
        time.sleep(1)
        self.d.xpath('//*[@text="查看详细说明"]').click()
        time.sleep(1)
        self.d.xpath(self.LOAD_MORE_XPATH).click_exists()

        loop_page = 5
        new_list = []
        for i in range(loop_page):
            self.d.xpath(self.LOAD_MORE_XPATH).click_exists()
            time.sleep(0.2)
            if i == 0:
                self.d.swipe(200, 1000, 200, 300, 0.4)
            else:
                self.d.swipe(200, 1000, 200, 62)
            time.sleep(0.2)
            if self.d.xpath(self.LOAD_MORE_XPATH).exists:
                self.d.xpath(self.LOAD_MORE_XPATH).click()
            time.sleep(0.2)

            sections = self.d.xpath(
                self.INSTRUCTIONS_BOX_XPATH + '/android.view.ViewGroup'
            ).all()
            for idx in range(1, len(sections) + 1):
                text_views = self.d.xpath(
                    self.INSTRUCTIONS_BOX_XPATH
                    + f'/android.view.ViewGroup[{idx}]'
                    + '//android.widget.TextView'
                ).all()
                print(f'当前说明书列表数据:{text_views}')
                for tt in text_views:
                    text = tt.text
                    if text and text != '展开全文':
                        new_list.append(text)

            # The first page is never treated as the last one; afterwards a
            # box shorter than a full page ends the paging loop.
            if i == 0:
                height = self.INSTRUCTIONS_PAGE_HEIGHT
            else:
                bounds = self.d.xpath(
                    self.INSTRUCTIONS_BOX_XPATH
                ).info['bounds']
                height = bounds['bottom'] - bounds['top']
            if height < self.INSTRUCTIONS_PAGE_HEIGHT:
                break

        print(f'当前说明书列表数据:{new_list}')
        res_data = {
            "有效期": self._field_after(new_list, "有效期"),
            "生产单位": self._field_after(new_list, "生产单位"),
            "批准文号": self._field_after(new_list, "批准文号"),
        }
        print(f'当前说明书字典数据:{res_data}')
        return res_data

    # ------------------ Shop licence ------------------
    def enter_shop(self):
        """Click the "店铺" entry and wait."""
        self.d.xpath('//*[@text="店铺"]').click()
        time.sleep(self.get_sleep_time())

    def enter_shoper(self):
        """Click the "商家" entry and wait."""
        self.d.xpath('//*[@text="商家"]').click()
        time.sleep(self.get_sleep_time())

    def scan_shoper_license(self):
        """Click the "查看商家资质" entry and wait."""
        self.d.xpath('//*[@text="查看商家资质"]').click()
        time.sleep(self.get_sleep_time())

    @safe_method
    def get_license_info_ex(self):
        """Collect the shop's address and licence details.

        :return: dict with ``contact_address``, ``qualification_number``,
            ``business_license_company`` and ``business_license_address``;
            licence fields are ``''`` when they could not be obtained.
        """
        self.enter_shop()
        self.enter_shoper()
        contact_address = self.get_shop_address()
        self.scan_shoper_license()
        qualification_number = self.get_qualification_number()

        license_info_data = {
            'contact_address': contact_address,
            'qualification_number': qualification_number or '',
            'business_license_company': '',
            'business_license_address': '',
        }
        if not qualification_number:
            return license_info_data

        # Tap a fixed relative screen position before taking the screenshot.
        self.d.click(0.603, 0.27)
        time.sleep(self.get_sleep_time())
        img_path = self.screenshot_the_business_license(qualification_number)
        print(f'cropped_screenshot_path:{img_path}')
        ocr_res = self.get_ocr_res(img_path)
        print(f'ocr_res:{ocr_res}')
        if ocr_res:
            license_info_data['business_license_company'] = ocr_res.get(
                '单位名称', '')
            license_info_data['business_license_address'] = ocr_res.get(
                '地址', '')
        return license_info_data

    # ------------------ Database ------------------
    @staticmethod
    def _run_sql(sql, params, commit=False):
        """Execute one statement and always close the cursor.

        :return: the first result row when *commit* is false, else ``None``.
        """
        conn = get_mysql()
        cur = conn.cursor()
        try:
            cur.execute(sql, params)
            if commit:
                conn.commit()
                return None
            return cur.fetchone()
        finally:
            # Only the cursor is closed; whether the connection is shared
            # cannot be told from this file, so it is left open.
            cur.close()

    @staticmethod
    def _insert_sql(table, columns):
        """Build a parameterised INSERT statement for *columns*."""
        placeholders = ', '.join(['%s'] * len(columns))
        return 'INSERT INTO {} ({}) VALUES ({})'.format(
            table, ', '.join(columns), placeholders)

    def data_is_exists(self, data):
        """Return True when a matching product row is already stored.

        Rows are matched on product, min_price, shop, scrape_date and
        platform.  Any database error is logged and reported as False.
        """
        query_sql = """
            SELECT 1 FROM {}
            WHERE product = %s
              AND min_price = %s
              AND shop = %s
              AND scrape_date = %s
              AND platform = %s
            LIMIT 1
        """.format(Config.DB_TABLE)
        try:
            row = self._run_sql(query_sql, (
                data['product'], data['min_price'], data['shop'],
                data['scrape_date'], data['platform'],
            ))
            return bool(row)
        except Exception as e:
            print(f"MySQL 错误: {str(e)}")
            logging.error('检查商品存在性失败: %s', e)
            return False

    def shop_is_exists_database(self, shop):
        """Return True when *shop* already has a row in the shop table.

        Any database error is logged and reported as False.
        """
        query_sql = """
            SELECT 1 FROM {} WHERE shop = %s LIMIT 1
        """.format(Config.DB_SHOP_TABLE)
        try:
            # ``(shop,)`` is a real 1-tuple; ``(shop)`` is just the string.
            return bool(self._run_sql(query_sql, (shop,)))
        except Exception as e:
            print(f"MySQL 错误: {str(e)}")
            logging.error('检查店铺存在性失败: %s', e)
            return False

    def save_to_database(self, data):
        """Insert one product row built from *data*.

        :param data: dict holding every key listed in ``PRODUCT_COLUMNS``.
        :return: True when the row was committed, False on any error
            (the error is logged, not raised).
        """
        print(f'保存数据到数据库:{data}')
        try:
            add_sql = self._insert_sql(Config.DB_TABLE, self.PRODUCT_COLUMNS)
            values = tuple(data[col] for col in self.PRODUCT_COLUMNS)
            self._run_sql(add_sql, values, commit=True)
        except Exception as e:
            logging.error('写入商品数据失败: %s', e)
            return False
        print("存入数据库成功")
        logging.info('商品数据已入库')
        return True

    def save_shop_info_to_database(self, data):
        """Insert one shop row built from *data*.

        :param data: dict holding every key listed in ``SHOP_COLUMNS``.
        :return: True when the row was committed, False on any error
            (the error is logged, not raised).
        """
        print(f'保存店铺数据到数据库:{data}')
        try:
            # Shop rows go to the shop table, the same table that
            # shop_is_exists_database() queries.
            add_sql = self._insert_sql(
                Config.DB_SHOP_TABLE, self.SHOP_COLUMNS)
            values = tuple(data[col] for col in self.SHOP_COLUMNS)
            self._run_sql(add_sql, values, commit=True)
        except Exception as e:
            logging.error('写入店铺数据失败: %s', e)
            return False
        print('存入店铺信息到数据库成功')
        logging.info('店铺数据已入库')
        return True

    # ------------------ Collect one complete product ------------------
    @safe_method
    def integrate_data(self):
        """Collect the open product page plus shop info and store both.

        Every early exit presses back once so the caller ends up on the
        page it came from.
        """
        logger = logging.getLogger()
        logger.info('开始采集当前商品详情')

        # 1. Product name + specification.  get_title() signals failure
        #    with (None, None), which is truthy, so test the name itself.
        title_info = self.get_title()
        if not title_info or not title_info[0]:
            logger.warning('未获取到标题,跳过')
            self.swipe_back(1)
            return
        product, specifications = title_info
        if self.search_key not in product.replace(' ', ''):
            logger.info('无关商品,跳过')
            self.unrelated_data += 1
            self.swipe_back(1)
            return

        # 2. Price.
        min_price = self.drug_price()
        if min_price is None:
            logger.warning('未获取到价格,跳过')
            self.swipe_back(1)
            return

        # 3. Self-operated products use a fixed shop name.
        self_operated = self.d.xpath('//*[@text="自营"]').exists
        if self_operated:
            shop = "美团自营大药房(快递电商)"
        else:
            # 4. Third-party shop: scroll (up to 3 times) until the
            #    "enter shop" button is visible, then read the shop name.
            for _ in range(3):
                if self.d.xpath(self.ENTER_SHOP_XPATH).exists:
                    break
                self.d.swipe_ext('up', 0.2)
                time.sleep(1)
            shop = self.get_shop_name()
            if not shop or '自营' in shop:
                logger.info('店铺为自营或空,跳过')
                self.swipe_back(1)
                return

        scrape_date = self.get_current_date()
        dup_data = {
            'product': product,
            'min_price': min_price,
            'shop': shop,
            'scrape_date': scrape_date,
            'platform': '美团',
        }
        if self.data_is_exists(dup_data):
            if self_operated:
                logger.info('自营商品已存在,跳过')
            else:
                logger.info('商品已存在,跳过')
            self.swipe_back(1)
            return

        # 5. Collect shop licence data (only for shops not stored yet).
        if (self.d.xpath(self.ENTER_SHOP_XPATH).exists
                and '美团官方' not in shop
                and not self.shop_is_exists_database(shop)):
            lic = self.get_license_info_ex()
            save_shop_data = {
                'shop': shop,
                'contact_address': lic['contact_address'],
                'qualification_number': lic['qualification_number'],
                'business_license_company': lic['business_license_company'],
                'business_license_address': lic['business_license_address'],
                'scrape_date': scrape_date,
                'platform': '美团',
            }
            self.save_shop_info_to_database(save_shop_data)
            self.swipe_back(2)  # two back presses: licence page -> shop -> list

        # 6. Instruction sheet.
        # NOTE(review): has_instructions() is not defined in the visible
        # part of this file - confirm it exists elsewhere.
        if not self.has_instructions():
            logger.info('无说明书,跳过')
            self.swipe_back(1)
            return
        instructions = self.get_instructions_data()
        expiry_date = instructions.get('有效期', '').strip('。')
        manufacturer = instructions.get('生产单位', '').strip('。')
        approval_number = instructions.get('批准文号', '').strip('。')

        # 7. Assemble and store the product row.
        save_data = {
            'product': product,
            'min_price': min_price,
            'manufacture_date': '',
            'expiry_date': expiry_date,
            'shop': shop,
            'business_license_company': '',
            'province': '',
            'city': '',
            'manufacturer': manufacturer,
            'specification': specifications,
            'approval_number': approval_number,
            'product_link': '',
            'scrape_date': scrape_date,
            'scrape_province': '广东',
            'availability': '',
            'credit_code': '',
            'platform': '美团',
        }
        # Only report success when the insert actually succeeded.
        if self.save_to_database(save_data):
            logger.info('商品数据已入库:%s', product)
        self.unrelated_data = 0
        self.swipe_back(1)

    # ------------------ Main flow ------------------
    def _run_session(self, device_id):
        """Run one crawl session with its own monitor.

        :return: True when the app fell back to the search page and the
            whole flow should be restarted; False when the session ended
            normally or cannot be recovered.
        """
        logger = logging.getLogger()
        spider_no = 0
        self.connect_devices(device_id)
        time.sleep(self.get_sleep_time())
        self.monitor = SpiderMonitor(self)
        self.monitor.start()
        try:
            self.restart_app()
            self.enter_target_page()
            for idx in range(300):
                logger.info('========== 第 %s 页 ==========', idx + 1)
                if spider_no > 30:
                    logger.info('已采集 30 条,休息 120 秒')
                    time.sleep(120)
                    spider_no = 0
                if (self.monitor.verification_count
                        >= self.monitor.MAX_VERIFICATION_RETRY):
                    logger.warning('验证码重试超限,等待人工处理')
                    # NOTE(review): confirm that d.toast is callable in the
                    # installed uiautomator2 version.
                    self.d.toast('请处理验证码后点击继续', 30)
                    self.monitor.verification_count = 0
                    continue

                drug_lis = self.safe_exec(
                    self.d.xpath(
                        '//android.support.v7.widget.RecyclerView'
                        '/android.widget.FrameLayout'
                    ).all
                )
                logger.info('当前页面共有 %s 个商品', len(drug_lis))

                for drug_one in drug_lis:
                    bounds = drug_one.info['bounds']
                    top, bottom = bounds['top'], bounds['bottom']
                    # Only click items lying fully inside the 304..1559 px
                    # vertical band.
                    if not (304 <= top and bottom <= 1559):
                        continue
                    self.safe_exec(drug_one.click)
                    time.sleep(2)
                    try:
                        self.integrate_data()
                        spider_no += 1
                    except Exception as e:
                        logger.exception('采集详情异常: %s', e)
                        self.swipe_back(1)
                        continue

                    # NOTE(review): distinct_target() is not defined in the
                    # visible part of this file - confirm it exists.
                    if self.safe_exec(self.distinct_target):
                        logger.debug('已返回列表页')
                    elif self.d.xpath(self.SEARCH_XPATH).exists:
                        logger.warning('已回到搜索页,重新开始流程')
                        return True
                    else:
                        logger.error('无法恢复页面,终止')
                        return False
                    time.sleep(self.get_sleep_time())

                if self.d.xpath('//*[@text="已经到底啦"]').exists:
                    logger.info('已到底')
                    break
                self.d.drag(300, 1400, 300, 400, 1)
                time.sleep(self.get_sleep_time())
            return False
        finally:
            # Runs exactly once per session, also on the restart path.
            self.monitor.stop()
            self.monitor.join()

    def main(self, device_id, retry_count=0):
        """Crawl on *device_id*, restarting the flow when it gets lost.

        :param device_id: adb serial of the device to drive.
        :param retry_count: restarts already used; at most ``MAX_RETRY``
            restarts are performed in total.
        """
        logger = logging.getLogger()
        # A loop instead of recursion: each session stops and joins its own
        # monitor once, rather than again in every unwinding outer frame.
        while self._run_session(device_id):
            if retry_count >= self.MAX_RETRY:
                logger.error('超过最大重试次数,终止')
                return
            retry_count += 1