| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940 |
- import requests
- import base64
- import cv2
- import uiautomator2 as u2
- import time
- import subprocess
- import re
- import random
- import datetime
- import json
- from apscheduler.schedulers.blocking import BlockingScheduler
- from db_mysql import mysqlClient
- from config import Config
- import logging
- # from database import MySQLClient
- # 配置日志
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- def get_access_token():
- AppKey = "tRK2RhyItCSh6BzyT4CNVXQa"
- AppSrcret = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
- token_url = 'https://aip.baidubce.com/oauth/2.0/token'
- url = f"{token_url}?grant_type=client_credentials&client_id={AppKey}&client_secret={AppSrcret}"
- payload = ""
- headers = {
- 'Content-Type': 'application/json',
- 'Accept': 'application/json'
- }
- response = requests.request("POST", url, headers=headers, data=payload)
- try:
- return response.json()['access_token']
- except:
- return None
-
- def get_mysql():
- """
- 建立并返回一个到数据库的连接对象
- """
- import pymysql
- return pymysql.connect(
- host = Config.DB_HOST, #"localhost", # 修改后的主机
- port = Config.DB_PORT, #3306, # 添加端口号
- user = Config.DB_USER, #'root', # 修改后的用户名
- password = Config.DB_PASSWORD, # 修改后的密码
- db = Config.DB_NAME, #"drug_data", # 修改后的数据库名
- charset='utf8mb4'
- )
- class MT:
- def __init__(self, key):
- # self.package_name = 'com.sankuai.meituan'
- self.package_name = Config.PACKAGE_NAME
- self.access_token = get_access_token()
- self.city2province = self.get_city_info()
- # host = Config.DB_HOST #"localhost"
- # user = Config.DB_USER #"root"
- # password = Config.DB_PASSWORD #"dfwy2025"
- # database = Config.DB_NAME #"drug_data"
- # port = Config.DB_PORT#3306
- # print(f'数据库配置:host:{host},user:{user},password:{password},database:{database},port:{port}')
-
- self.table_name = Config.DB_TABLE #"mt_drug"
- self.shop_table_name = Config.DB_SHOP_TABLE
- print(f'数据库表名:table_name:{self.table_name},shop_table_name:{self.shop_table_name}')
- # self.mysql_client = mysqlClient(host, user, password, database, port)
-
- self.search_key = key # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒
- self.unrelated_data = 0 # 无关数据数量
- def stop_app(self):
- self.d.app_stop(self.package_name)
- time.sleep(5)
- def start_app(self):
- self.d.app_start(self.package_name)
- time.sleep(5)
- def restart_app(self):
- """
- 重启app
- :return:
- """
- self.stop_app()
- self.start_app()
- @staticmethod
- def get_sleep_time():
- return random.randint(5, 8)
- @staticmethod
- def get_current_date():
- return datetime.datetime.now().strftime('%Y/%m/%d')
- @staticmethod
- def get_city_info():
- """
- 获取所有的省市数据
- :return:
- """
- file_path = '../kailin_city.json'
- with open(file_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- province = {province_one["id"]: province_one for province_one in data['province']}
- city2province = dict()
- city = data['city']
- for city_one in city:
- name = city_one['name']
- pid = city_one['pid']
- if len(str(pid)) > 2:
- pid = int(re.match('^\d{2}', str(pid)).group())
- city2province[name] = province[pid]['name']
- return city2province
- def get_shop_name(self):
- """
- 获取店铺名
- :return:
- """
- try:
- shop_name = self.d.xpath(
- '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text
- print(f'获取到店铺名:{shop_name}')
- return shop_name
- except:
- try:
- shop_name = self.d.xpath(
- '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[last()-1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.FrameLayout[1]/android.widget.TextView').text
- print(f'获取到店铺名2:{shop_name}')
- return shop_name
- except Exception as e:
- print(f'获取店铺名出错:{e}')
- return None
- def get_qualification_number(self):
- """
- 获取资质编号
- :return:
- """
- try:
- qualification_number_str = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/mil_container"]/android.webkit.WebView[1]/android.webkit.WebView[1]/android.view.View[1]/android.view.View[1]/android.widget.TextView[2]').text
- qualification_number = qualification_number_str.strip('资质编号:').strip()
- return qualification_number
- except:
- return None
-
- def get_shop_address(self):
- try:
- shop_address = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/wm_sc_drug_shop_content_mrn_container_id_2"]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text
- print(f'获取到店铺地址:{shop_address}')
- return shop_address
- except:
- print(f'获取店铺地址出错-get_shop_address')
- return None
- def enter_detail(self):
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/recycler"]/android.widget.FrameLayout[1]').click()
- time.sleep(self.get_sleep_time())
- def save_to_database(self, data):
- print(f'保存数据到数据库:{data}')
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- # add_sql = "insert into delete_friend_table(delete_user_name,delete_user_id,delete_content,delete_time) value(%s,%s,%s,%s)"
- add_sql = f"""
- INSERT INTO {self.table_name}
- (product, min_price, manufacture_date, expiry_date, shop, business_license_company, province, city, manufacturer, specification, approval_number, product_link, scrape_date, scrape_province, availability, credit_code, platform)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """
- cur.execute(add_sql, (data['product'], data['min_price'], data['manufacture_date'], data['expiry_date'], data['shop'], data['business_license_company'],data['province'], data['city'], data['manufacturer'], data['specification'], data['approval_number'], data['product_link'], self.get_current_date(), data['scrape_province'], data['availability'], data['credit_code'], data['platform']))
- conn.commit() # 提交数据
- #self.mysql_client.insert(self.table_name, data)
- print(f"存入数据库成功")
- def save_shop_info_to_database(self, data):
- print(f'保存店铺数据到数据库:{data}')
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- add_sql = f"""
- INSERT INTO {self.shop_table_name}
- (shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- """
- cur.execute(add_sql, (data['shop'], data['contact_address'], data['qualification_number'], data['business_license_company'], data['business_license_address'], data['scrape_date'], data['platform']))
- conn.commit() # 提交数据
- #self.mysql_client.insert(self.shop_table_name, data)
- print(f'存入店铺信息到数据库成功')
- def swipe_up(self):
- """
- 上滑
- :return:
- """
- screen_width = self.d.info['displayWidth']
- screen_height = self.d.info['displayHeight']
- duration_rate = random.uniform(0, 0.3)
- self.d.swipe(screen_width // 2, screen_height - 100, screen_width // 2, 100, duration=duration_rate)
- no = random.uniform(0, 1)
- if no > 0.85:
- # 有的时候卡着 再稍微往上滑一点点
- self.d.swipe_ext("up", 0.1)
- time.sleep(self.get_sleep_time())
- def swipe_back(self, no):
- """
- 返回
- :param no: 回退次数
- :return:
- """
- for idx in range(no):
- self.d.press('back')
- time.sleep(self.get_sleep_time())
- def drug_price(self):
- """
- 获取药品价格
- :return:
- """
- try:
- price_str = self.d.xpath('//*[starts-with(@text,"¥")]').text
- price = float(re.search('[\d\.]+', price_str).group())
- print(f'获取到价格:{price}')
- return price
- except Exception as e:
- print(f'提取价格出错-->{e}')
- return None
- def restart_uiautomator_services(self, device_id):
- """
- 重启atx的uiautomator 服务
- :param device_id:
- :return:
- """
- stop_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d --stop'
- start_uiautomator_services = f'adb -s {device_id} shell /data/local/tmp/atx-agent server -d'
- # result = subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True)
- # print(result.stdout)
- subprocess.run(stop_uiautomator_services, capture_output=True, text=True, shell=True)
- time.sleep(self.get_sleep_time())
- subprocess.run(start_uiautomator_services, capture_output=True, text=True, shell=True)
- time.sleep(self.get_sleep_time())
- def connect_devices(self, device_id):
- """
- 连接设备
- :return:
- """
- try:
- self.d = u2.connect_usb(device_id)
- # 设置隐形等待时间
- # self.d.implicitly_wait(5)
- self.restart_uiautomator_services(device_id)
- print(f'连接到设备:{device_id}')
- except Exception as e:
- print(f'{device_id} 连接错误: {e}')
- raise Exception(e)
- def get_ocr_res(self, img):
- try:
- request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
- # 二进制方式打开图片文件
- f = open(img, 'rb')
- img = base64.b64encode(f.read())
- params = {"image": img}
- # access_token = get_access_token()
- request_url = request_url + "?access_token=" + self.access_token
- headers = {'content-type': 'application/x-www-form-urlencoded'}
- response = requests.post(request_url, data=params, headers=headers)
- if response:
- res = response.json()
- new_dic = dict()
- for ite in res['words_result'].keys():
- new_dic[ite] = res['words_result'][ite]['words']
- print('资质数据信息', new_dic)
- return new_dic
- else:
- return None
- except:
- return None
- def screenshot_the_business_license(self, qualification_number):
- screenshot_path = 'screenshot1.png'
- self.d.screenshot(screenshot_path)
- img = cv2.imread(screenshot_path)
- # 指定裁剪区域 (left, top, right, bottom)
- left = 0
- top = 480
- right = 720
- bottom = 1420
- cropped_img = img[top:bottom, left:right]
- if qualification_number:
- cropped_screenshot_path = 'D:\\work\\dfwy_spider\\drug_data\\mt\\screenshot\\' + qualification_number + '.png'
- else:
- cropped_screenshot_path = 'cropped_screenshot.png'
- cv2.imwrite(cropped_screenshot_path, cropped_img)
- def get_title(self):
- # try:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- # except:
- # title = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.TextView').text
-
- # title = self.d.xpath('//*[contains(@text, "舒肝颗粒")]').text
- title = self.d.xpath(f'//*[contains(@text, "{self.search_key}")]').text
- # title = self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]/android.widget.TextView').text
- print(f'获取到药品标题:{title}')
- # 从里面匹配出药品名和规格
- # drugs_name
- # specifications
- # match = re.search(r'([^\d]+)([\d\D]+)', title)
- match = re.search(r'(\[[^\]]+\])(.+?)(\d+.*)', title)
- if match:
- drugs_name = match.group(1).strip() + match.group(2).strip()
- specifications = match.group(3).strip()
- print("药品名:", drugs_name)
- print("规格:", specifications)
- print('完整药名:', drugs_name + specifications)
- return drugs_name, specifications
- else:
- print("没有匹配到预期格式")
- def enter_shop(self):
- """
- 进店,方便提取资质环境
- :return:
- """
- self.d.xpath('//*[@text="进店"]').click()
- time.sleep(self.get_sleep_time())
- def enter_shoper(self):
- """
- 进入商家
- :return:
- """
- self.d.xpath('//*[@text="商家"]').click()
- time.sleep(self.get_sleep_time())
- def scan_shoper_license(self):
- self.d.xpath('//*[@text="查看商家资质"]').click()
- time.sleep(self.get_sleep_time())
- def data_is_exists(self, data):
- """
- 检查指定数据是否已存在于数据库表中(仅检查存在性)
-
- 参数:
- data: 包含查询条件的字典,键为列名,值为条件值
-
- 返回:
- True: 数据存在
- False: 数据不存在
- None: 检查过程中出错
- """
- # dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- # 'platform': '美团'}
-
- # 1. 验证必要字段
- required_keys = ['product', 'min_price', 'shop', 'scrape_date', 'platform']
- if not all(key in data for key in required_keys):
- missing = [key for key in required_keys if key not in data]
- logging.error(f"缺少必要字段: {', '.join(missing)}")
- return None
-
- try:
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- # query_sql = f"SELECT * FROM {self.table_name} WHERE product = '{data['product']}' AND min_price = '{data['min_price']}' AND shop = '{data['shop']}' AND scrape_date = '{data['scrape_date']}' AND platform = '{data['platform']}'"
- # cur.execute(query_sql)
- query_sql = """
- SELECT * FROM {}
- WHERE product = %s
- AND min_price = %s
- AND shop = %s
- AND scrape_date = %s
- AND platform = %s
- """.format(self.table_name)
- cur.execute(query_sql, (
- data['product'],
- data['min_price'],
- data['shop'],
- data['scrape_date'],
- data['platform']
- ))
- result = cur.fetchone()
- return bool(result) # 如果存在返回True,否则False
- except Exception as e:
- print(f"MySQL 错误: {str(e)}")
-
- # try:
- # columns = data.keys()
- # placeholders = [f"{col} = %({col})s" for col in columns]
- # query = f"SELECT * FROM `{self.table_name}` WHERE {' AND '.join(placeholders)}"
- # cur = self.mysql_client.cur
- # cur.execute(query, data)
- # exists = cur.fetchone()
- # return exists
- # except Exception as e:
- # print(f"MySQL 错误: {str(e)}")
- # return None
- def shop_is_exists_database(self, shop):
- try:
- # 连接数据库
- conn = get_mysql()
- # 创建游标对象
- cur = conn.cursor()
- query_sql = """
- SELECT * FROM {}
- WHERE shop = %s
- """.format(self.shop_table_name)
- cur.execute(query_sql, (
- shop
- ))
- result = cur.fetchone()
- return bool(result) # 如果存在返回True,否则False
- except Exception as e:
- print(f"MySQL 错误: {str(e)}")
-
-
- def get_instructions_data(self):
- """
- 确定有说明书之后,提取所有的说明书数据
- :return:
- """
- self.d.xpath('//*[@text="说明"]').click()
- time.sleep(random.randint(3, 5))
- self.d.xpath('//*[@text="查看详细说明"]').click()
- time.sleep(random.randint(3, 5))
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- loop_page = 5
- new_list = list()
- for i in range(loop_page):
- self.d.xpath('//*[@text="加载更多"]').click_exists()
- time.sleep(1)
- if i == 0:
- self.d.swipe(200, 1000, 200, 300, 0.4)
- else:
- self.d.swipe(200, 1000, 200, 62)
- time.sleep(1)
- if self.d.xpath('//*[@text="加载更多"]').exists:
- self.d.xpath('//*[@text="加载更多"]').click()
- time.sleep(1)
- all_tt = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup').all()
- for idx in range(1, len(all_tt) + 1):
- all_tt1 = self.d.xpath(
- f'//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[{idx}]//android.widget.TextView').all()
- for tt in all_tt1:
- if tt.text:
- new_list.append(tt.text)
- if i == 0:
- height = 938
- else:
- drug_box = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]/android.widget.ScrollView[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]').info
- bounds = drug_box['bounds']
- height = bounds['bottom'] - bounds['top']
- if height < 938:
- # print('说明书翻页到底部')
- break
- # 展开全文
- new_list = [item for item in new_list if item != '展开全文']
- print(f'当前说明书列表数据:{new_list}')
- expiry_date_index = next(idx for idx, i in enumerate(new_list) if i == '有效期')
- manufacturer_index = next(idx for idx, i in enumerate(new_list) if i == '生产单位')
- approval_number_index = next(idx for idx, i in enumerate(new_list) if i == '批准文号')
- res_data = {
- "有效期": new_list[expiry_date_index + 1],
- "生产单位": new_list[manufacturer_index + 1],
- "批准文号": new_list[approval_number_index + 1]
- }
- print(f'当前说明书字典数据:{res_data}')
- return res_data
- def has_instructions(self):
- """
- 是否有说明书
- :return:
- """
- # 没有说明书的无法采集具体数据
- time.sleep(self.get_sleep_time())
- is_has_instructions = self.d.xpath('//*[@text="说明"]').exists
- return is_has_instructions
- def has_shop(self):
- """
- 是否有进店按钮
- :return:
- """
- # self.d.swipe_ext('up', 0.1)
- time.sleep(self.get_sleep_time())
- is_has_enter_shop = self.d.xpath('//*[@text="进店"]').exists
- return is_has_enter_shop
-
- def get_license_info_ex(self):
- self.enter_shop()
- self.enter_shoper()
- #获取地址
- contact_address = self.get_shop_address()
- # time.sleep(50000)
- ###
- self.scan_shoper_license()
- # 获取资质编码
- qualification_number = self.get_qualification_number()
- #营业执照公司名称
- business_license_company = ''
- #营业执照地址
- business_license_address = ''
- self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license(qualification_number)
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- print(f'ocr_res:{ocr_res}')
- #获取ocr_res 中的地址、单位名称
- if ocr_res:
- if '单位名称' in ocr_res.keys():
- business_license_company = ocr_res['单位名称']
-
- if '地址' in ocr_res.keys():
- business_license_address = ocr_res['地址']
-
- license_info_data = {'contact_address': contact_address, 'qualification_number': qualification_number, 'business_license_company': business_license_company, 'business_license_address': business_license_address}
- return license_info_data
- def get_license_info(self):
- self.enter_shop()
- self.enter_shoper()
- self.scan_shoper_license()
- # 获取资质编码
- qualification_number = self.get_qualification_number()
- if qualification_number:
- table_license_info = self.get_table_license_info(qualification_number)
- if table_license_info:
- return {
- '单位名称': table_license_info[0],
- '地址': table_license_info[1],
- '社会信用代码': table_license_info[2]
- }
- else:
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- # operate_no = random.randint(0, 1)
- self.d.click(0.603, 0.27)
- # if operate_no == 0:
- # self.d.xpath('//*[@text="营业执照"]').click()
- # else:
- # self.d.click(0.603, 0.27)
- time.sleep(self.get_sleep_time())
- self.screenshot_the_business_license()
- ocr_res = self.get_ocr_res('cropped_screenshot.png')
- return ocr_res
- def distinct_target(self):
- is_position = self.d.xpath(
- '//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[3]').exists
- return is_position
- def enter_target_page(self):
- self.d.xpath('//*[@content-desc="看病买药"]').click()
- time.sleep(self.get_sleep_time())
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/vf_search_carousel_text"]').click()
- time.sleep(self.get_sleep_time())
- self.d.xpath('//*[@resource-id="com.sankuai.meituan:id/container"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[3]/android.widget.FrameLayout[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[2]/android.view.ViewGroup[1]').click()
- time.sleep(self.get_sleep_time())
- self.d.send_keys(self.search_key, clear=True)
- time.sleep(self.get_sleep_time())
- self.d.xpath('//*[@text="搜索"]').click()
- time.sleep(self.get_sleep_time())
- def get_table_license_info(self, qualification_number):
- try:
- sql = f'select business_license_company,city,credit_code from mt_drug where credit_code = "{qualification_number}"'
- self.mysql_client.cur.execute(sql)
- res = self.mysql_client.cur.fetchone()
- return res
- except:
- return None
- def integrate_data(self):
- """
- 整合数据
- :return:
- """
- title_info = self.get_title() # 药品,规格
- if title_info:
- product, specifications = title_info
- if self.search_key not in product.replace(' ', ''):
- self.swipe_back(1)
- self.unrelated_data += 1
- return
- else:
- self.swipe_back(1)
- return
- min_price = self.drug_price() # 最低价格
- #判断是否有自营的文本,有的话不需要获取店铺的信息
- if self.d.xpath('//*[@text="自营"]').exists:
- shop = "美团自营大药房(快递电商)"
- # 爬取日期
- scrape_date = self.get_current_date()
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
- else:
- for i in range(3):
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名1')
- break
- self.d.swipe_ext('up', 0.2)
- time.sleep(1)
- # detail_info = self.d.xpath(
- # '//android.widget.ScrollView/android.view.ViewGroup[1]/android.view.ViewGroup[1]/android.view.ViewGroup[6]').info
- # bounds = detail_info['bounds']
- # height = bounds['bottom'] - bounds['top']
- # if self.d.xpath('//*[@text="进店"]').exists and height > 100:
- if self.d.xpath('//*[@text="进店"]').exists:
- print('开始获取店铺名2')
- break
- shop = self.get_shop_name()
- # 爬取日期
- scrape_date = self.get_current_date()
- dup_data = {'product': product, 'min_price': min_price, 'shop': shop, 'scrape_date': scrape_date,
- 'platform': '美团'}
- print(f'当前数据:{dup_data}')
- #获取店铺信息开始
-
- is_has_enter_shop = self.has_shop()
- #需要判断shop是否已经在数据库中存在,如果存在,则不再进入店铺,直接进入下一个商品
- shop_is_exists = self.shop_is_exists_database(shop)
-
- #存在进店 并且店铺的名称不包含美团官方的字样
- if is_has_enter_shop and '美团官方' not in shop and not shop_is_exists:
- license_info = self.get_license_info_ex()
- contact_address = license_info['contact_address']
- qualification_number = license_info['qualification_number']
- business_license_company = license_info['business_license_company']
- business_license_address = license_info['business_license_address']
- save_shop_data = {
- 'shop': shop,
- 'contact_address': contact_address,
- 'qualification_number': qualification_number,
- 'scrape_date': scrape_date,
- 'business_license_company':business_license_company,
- 'business_license_address':business_license_address,
- 'platform': '美团'
- }
- self.save_shop_info_to_database(save_shop_data)
-
- self.swipe_back(2)
-
- #获取店铺信息结束
- if self.data_is_exists(dup_data):
- print('存在相同数据不入库')
- self.swipe_back(1)
- return
-
- if not shop:
- print('未获取到店铺名:开始回退')
- self.swipe_back(1)
- return
- if not shop or '自营' in shop:
- self.swipe_back(1)
- return
- time.sleep(self.get_sleep_time())
-
-
-
- # 生产日期为空
- manufacture_date = ''
- # 执政信息
- # if is_has_enter_shop:
- # license_info = self.get_license_info()
- # business_license_company = license_info["单位名称"]
- # credit_code = license_info['社会信用代码']
- # city_str = license_info['地址']
- # # 先把省份啥的替换掉
- # city_sub_str = re.sub(r'[u4e00-\u9fa5]+省', '', city_str)
- # try:
- # city = re.search(r'[\u4e00-\u9fa5]+?(市|区|县)', city_sub_str).group(0)
- # except:
- # city = city_sub_str
- # try:
- # province = self.city2province[city]
- # except:
- # province = ''
- # self.swipe_back(2)
- # else:
- # business_license_company = ''
- # credit_code = ''
- # city = ''
- # province = ''
- business_license_company = ''
- credit_code = ''
- city = ''
- province = ''
- expiry_date = ''
- manufacturer = ''
- approval_number = ''
- #是否存在说明书
- is_has_instructions = self.has_instructions()
- # 说明书等信息
- if is_has_instructions:
- print('开始获取说明书信息')
- instructions_info = self.get_instructions_data()
- expiry_date = instructions_info['有效期'].strip('。')
- manufacturer = instructions_info['生产单位'].strip('。')
- approval_number = instructions_info['批准文号'].strip('。')
- else:
- # 没有说明书不入库
- self.swipe_back(1)
- return
- self.unrelated_data = 0
- # 商品链接
- product_link = ''
- # 爬取省份
- scrape_province = '广东' # 这里先默认广东
- # 是否有货
- availability = ''
- save_data = {
- 'product': product,
- 'min_price': min_price,
- 'manufacture_date': manufacture_date,
- 'expiry_date': expiry_date,
- 'shop': shop,
- 'business_license_company': business_license_company,
- 'province': province,
- 'city': city,
- 'manufacturer': manufacturer,
- 'specification': specifications,
- 'approval_number': approval_number,
- 'product_link': product_link,
- 'scrape_date': scrape_date,
- 'scrape_province': scrape_province,
- 'availability': availability,
- 'credit_code': credit_code,
- 'platform': '美团'
- }
- self.save_to_database(save_data)
- # time.sleep(100000)
- if self.distinct_target():
- print('已到达搜索列表页')
- else:
- for i in range(1):
- self.swipe_back(1)
- # 最外部有个定位按钮
- if self.distinct_target():
- break
- def main(self, device_id):
- spider_no = 0
- self.connect_devices(device_id)
- time.sleep(self.get_sleep_time())
- # 重新开启美团应用
- self.restart_app()
- # 搜索关键字
- self.enter_target_page()
- for idx in range(300):
- print(f'第{idx + 1}页')
- if spider_no > 30:
- time.sleep(120)
- spider_no = 0
- print('目前无关数据量: ', self.unrelated_data)
- # if self.unrelated_data > 10:
- # # 连续超过5个不达标的数据则停止采集
- # break
- drug_lis = self.d.xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout').all()
- lis_len = len(drug_lis)
- print(f'当前页面共有{lis_len}个商品')
- for drug_one in drug_lis:
- bounds = drug_one.info['bounds']
- top = bounds['top']
- bottom = bounds['bottom']
- # height = bottom - top
- print(f'当前商品高度{bottom - top}')
- if 304 <= top and bottom <= 1475: # 默认高度241的才行
- # print('目标-->', drug_one.info)
- drug_one.click()
- # print('点击目标药品完毕')
- time.sleep(2)
- # 采集药品信息
- try:
- self.integrate_data()
- # 检测下是否回退到列表页
- if self.distinct_target():
- print('回退到列表页', True)
- else:
- print('回退到列表页失败,终止采集')
- return
- time.sleep(self.get_sleep_time())
- spider_no += 1
- except Exception as e:
- print(f'采集药品详情数据出错:{e}')
- if not self.distinct_target():
- for i in range(1):
- self.swipe_back(1)
- # 最外部有个定位按钮
- if self.distinct_target():
- break
- if i == 0 and not self.distinct_target():
- print('页面出错,退出采集')
- return
- else:
- continue
- if self.d.xpath('//*[@text="已经到底啦"]').exists:
- print('已经到达列表页最底部')
- return
- search_list = self.d.xpath('//android.support.v7.widget.RecyclerView').info
- bounds = search_list['bounds']
- #print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'])
- # 计算滑动距离
- scroll_distance = bounds['bottom'] - bounds['top'] # 正数
- start_y = 1400
- end_y = start_y - scroll_distance # 向上滑动,y 坐标减小
- # 确保 end_y 不小于 0
- end_y = max(end_y, 100) # 留出一点边距,避免滑出屏幕
- print('滑动起点 y:', start_y, '终点 y:', end_y)
- self.d.swipe(200, start_y, 200, end_y, 0.4)
- #print('搜索列表高度', 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'])
- # self.d.swipe(200, 1400, 200, 1400 + bounds['top'] - bounds['bottom'], 0.4)
- time.sleep(self.get_sleep_time())
- def unitest(self):
- """
- 单元测试
- :return:
- """
- save_data = {
- 'product':"[昆中药]舒肝颗粒(低糖型)",
- 'min_price': 14.0,
- 'manufacture_date': '',
- 'expiry_date': '36个月',
- 'shop': '美团自营大药房(快递电商)',
- 'business_license_company': '',
- 'province': '',
- 'city': '',
- 'manufacturer': '昆明中药厂有限公司',
- 'specification': '3g*16袋/盒',
- 'approval_number': '国药准字Z53021161',
- 'product_link': '',
- 'scrape_date': '2025/07/09',
- 'scrape_province': '广东',
- 'availability': '',
- 'credit_code': '',
- 'platform': '美团'
- }
- self.save_to_database(save_data)
-
- time.sleep(100000)
- pass
- def main():
- mt = MT('舒肝颗粒') # 参苓健脾胃颗粒 舒肝颗粒 清肺化痰丸 香砂平胃颗粒
- # mt.main('95b2c764')
- mt.main('fcb3c749')
- if __name__ == '__main__':
- main()
- # scheduler = BlockingScheduler()
- # scheduler.add_job(main, 'cron', hour=21, minute=30, misfire_grace_time=120)
- # try:
- # scheduler.start()
- # except (KeyboardInterrupt, SystemExit):
- # pass
|