| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477 |
- import os
- import json
- from dotenv import load_dotenv
- import pymysql
- from typing import List, Dict, Optional
- import time
- from playwright.sync_api import (
- sync_playwright,
- TimeoutError as PlaywrightTimeoutError,
- BrowserContext
- )
- import requests
- from urllib.parse import urlparse
- # 补充百度OCR所需依赖
- import base64
- from PIL import Image
- import io
- import asyncio
# Load environment variables (python-dotenv) before any os.getenv() below.
load_dotenv()
# ===================== Global configuration constants =====================
# Default database connection settings used as DBGoodsReader defaults.
DEFAULT_DB_CONFIG = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "",
    "db_name": "",
    "table_name": ""
}
# Playwright launch / context settings.
PLAYWRIGHT_CONFIG = {
    "headless": False,
    "slow_mo": 300,
    "browser_args": [
        "--start-maximized",
        "--disable-blink-features=AutomationControlled",  # main anti-detection switch
        "--no-sandbox",  # Windows/Linux compatibility
        "--disable-dev-shm-usage",  # work around limited shared memory
        "--disable-popup-blocking",  # do not block pop-ups
        "--disable-extensions",  # disable extensions
        "--disable-gpu",  # disable GPU acceleration
        "--lang=zh-CN,zh",  # Chinese UI language
        # NOTE: PddLinkBrowser._download_image reads the User-Agent from the
        # LAST element of this list, so this entry must stay last.
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
    ],
    "viewport": {"width": 2050, "height": 1200},
    "locale": "zh-CN",
    "timezone_id": "Asia/Shanghai",
    "default_timeout": 15000,
    "navigation_timeout": 30000,
    "login_state_path": "pdd_login_state.json",  # persisted Pinduoduo login state file
    "tianyancha_login_state": "tianyancha_login_state.json"  # persisted Tianyancha login state file
}
# Baidu OCR credentials (read from the environment) and image pre-processing.
BAIDU_OCR_CONFIG = {
    "api_key": os.getenv('APP_KEY'),
    "secret_key": os.getenv('APP_SECRET'),
    "scale": 1.5  # enlargement factor applied to the image before OCR
}
# Image download settings.
IMAGE_CONFIG = {
    "save_dir": "pdd_goods_images",  # folder name, relative to the working directory
    "timeout": 10,  # download timeout in seconds
    "retry": 1  # number of retries after a failed download
}
# 1. Stores today's date in a MySQL session variable used by QUERY_SQL.
SET_DATE_SQL = "SET @date_constant = CURDATE();"
# 2. Main query: for rows scraped on/after @date_constant whose shop has no
#    business_license_company (no matching pdd_shop_info_middle row, or a NULL
#    value), keep one row per shop (rn = 1, lowest search_key).
QUERY_SQL = """
SELECT
product,
shop,
product_link,
scrape_date,
business_license_company,
search_key
FROM (
SELECT
pd.product,
pd.shop,
pd.product_link,
pd.scrape_date,
psi.business_license_company,
pd.search_key,
ROW_NUMBER() OVER (PARTITION BY pd.shop ORDER BY pd.search_key ASC) AS rn
FROM pdd_drug_middle pd
LEFT JOIN pdd_shop_info_middle psi ON psi.shop = pd.shop
WHERE pd.scrape_date >= @date_constant
AND psi.business_license_company IS NULL
) AS sub
WHERE rn = 1
ORDER BY search_key;
"""
- # ===================== 百度OCR类(完整整合)=====================
class BaiduOCR:
    """Wrapper around Baidu's general text-recognition (OCR) REST API.

    Constructing an instance immediately requests an OAuth access token.
    If that request fails, ``access_token`` stays ``None`` and
    :meth:`general_ocr` returns ``None`` without calling the OCR endpoint.
    """

    # Longest image side (pixels) sent to the OCR endpoint.
    MAX_OCR_SIDE = 4096
    # Encoded JPEG size (MB) above which the image is re-encoded at lower quality.
    MAX_OCR_MEGABYTES = 4
    # OCR output frequently contains full-width punctuation; map it to ASCII so
    # that a single set of keywords / patterns matches both forms.
    _PUNCT_TABLE = str.maketrans({"\uff1a": ":", "\uff08": "(", "\uff09": ")"})
    # Known OCR misreadings -> intended text. Order matters: the longer
    # "有松司" must be replaced before the shorter "松司".
    _NAME_CORRECTIONS = {
        "人药房": "大药房",
        "有松司": "有限公司",
        "松司": "公司",
        "关药房": "大药房",
    }
    # Label fragments that introduce the company name on a single line.
    _NAME_KEYWORDS = ("名称:", "名:", "称:")
    # Label variants (including a known misreading) for the credit code.
    _CODE_KEYWORDS = ("社会信用代码:", "统一社会信用代码:", "社会震用代码:")

    def __init__(self, api_key: str, secret_key: str):
        """Store the credentials and fetch an access token.

        :param api_key: API Key of the Baidu AI Cloud application.
        :param secret_key: Secret Key of the Baidu AI Cloud application.
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.access_token: Optional[str] = None
        self._get_access_token()

    def _get_access_token(self) -> bool:
        """Request an OAuth access token and store it on the instance.

        :return: ``True`` when a token was obtained, ``False`` otherwise
                 (HTTP error, network error, or no token in the response).
        """
        url = "https://aip.baidubce.com/oauth/2.0/token"
        params = {
            "grant_type": "client_credentials",
            "client_id": self.api_key,
            "client_secret": self.secret_key,
        }
        try:
            response = requests.post(url, params=params, timeout=10)
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            print(f"❌ 获取 access_token 异常:{e}")
            return False
        if "access_token" not in result:
            print(f"❌ 获取 access_token 失败:{result}")
            return False
        self.access_token = result["access_token"]
        print(f"✅ 成功获取 access_token:{self.access_token[:20]}...")
        return True

    @staticmethod
    def _encode_jpeg(image, quality: int, optimize: bool = False) -> bytes:
        """Encode a PIL image as JPEG in memory and return the bytes."""
        buffer = io.BytesIO()
        image.save(buffer, format='JPEG', quality=quality, optimize=optimize)
        return buffer.getvalue()

    def _enlarge_and_crop_image(self, image_path: str, scale: float = 1.5, crop_ratio: float = 0.5) -> bytes:
        """Crop the top part of an image, enlarge it and return JPEG bytes.

        Steps: keep the top ``crop_ratio`` of the image, resize by ``scale``
        (Lanczos), shrink again if the longest side exceeds
        ``MAX_OCR_SIDE``, convert to RGB (JPEG cannot store alpha/palette
        modes) and encode as JPEG; the JPEG is re-encoded at lower quality
        if it is larger than ``MAX_OCR_MEGABYTES``.

        :param image_path: path of the source image.
        :param scale: enlargement factor applied after cropping.
        :param crop_ratio: fraction of the height kept, measured from the top.
        :return: JPEG bytes, or ``b''`` if any step fails.
        """
        try:
            with Image.open(image_path) as img:
                # Clamp so a tiny/zero ratio never yields an empty image and a
                # ratio > 1 never crops beyond the picture.
                crop_height = min(img.height, max(1, int(img.height * crop_ratio)))
                img_cropped = img.crop((0, 0, img.width, crop_height))
                print(f"✅ 图片裁剪完成:保留上{int(crop_ratio * 100)}%区域,尺寸={img_cropped.size}")

                target_size = (
                    max(1, int(img_cropped.width * scale)),
                    max(1, int(img_cropped.height * scale)),
                )
                img_resized = img_cropped.resize(target_size, Image.Resampling.LANCZOS)

                # Cap BOTH dimensions: scale by the longest side so neither
                # width nor height exceeds the limit.
                longest_side = max(img_resized.size)
                if longest_side > self.MAX_OCR_SIDE:
                    ratio = self.MAX_OCR_SIDE / longest_side
                    capped_size = (
                        min(self.MAX_OCR_SIDE, max(1, round(img_resized.width * ratio))),
                        min(self.MAX_OCR_SIDE, max(1, round(img_resized.height * ratio))),
                    )
                    img_resized = img_resized.resize(capped_size, Image.Resampling.LANCZOS)

                if img_resized.mode == 'RGBA':
                    # Flatten transparency onto a white canvas.
                    rgb_img = Image.new('RGB', img_resized.size, (255, 255, 255))
                    rgb_img.paste(img_resized, mask=img_resized.split()[3])
                    img_resized = rgb_img
                elif img_resized.mode != 'RGB':
                    # Palette / grayscale+alpha / CMYK etc. cannot all be
                    # written as JPEG directly; normalise to RGB first.
                    img_resized = img_resized.convert('RGB')

                image_bytes = self._encode_jpeg(img_resized, quality=95)
                file_size = len(image_bytes) / 1024 / 1024  # bytes -> MB
                if file_size > self.MAX_OCR_MEGABYTES:
                    print(f"⚠️ 文件超4MB({file_size:.2f}MB),二次压缩...")
                    image_bytes = self._encode_jpeg(img_resized, quality=70, optimize=True)
                print(f"✅ 图片放大完成:最终尺寸={img_resized.size}")
                return image_bytes
        except Exception as e:
            print(f"❌ 图片裁剪/放大失败:{str(e)}")
            return b''

    def general_ocr(self, image_path: str, scale: float = 1.5) -> Optional[Dict]:
        """Run Baidu general OCR on a local image.

        The image is cropped to its top half and enlarged by ``scale``
        before upload.

        :param image_path: path of the local image.
        :param scale: enlargement factor (default 1.5).
        :return: the API response dict when it contains ``words_result``;
                 ``{}`` when image pre-processing produced no data;
                 ``None`` when there is no token or the API call fails.
        """
        if not self.access_token:
            print("❌ access_token 无效,请先初始化")
            return None
        try:
            image_data = self._enlarge_and_crop_image(image_path, scale=scale, crop_ratio=0.5)
            if not image_data:
                print("❌ 图片处理失败,无法识别")
                return {}
            image_base64 = base64.b64encode(image_data).decode("utf-8")
        except Exception as e:
            print(f"❌ 图片放大/读取失败:{e}")
            return None

        url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {"image": image_base64}
        try:
            response = requests.post(url, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            print(f"❌ 调用 OCR 接口异常:{e}")
            return None
        if "words_result" not in result:
            print(f"❌ 识别失败:{result}")
            return None
        print(f"✅ 识别成功,共识别到 {len(result['words_result'])} 行文字")
        return result

    def extract_enterprise_info(self, ocr_result: Dict) -> Dict:
        """Extract the company name and social credit code from an OCR result.

        Each recognised line is stripped, de-duplicated and has full-width
        colons/parentheses converted to ASCII before matching, so labels
        such as ``名称：`` and ``名称:`` are treated alike.

        :param ocr_result: dict returned by :meth:`general_ocr`.
        :return: ``{"enterprise_name": str, "credit_code": str}``; a field is
                 ``""`` when it could not be found.
        """
        import re  # local import: the file header does not import ``re``

        enterprise_info = {
            "enterprise_name": "",
            "credit_code": ""
        }
        if not ocr_result or "words_result" not in ocr_result:
            print("❌ OCR识别结果为空,无法提取企业信息")
            return enterprise_info

        all_text_lines = []
        for item in ocr_result["words_result"]:
            line_text = str(item.get("words", "")).translate(self._PUNCT_TABLE).strip()
            if line_text and line_text not in all_text_lines:
                all_text_lines.append(line_text)
        print(f"📝 OCR识别的有效行:{all_text_lines}")

        # ---- 1. company name -------------------------------------------
        enterprise_name = ""
        for line_text in all_text_lines:
            if "企业名称" in line_text:
                # Drop any "(...)" annotation, then take what follows the label.
                clean_line = re.sub(r'\([^)]*\)', '', line_text)
                name_part = clean_line.split("企业名称")[-1].strip()
                # Handles an unclosed bracket such as "企业名称(名称xxx".
                if "名称" in name_part:
                    name_part = name_part.split("名称")[-1].strip()
                enterprise_name = name_part
                break
            # Also covers a label split across lines ("名" / "称:xxx"), because
            # the second line itself contains "称:".
            if any(key in line_text for key in self._NAME_KEYWORDS):
                enterprise_name = line_text.split(":")[-1].strip()
                break

        if enterprise_name:
            for wrong, right in self._NAME_CORRECTIONS.items():
                enterprise_name = enterprise_name.replace(wrong, right)
            # Trim stray colons/whitespace, then remove inner spaces.
            enterprise_name = enterprise_name.strip(": \t\n\r").replace(" ", "")

        # ---- 2. social credit code -------------------------------------
        credit_code = ""
        for line_text in all_text_lines:
            keyword = next((k for k in self._CODE_KEYWORDS if k in line_text), None)
            if keyword is None:
                continue
            # Remove spaces inside the code, e.g. "91360105 MAEGBDKMXF".
            credit_code = line_text.split(keyword)[-1].strip().replace(" ", "")
            if credit_code:
                break

        enterprise_info["enterprise_name"] = enterprise_name
        enterprise_info["credit_code"] = credit_code

        if enterprise_name:
            print(f"✅ 提取到企业名称:{enterprise_name}")
        else:
            print("⚠️ 未识别到企业名称字段")
        if credit_code:
            print(f"✅ 提取到社会信用代码:{credit_code}")
        else:
            print("⚠️ 未识别到社会信用代码字段")
        return enterprise_info
- # ===================== 数据库读取类 =====================
class DBGoodsReader:
    """MySQL helper: reads shops that still lack company info and inserts results."""

    # Target table, shared by the ID lookup and the INSERT so that both
    # always address the same table.
    SHOP_INFO_TABLE = "`test2`.`pdd_shop_info_middle`"
    # ID used when the next ID cannot be determined.
    FALLBACK_ID = 9078

    def __init__(
        self,
        host: str = DEFAULT_DB_CONFIG["host"],
        port: int = DEFAULT_DB_CONFIG["port"],
        user: str = DEFAULT_DB_CONFIG["user"],
        password: str = DEFAULT_DB_CONFIG["password"],
        db_name: str = DEFAULT_DB_CONFIG["db_name"],
        charset: str = "utf8mb4"
    ):
        """Store connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db_name = db_name
        self.charset = charset
        self.conn: Optional[pymysql.connections.Connection] = None
        self.cursor: Optional[pymysql.cursors.DictCursor] = None

    def connect_db(self) -> bool:
        """Open the connection and a dict cursor, retrying up to two times.

        :return: ``True`` once connected, ``False`` after the last failed try.
        """
        max_retry = 2
        for retry in range(max_retry + 1):
            try:
                self.conn = pymysql.connect(
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    password=self.password,
                    database=self.db_name,
                    charset=self.charset,
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=10,
                )
                self.cursor = self.conn.cursor()
                print(f"✅ 成功连接数据库:{self.db_name}")
                return True
            except pymysql.MySQLError as e:
                if retry < max_retry:
                    print(f"❌ 数据库连接失败(重试{retry + 1}/{max_retry}):{e}")
                    time.sleep(1)
                    continue
                print(f"❌ 数据库连接最终失败:{e}")
        return False

    def get_shop_and_goods(self) -> List[Dict]:
        """Return one row per shop that still lacks company information.

        Executes ``SET_DATE_SQL`` followed by ``QUERY_SQL``.

        :return: list of row dicts (product/shop/product_link/...); an empty
                 list when not connected or when the query fails.
        """
        if not self.conn or not self.cursor:
            print("❌ 未连接数据库,请先调用 connect_db()")
            return []
        try:
            self.cursor.execute(SET_DATE_SQL)
            self.cursor.execute(QUERY_SQL)
            # fetchall() may hand back a tuple; normalise to the annotated list.
            results = list(self.cursor.fetchall())
        except pymysql.MySQLError as e:
            print(f"❌ 读取数据失败:{e}")
            return []
        print(f"✅ 成功读取 {len(results)} 条待补充企业信息的店铺数据")
        return results

    def _get_next_id(self) -> int:
        """Return ``MAX(id) + 1`` of the target table (for a non auto-increment id).

        Falls back to ``FALLBACK_ID`` when the query fails or yields no row.
        NOTE: MAX(id)+1 is not safe against concurrent writers; if the
        column is AUTO_INCREMENT the id should be omitted from the INSERT.
        """
        try:
            sql = f"SELECT IFNULL(MAX(id), 0) + 1 AS next_id FROM {self.SHOP_INFO_TABLE}"
            self.cursor.execute(sql)
            row = self.cursor.fetchone() or {}
        except pymysql.MySQLError as e:
            print(f"❌ 获取自增ID失败,使用默认值9078:{e}")
            return self.FALLBACK_ID
        next_id = row.get("next_id", self.FALLBACK_ID)
        print(f"✅ 获取到下一个可用ID:{next_id}")
        return next_id

    def _safe_rollback(self) -> None:
        """Roll back the current transaction, reporting (not raising) failures."""
        try:
            self.conn.rollback()
        except Exception as e:
            print(f"⚠️ 事务回滚失败:{e}")

    def insert_enterprise_info(self, shop_name: str, enterprise_info: Dict) -> bool:
        """Insert one company-info row for a shop.

        :param shop_name: shop name written to the ``shop`` column.
        :param enterprise_info: dict with ``tyc_company_name``,
            ``tyc_company_code`` and ``tyc_company_address``; missing or
            ``None`` values are stored as empty strings.
        :return: ``True`` when a row was inserted, ``False`` otherwise.
        """
        if not self.conn or not self.cursor:
            print("❌ 未连接数据库,请先调用 connect_db()")
            return False
        if not shop_name:
            print("❌ 店铺名称为空,无法更新")
            return False

        info = enterprise_info or {}
        # ``or ""`` guards against keys that are present but hold None.
        business_company_name = (info.get("tyc_company_name") or "").strip()
        qualification_number = (info.get("tyc_company_code") or "").strip()
        contact_address = (info.get("tyc_company_address") or "").strip()
        business_license_address = contact_address  # both address columns share one value

        labelled_values = (
            ("企业名称", business_company_name),
            ("统一信用代码", qualification_number),
            ("企业地址", contact_address),
        )
        empty_fields = [label for label, value in labelled_values if not value]
        if empty_fields:
            print(f"⚠️ 店铺[{shop_name}]以下字段为空:{','.join(empty_fields)},仍继续插入(空值)")

        next_id = self._get_next_id()
        insert_sql = f"""
            INSERT INTO {self.SHOP_INFO_TABLE} (
                `id`,
                `shop`,
                `contact_address`,
                `qualification_number`,
                `business_license_company`,
                `business_license_address`,
                `scrape_date`,
                `platform`,
                `province`,
                `city`,
                `create_time`,
                `update_time`
            ) VALUES (%s, %s, %s, %s, %s, %s, CURDATE(), '拼多多', '', '', NOW(), NOW())
        """
        insert_params = [
            next_id,
            shop_name,
            contact_address,
            qualification_number,
            business_company_name,
            business_license_address,
        ]
        try:
            self.cursor.execute(insert_sql, insert_params)
            self.conn.commit()
            affected_rows = self.cursor.rowcount
        except pymysql.MySQLError as e:
            print(f"❌ 店铺[{shop_name}]插入失败:{e}")
            self._safe_rollback()
            return False
        except Exception as e:
            print(f"❌ 店铺[{shop_name}]插入异常:{e}")
            self._safe_rollback()
            return False

        if affected_rows > 0:
            print(f"✅ 店铺[{shop_name}]成功插入1条数据(ID:{next_id})")
            print(f" 插入内容:企业名称={business_company_name} | 信用代码={qualification_number} | 地址={contact_address}")
            return True
        print(f"⚠️ 店铺[{shop_name}]插入0行数据,无数据变更")
        return False

    def close_db(self) -> None:
        """Close cursor and connection and clear both handles.

        Clearing the attributes makes the "not connected" guards in the
        other methods work again after a close.
        """
        if self.cursor:
            try:
                self.cursor.close()
            except Exception as e:
                print(f"⚠️ 关闭游标失败:{e}")
            self.cursor = None
        if self.conn:
            try:
                self.conn.close()
                print("✅ 数据库连接已关闭")
            except Exception as e:
                print(f"⚠️ 关闭数据库连接失败:{e}")
            self.conn = None
- # ===================== 天眼查浏览器类 =====================
class TianyanchaBrowser:
    """Tianyancha browser session: open the site, log in, search a company
    name and read the first search result."""

    _LOGIN_BUTTON_SELECTOR = "div.tyc-header-nav-item.tyc-nav-user span.tyc-nav-user-btn"
    _SEARCH_INPUT_SELECTOR = 'input[placeholder="请输入公司名称、老板姓名、品牌名称等"]'
    # (result key, CSS selector, label when found, label when missing, label on error)
    _RESULT_FIELDS = (
        ("tyc_company_name",
         "div.index_name__qEdWi span",
         "公司名", "企业名", "企业名"),
        ("tyc_company_code",
         "div.index_info-col__UVcZb.index_credit-code__kWuDZ span",
         "企业信用代码", "企业信用代码", "统一社会信用代码"),
        ("tyc_company_address",
         "div.index_contact-col__7AboU.index_address__mHjQD .index_value__Pl0Nh",
         "企业地址", "企业地址", "企业地址"),
    )

    def __init__(self):
        """Create an empty session; call :meth:`init_browser` before use."""
        self.pw = None
        self.browser = None
        self.context: Optional[BrowserContext] = None
        self.page = None
        self.login_state_path = PLAYWRIGHT_CONFIG["tianyancha_login_state"]

    def check_scan_login_prompt(self):
        """Pause for a manual QR-code login if the scan-login prompt appears.

        Waits up to 10 s for the prompt; if it does not show up the method
        returns immediately.
        """
        try:
            scan_prompt_locator = self.page.locator(
                "div.scan-title",
                has_text="扫码登录 更快 更安全"
            )
            scan_prompt_locator.wait_for(state="visible", timeout=10000)
            print("⚠️ 检测到天眼查扫码登录提示!")
            input("请打开天眼查APP扫码完成登录后,按回车键继续执行脚本...")
        except PlaywrightTimeoutError:
            print("✅ 未检测到扫码登录提示,跳过扫码步骤")

    def _load_login_state(self) -> Optional[Dict]:
        """Load the saved storage state from ``login_state_path``.

        :return: the parsed JSON, or ``None`` when the file is missing,
                 unreadable or corrupt (a corrupt file is deleted).
        """
        if not os.path.exists(self.login_state_path):
            return None
        try:
            with open(self.login_state_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            print(f"⚠️ 天眼查登录状态文件损坏,将重新登录")
            try:
                os.remove(self.login_state_path)
            except OSError as e:
                print(f"⚠️ 天眼查登录状态文件删除失败:{e}")
        except OSError as e:
            print(f"⚠️ 天眼查登录状态文件读取失败:{e}")
        return None

    def _save_login_state(self) -> None:
        """Write the context's storage state to ``login_state_path`` (best effort)."""
        if self.context:
            try:
                self.context.storage_state(path=self.login_state_path)
                print(f"✅ 天眼查登录状态已保存到:{self.login_state_path}")
            except Exception as e:
                print(f"⚠️ 天眼查登录状态保存失败:{e}")

    def init_browser(self, pw) -> bool:
        """Launch Chromium on the given Playwright instance and open a page.

        :param pw: an already started (sync) Playwright object; it is owned
                   by the caller and is not stopped by :meth:`close`.
        :return: ``True`` on success; on failure everything opened so far is
                 closed and ``False`` is returned.
        """
        try:
            self.pw = pw
            self.browser = self.pw.chromium.launch(
                headless=PLAYWRIGHT_CONFIG["headless"],
                slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
                args=PLAYWRIGHT_CONFIG["browser_args"],
                ignore_default_args=["--enable-automation"],
                timeout=60000
            )
            context_options = {
                # In Playwright for Python ``viewport=None`` just means "use the
                # default"; ``no_viewport=True`` is what lets the page follow
                # the maximised window.
                "no_viewport": True,
                "locale": PLAYWRIGHT_CONFIG["locale"],
                "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
                "ignore_https_errors": True,
            }
            login_state = self._load_login_state()
            if login_state:
                context_options["storage_state"] = login_state
            self.context = self.browser.new_context(**context_options)
            if login_state:
                print("✅ 已加载天眼查本地登录状态")

            self.page = self.context.new_page()
            self.page.set_default_timeout(PLAYWRIGHT_CONFIG['default_timeout'])
            self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG['navigation_timeout'])
            return True
        except Exception as e:
            print(f"❌ 天眼查浏览器初始化失败:{e}")
            self.close()
            return False

    def _ensure_logged_in(self) -> None:
        """Detect the login button and, if shown, wait for a QR-code login.

        The two waits are handled separately so that a login that is not
        completed in time is not mistaken for "already logged in". The
        storage state is saved only when the session is logged in.
        """
        login_button = self.page.locator(
            self._LOGIN_BUTTON_SELECTOR,
            has_text="登录/注册"
        ).nth(0)
        try:
            login_button.wait_for(state="visible", timeout=10000)
        except PlaywrightTimeoutError:
            # No login button within 10 s -> session is already logged in.
            print("✅ 检测到已登录状态,无需重复登录")
            self._save_login_state()
            return

        print("⚠️ 检测到未登录状态,正在点击「登录/注册」按钮...")
        login_button.click()
        print("\n🔔 请在40秒内打开天眼查APP,扫描页面上的登录二维码,登录完成后脚本将自动继续...")
        try:
            # The button disappearing is taken as "login succeeded".
            self.page.wait_for_selector(
                self._LOGIN_BUTTON_SELECTOR,
                state="hidden",
                timeout=40000
            )
        except PlaywrightTimeoutError:
            print("⚠️ 40秒内未完成扫码登录,将以未登录状态继续")
            return
        print("✅ 扫码登录成功!")
        self._save_login_state()

    def _locate_search_box(self):
        """Return the visible search input, or ``None`` if it cannot be found.

        When the placeholder matches several inputs the second one is used.
        """
        try:
            candidates = self.page.locator(self._SEARCH_INPUT_SELECTOR)
            search_locator = candidates.nth(1) if candidates.count() > 1 else candidates
            search_locator.wait_for(timeout=10000, state="visible")
        except PlaywrightTimeoutError:
            print(f"❌ 搜索框定位超时:页面加载过慢或搜索框元素不存在")
            return None
        except Exception as e:
            print(f"❌ 搜索框定位失败:{str(e)}")
            return None
        print("✅ 定位到天眼查搜索框")
        return search_locator

    def search_enterprise(self, enterprise_name: str) -> bool:
        """Open Tianyancha and search for a company name.

        :param enterprise_name: company name (e.g. extracted by OCR).
        :return: ``True`` when the search was submitted and the page settled;
                 ``False`` on missing page, blank name, timeout or error.
        """
        if not self.page:
            print("❌ 天眼查浏览器未初始化!")
            return False
        if not enterprise_name or enterprise_name.strip() == '':
            print("❌ 企业名称为空!无法搜索")
            return False
        try:
            print(f"\n📌 打开天眼查:https://www.tianyancha.com/")
            self.page.goto(
                "https://www.tianyancha.com/",
                wait_until="networkidle",
                timeout=30000
            )
            self._ensure_logged_in()

            search_locator = self._locate_search_box()
            if search_locator is None:
                return False

            search_locator.click()
            search_locator.clear()
            print(f"📌 输入企业名:{enterprise_name}")
            search_locator.fill(enterprise_name)
            self.page.wait_for_timeout(1000)
            # Submit with Enter rather than clicking the search button.
            search_locator.press("Enter")
            print("🖱️ 已触发回车搜索")
            self.page.wait_for_load_state("networkidle", timeout=20000)
            print(f"✅ 天眼查搜索完成!已搜索:{enterprise_name}")
            return True
        except PlaywrightTimeoutError:
            print(f"❌ 天眼查搜索超时(企业名:{enterprise_name})")
            return False
        except Exception as e:
            print(f"❌ 天眼查搜索异常:{e}")
            return False

    def get_enterprise_info(self) -> Dict:
        """Read name, credit code and address of the first search result.

        :return: dict with ``tyc_company_name``, ``tyc_company_code`` and
                 ``tyc_company_address``; a field stays ``""`` when its
                 element is missing or reading it fails.
        """
        enterprise_detail = {
            "tyc_company_name": "",  # company name
            "tyc_company_code": "",  # unified social credit code
            "tyc_company_address": ""  # company address
        }
        if not self.page:
            print("❌ 天眼查页面未初始化")
            return enterprise_detail
        try:
            # Fixed pause to let the result list render.
            self.page.wait_for_timeout(timeout=4000)
            for key, selector, found_label, missing_label, error_label in self._RESULT_FIELDS:
                try:
                    # Several results may match; the first one is used.
                    locator = self.page.locator(selector).nth(0)
                    if locator.count():
                        enterprise_detail[key] = locator.inner_text().strip()
                        print(f"获取到{found_label}:{enterprise_detail[key]}")
                    else:
                        print(f"没有获取到{missing_label},网页路径有问题")
                except Exception as e:
                    print(f"提取{error_label}时发生异常:{str(e)},网页路径或元素定位异常")
                    enterprise_detail[key] = ""

            print("\n📌 提取的企业核心信息:")
            print(f"公司名:{enterprise_detail['tyc_company_name']}")
            print(f"企业信用代码:{enterprise_detail['tyc_company_code']}")
            print(f"企业地址:{enterprise_detail['tyc_company_address']}")
            return enterprise_detail
        except Exception as e:
            print(f"❌ 提取企业信息失败:{e}")
            return enterprise_detail

    def close(self) -> None:
        """Close page, context and browser (best effort) and clear the handles.

        The Playwright object passed to :meth:`init_browser` is left running.
        """
        if self.page:
            try:
                self.page.close()
            except Exception:
                pass  # best effort: the page may already be gone
            self.page = None
        if self.context:
            try:
                self.context.close()
            except Exception:
                pass  # best effort
            self.context = None
        if self.browser:
            try:
                self.browser.close()
                print("✅ 天眼查浏览器已关闭")
            except Exception:
                pass  # best effort
            self.browser = None
- class PddLinkBrowser:
- """拼多多链接浏览器(支持登录持久化+图片下载+OCR识别)"""
def __init__(self, login_state_path: str = PLAYWRIGHT_CONFIG["login_state_path"]):
    """Set up an unopened browser session plus its helpers.

    Creates the image folder, the OCR client (when credentials are
    configured) and a Tianyancha browser wrapper; no browser is launched.

    :param login_state_path: file used to persist the login state.
    """
    # Browser handles stay empty until a browser is actually started.
    self.page = None
    self.browser = None
    self.context: Optional[BrowserContext] = None
    self.login_state_path = login_state_path
    # Image folder first, then OCR client (same order as the log output).
    self._init_image_dir()
    self.ocr_client: Optional[BaiduOCR] = None
    self._init_ocr_client()
    # Companion session used to look companies up on Tianyancha.
    self.tyc_browser = TianyanchaBrowser()
def _init_ocr_client(self):
    """Create the Baidu OCR client when both credentials are configured.

    Without an API key or secret key ``self.ocr_client`` is left untouched
    and a warning is printed.
    """
    key = BAIDU_OCR_CONFIG["api_key"]
    secret = BAIDU_OCR_CONFIG["secret_key"]
    if key and secret:
        self.ocr_client = BaiduOCR(api_key=key, secret_key=secret)
        print("✅ 百度OCR客户端初始化完成")
    else:
        print("⚠️ 未配置百度OCR的API_KEY/SECRET_KEY,将跳过OCR识别")
- # ========== 检测手机登录框并等待手动登录 ==========
def _check_login_box(self) -> bool:
    """Detect the "手机登录" box and wait for the user to log in manually.

    Looks for ``div.phone-login span`` for up to 5 s. When its text
    contains "手机登录" the script pauses on ``input()`` and then waits for
    the page to become network-idle.

    :return: ``True`` if the box was found and the login step finished;
             ``False`` if there is no page, no box, or an error occurred.
    """
    if not self.page:
        print("❌ 页面未初始化,无法检测登录框")
        return False
    try:
        phone_login_span = self.page.locator("div.phone-login span")
        # A timeout here means no login box is shown.
        phone_login_span.wait_for(timeout=5000, state="visible")
        prompt_text = phone_login_span.inner_text().strip()
        if "手机登录" not in prompt_text:
            return False
        print("\n⚠️ 检测到【手机登录】框,请手动完成登录!")
        input("登录完成后,请按回车键继续执行脚本...")
        # Let the page reload so the new login state takes effect.
        self.page.wait_for_load_state("networkidle", timeout=15000)
        print("✅ 登录已完成,继续处理当前商品")
        return True
    except PlaywrightTimeoutError:
        return False
    except Exception as e:
        print(f"⚠️ 检测登录框时发生异常:{str(e)[:60]},继续执行")
        return False
- # ========== 登录检测方法结束 ==========
- # ========== 自定义向下滑动方法 ==========
def _scroll_down(self, distance: int = 500, step: int = 50, interval: int = 100):
    """Scroll the page down in small increments (helps lazy-loaded images).

    :param distance: total distance to scroll, in pixels.
    :param step: pixels per increment; the last increment may be shorter.
    :param interval: pause after each increment, in milliseconds.

    After the last increment the method waits a further 2000 ms. Errors
    are reported and swallowed.
    """
    if not self.page:
        print("❌ 浏览器页面未初始化,无法滑动")
        return
    if distance <= 0 or step <= 0:
        print(f"⚠️ 无效的滑动参数(总距离:{distance},步长:{step}),跳过滑动")
        return
    try:
        print(f"📝 开始分步滑动:总距离{distance}像素,每次滑{step}像素,间隔{interval}ms")
        left_to_scroll = distance
        while left_to_scroll > 0:
            delta = step if step < left_to_scroll else left_to_scroll
            self.page.evaluate(f"window.scrollBy(0, {delta})")
            left_to_scroll = left_to_scroll - delta
            self.page.wait_for_timeout(interval)
        # Extra settle time once the target position is reached.
        self.page.wait_for_timeout(2000)
        print(f"✅ 分步滑动完成,总滑动距离:{distance}像素")
    except Exception as e:
        print(f"⚠️ 分步滑动失败:{str(e)[:50]}")
- # ========== 滑动方法结束 ==========
def _init_image_dir(self):
    """Make sure the image folder from ``IMAGE_CONFIG`` exists and report it."""
    folder = IMAGE_CONFIG["save_dir"]
    if os.path.exists(folder):
        print(f"✅ 图片保存文件夹已存在:{os.path.abspath(folder)}")
        return
    os.makedirs(folder)
    print(f"✅ 图片保存文件夹已创建:{os.path.abspath(folder)}")
def _get_image_filename(self, img_src: str, shop_name: str) -> str:
    """Build a file name of the form ``<shop>_<millisecond timestamp><ext>``.

    :param img_src: image URL; its path supplies the extension
                    (``.png`` when the path has none).
    :param shop_name: shop name; only alphanumeric characters, ``_`` and
                      ``-`` are kept, truncated to 20 characters.
    """
    _, suffix = os.path.splitext(urlparse(img_src).path)
    if not suffix:
        suffix = '.png'
    kept_chars = (ch for ch in shop_name if ch.isalnum() or ch in ("_", "-"))
    safe_shop = "".join(kept_chars)[:20]
    millis = int(time.time() * 1000)
    return f"{safe_shop}_{millis}{suffix}"
def _download_image(self, img_src: str, shop_name: str) -> Optional[str]:
    """Download an image into the folder configured in ``IMAGE_CONFIG``.

    The request is retried ``IMAGE_CONFIG["retry"]`` times, except for
    HTTP 403 which is treated as final. The streamed response is always
    closed, and a partially written file is removed when a download fails.

    :param img_src: image URL.
    :param shop_name: shop name used to build the file name.
    :return: path of the saved file, or ``None`` on failure.
    """
    if not img_src:
        print("⚠️ 图片链接为空,跳过下载")
        return None

    filename = self._get_image_filename(img_src, shop_name)
    save_path = os.path.join(IMAGE_CONFIG["save_dir"], filename)
    max_retry = IMAGE_CONFIG["retry"]

    # Request headers are identical for every attempt, so build them once.
    headers = {
        "Referer": "https://www.pinduoduo.com/",
        "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    }
    # Find the User-Agent by its flag name instead of by list position, and
    # split only on the first "=" so the value itself may contain "=".
    ua_flag = "--user-agent="
    for arg in PLAYWRIGHT_CONFIG["browser_args"]:
        if arg.startswith(ua_flag):
            headers["User-Agent"] = arg.split("=", 1)[1]
            break

    def discard_partial_file() -> None:
        """Remove whatever was written so far (best effort)."""
        try:
            if os.path.exists(save_path):
                os.remove(save_path)
        except OSError as cleanup_error:
            print(f"⚠️ 清理未完成的图片文件失败:{cleanup_error}")

    for retry in range(max_retry + 1):
        try:
            # ``with`` releases the streamed connection even on errors.
            with requests.get(
                img_src,
                headers=headers,
                timeout=IMAGE_CONFIG["timeout"],
                stream=True,  # stream to disk instead of buffering in memory
                allow_redirects=True,  # signed links may answer with a 302
            ) as response:
                response.raise_for_status()
                with open(save_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            if os.path.getsize(save_path) > 0:
                print(f"✅ 图片下载成功:{save_path}")
                return save_path
            os.remove(save_path)  # drop the empty file before retrying
            print(f"⚠️ 图片下载为空,重试{retry+1}/{IMAGE_CONFIG['retry']}")
        except requests.exceptions.HTTPError as e:
            discard_partial_file()
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 403:
                # Expired signature / no permission: retrying cannot help.
                print(f"❌ 图片签名过期/无权限:{img_src[:50]}...")
                return None
            if retry < max_retry:
                print(f"⚠️ HTTP错误(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{e}")
                time.sleep(1)
                continue
            print(f"❌ 图片下载失败:{e}")
            return None
        except Exception as e:
            discard_partial_file()
            if retry < max_retry:
                print(f"⚠️ 下载失败(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{str(e)[:50]}")
                time.sleep(1)
                continue
            print(f"❌ 图片最终下载失败:{str(e)[:50]}")
            return None
    return None
- def _process_ocr(self, image_path: str) -> Optional[Dict]:
- """OCR识别后立即调用天眼查搜索+提取企业信息"""
- """
- 调用OCR识别并提取企业信息
- :param image_path: 图片路径
- :return: 企业信息字典
- """
- if not self.ocr_client:
- print("⚠️ OCR客户端未初始化,跳过识别")
- return None
- if not os.path.exists(image_path):
- print(f"❌ 图片文件不存在:{image_path}")
- return None
- # 调用OCR识别
- ocr_result = self.ocr_client.general_ocr(
- image_path=image_path,
- scale=BAIDU_OCR_CONFIG["scale"]
- )
- print(f"识别结果{ocr_result}")
- if not ocr_result:
- return None
- # 提取企业信息
- enterprise_info = self.ocr_client.extract_enterprise_info(ocr_result)
- print("\n📌 提取的企业信息:")
- enterprise_name = enterprise_info.get("enterprise_name", "")
- if not enterprise_name:
- print("⚠️ 未提取到企业名称,跳过天眼查")
- return enterprise_info
- # 2. 调用天眼查搜索+提取字段
- if self.tyc_browser.search_enterprise(enterprise_name):
- # 提取三个核心字段
- tyc_info = self.tyc_browser.get_enterprise_info()
- # 合并OCR结果和天眼查字段
- enterprise_info.update(tyc_info)
- print("\n📌 最终整合结果:")
- print(json.dumps(enterprise_info, ensure_ascii=False, indent=4))
- return enterprise_info
- def _load_login_state(self) -> Optional[Dict]:
- """加载本地登录状态"""
- if os.path.exists(self.login_state_path):
- try:
- with open(self.login_state_path, "r", encoding="utf-8") as f:
- return json.load(f)
- except json.JSONDecodeError:
- print(f"⚠️ 登录状态文件损坏:{self.login_state_path},将重新登录")
- os.remove(self.login_state_path)
- return None
- def _save_login_state(self) -> None:
- """保存登录状态到本地"""
- if self.context:
- try:
- self.context.storage_state(path=self.login_state_path)
- print(f"✅ 登录状态已保存到:{self.login_state_path}")
- except Exception as e:
- print(f"⚠️ 保存登录状态失败:{e}")
def init_browser(self) -> bool:
    """Start Playwright, launch the browser and prepare a ready-to-use page.

    One Playwright driver is started and shared with ``self.tyc_browser``.
    When a saved login state exists it is loaded into the new context.
    Otherwise the Pinduoduo home page is opened, the user is asked to log in
    manually, and the resulting state is saved via ``_save_login_state``.

    :return: ``True`` on success; ``False`` after any failure, in which case
        ``self.close()`` has been called.
    """
    try:
        # Start the Playwright driver exactly once and share it.
        self.pw = sync_playwright().start()
        self.tyc_browser.init_browser(self.pw)

        self.browser = self.pw.chromium.launch(
            headless=PLAYWRIGHT_CONFIG["headless"],
            slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
            args=PLAYWRIGHT_CONFIG["browser_args"],
            ignore_default_args=["--enable-automation"],  # hide the automation flag
            timeout=60000,
        )

        # Options shared by both the logged-in and the fresh context.
        context_options = {
            "viewport": PLAYWRIGHT_CONFIG["viewport"],
            "locale": PLAYWRIGHT_CONFIG["locale"],
            "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
            "ignore_https_errors": True,
        }

        login_state = self._load_login_state()
        if login_state:
            self.context = self.browser.new_context(
                storage_state=login_state,
                **context_options,
            )
            print("✅ 已加载本地登录状态")
            self.page = self.context.new_page()
        else:
            self.context = self.browser.new_context(**context_options)
            print("\n⚠️ 未检测到登录状态,请先完成拼多多登录!")
            # This page is kept as the working page afterwards; opening a
            # second one would leave the login tab open for the whole session.
            self.page = self.context.new_page()
            self.page.goto("https://www.pinduoduo.com", timeout=30000)
            input("请在浏览器中完成登录,登录后按回车继续...")
            # A failure to save the state is reported by the helper but does
            # not abort a session the user has just logged in to.
            self._save_login_state()

        self.page.set_default_timeout(PLAYWRIGHT_CONFIG["default_timeout"])
        self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG["navigation_timeout"])
        return True
    except Exception as e:
        print(f"❌ 浏览器初始化失败:{e}")
        self.close()
        return False
def open_links(self, goods_data: List[Dict], db_reader: DBGoodsReader) -> List[Dict]:
    """Visit each product link and collect enterprise info for flagship shops.

    For every item the method:

    1. skips it when the link is empty or the shop name lacks "旗舰店";
    2. opens the link and compares the on-page shop name with the DB name;
    3. scrolls to trigger lazy loading, takes the ``src`` of the second image
       matching the licence selector, and downloads it;
    4. runs ``_process_ocr`` on the image and writes a non-empty result back
       through ``db_reader.insert_enterprise_info``.

    Some failure paths pause on ``input()`` for manual inspection.

    :param goods_data: Dicts read with the keys ``shop`` and ``product_link``.
    :param db_reader: Reader used to write the enterprise info back.
    :return: The enterprise info dicts that were obtained.
    """
    if not self.page:
        print("❌ 浏览器未初始化")
        return []
    total = len(goods_data)
    if total == 0:
        print("⚠️ 无商品链接可处理")
        return []
    print(f"\n📋 共待处理 {total} 条商品链接")

    all_results = []
    for idx, item in enumerate(goods_data, 1):
        # `.get(key, default)` only applies the default when the key is
        # missing; a NULL column arrives as None and would crash `.strip()`
        # outside the try below, aborting the whole batch.
        shop = (item.get("shop") or "未知店铺").strip()
        link = (item.get("product_link") or "").strip()
        if not link:
            print(f"\n⚠️ 第{idx}/{total}条:店铺【{shop}】链接为空,跳过")
            continue
        print(f"\n{'=' * 15} 第 {idx}/{total} 条 {'=' * 15}")
        print(f"🏪 数据库店名:{shop}")
        print(f"🔗 商品链接:{link}")

        # Only flagship shops are processed.
        if "旗舰店" not in shop:
            print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称不含“旗舰店”,跳过")
            continue
        print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称包含“旗舰店”,打开商品链接")

        try:
            # 1. Open the product page.
            self.page.goto(
                link,
                wait_until="load",
                timeout=PLAYWRIGHT_CONFIG["navigation_timeout"],
            )
            self.page.wait_for_load_state("networkidle", timeout=15000)
            print(f"✅ 页面加载成功:{self.page.title()}...")
            self._check_login_box()

            # 2. Compare the on-page shop name with the DB value.
            page_shop_locator = self.page.locator("div.BAq4Lzv7")
            try:
                page_shop_locator.wait_for(timeout=5000)
                page_shop_text = (page_shop_locator.inner_text()).strip().lower()
            except PlaywrightTimeoutError:
                print(f"❌ 未找到页面店名元素,可能页面结构改变或被风控,跳过")
                continue
            db_shop_text = shop.lower()
            print(f"🏪 页面元素店铺名:{page_shop_text}")
            if page_shop_text != db_shop_text:
                print(f"❌ 店名不匹配(数据库:{db_shop_text} | 页面:{page_shop_text}),跳过")
                self.page.wait_for_timeout(2000)
                continue
            print(f"✅ 店名匹配成功!")

            # Scroll down so the lazily loaded images get rendered.
            self._scroll_down(distance=2100)

            # 3. Locate the licence image, download it and run OCR.
            final_enterprise_info = None
            try:
                img_locators = self.page.locator("img[role='img'][aria-label='查看图片']")
                img_count = img_locators.count()
                if img_count < 2:
                    # NOTE: the fallback selector ("img.pdd-lazy-image.loaded")
                    # is currently disabled; the run pauses for inspection.
                    print(f"⚠️ 原定位仅匹配到{img_count}个图片,尝试备用定位(拼多多懒加载图片)...")
                    input("请手动检查页面图片元素,按回车继续...")
                    continue
                print(f"📸 匹配到图片元素:{img_count} 个")

                # The second match is the one whose src is downloaded.
                target_img_locator = img_locators.nth(1)
                target_img_locator.wait_for(timeout=5000, state="visible")
                img_src = target_img_locator.get_attribute("src")
                if img_src:
                    print(f"🖼️ 第2个图片 src:{img_src[:80]}...")
                    image_path = self._download_image(img_src, shop)
                    if image_path:
                        final_enterprise_info = self._process_ocr(image_path)
                else:
                    print(f"⚠️ 第2个图片的src为空")
            except Exception as e:
                # An image/OCR failure must not abort this item's bookkeeping.
                print(f"❌ 获取图片/识别失败:{str(e)[:100]}")

            # 4. Collect the result and write it back to the database.
            if final_enterprise_info:
                print(f"天眼查---查出来的数据为{final_enterprise_info}")
                all_results.append(final_enterprise_info)
                update_success = db_reader.insert_enterprise_info(
                    shop_name=shop,
                    enterprise_info=final_enterprise_info,
                )
                if update_success:
                    print(f"✅ 店铺[{shop}]数据回填成功")
                else:
                    print(f"❌ 店铺[{shop}]数据回填失败")
                print(f"\n🎉 成功获取数据,准备进入下一条...")
            else:
                print(f"\n⚠️ 本条未获取到有效企业信息,准备进入下一条...")
            self.page.wait_for_timeout(5000)
        except PlaywrightTimeoutError:
            print(f"⏰ 页面加载/元素定位超时:{link}")
            input("排查问题")
            continue
        except Exception as e:
            print(f"❌ 第{idx}条处理异常:{str(e)[:100]}...,跳过")
            continue
    return all_results
def close(self) -> None:
    """Release every browser resource held by this instance (synchronous).

    Order matters: page, context and browser are closed first, then the
    Tianyancha browser, and the shared Playwright driver is stopped last.
    Stopping the driver first would make every later ``close()`` call fail.

    Missing attributes are tolerated so the method is safe after a partially
    failed ``init_browser``. Errors are reported and never re-raised.
    """
    # 1. Pinduoduo page -> context -> browser.
    for attr in ("page", "context", "browser"):
        resource = getattr(self, attr, None)
        if not resource:
            continue
        try:
            resource.close()
            if attr == "browser":
                print("✅ 拼多多浏览器已关闭")
        except Exception as e:
            print(f"⚠️ 关闭 {attr} 失败:{e}")
        # Drop the reference so a repeated close() is a no-op.
        setattr(self, attr, None)

    # 2. Tianyancha browser; it runs on the same driver, so it must be closed
    #    before the driver is stopped.
    tyc_browser = getattr(self, "tyc_browser", None)
    if tyc_browser:
        try:
            tyc_browser.close()
        except Exception as e:
            print(f"⚠️ 关闭天眼查浏览器失败:{e}")

    # 3. Finally stop the shared Playwright driver.
    pw = getattr(self, "pw", None)
    if pw:
        try:
            pw.stop()
            print("✅ Playwright 驱动已彻底停止")
        except Exception as e:
            print(f"⚠️ 停止 Playwright 驱动失败:{e}")
        self.pw = None
def main():
    """Entry point: read shop/product rows from the DB and process each link.

    Connection settings come from the ``DB_HOST``, ``DB_PORT``,
    ``DB_USERNAME``, ``DB_PASSWORD``, ``DB_DATABASE`` and ``DB_TABLENAME``
    environment variables, falling back to ``DEFAULT_DB_CONFIG``. The DB
    connection and the browser are released on every exit path.
    """
    # 1. Environment variables with defaults.
    db_config = {
        "host": os.getenv("DB_HOST", DEFAULT_DB_CONFIG["host"]),
        "port": int(os.getenv("DB_PORT", DEFAULT_DB_CONFIG["port"])),
        "user": os.getenv("DB_USERNAME", DEFAULT_DB_CONFIG["user"]),
        "password": os.getenv("DB_PASSWORD", DEFAULT_DB_CONFIG["password"]),
        "db_name": os.getenv("DB_DATABASE", DEFAULT_DB_CONFIG["db_name"]),
        "table_name": os.getenv("DB_TABLENAME", DEFAULT_DB_CONFIG["table_name"]),
    }

    # 2. Database reader.
    db_reader = DBGoodsReader(
        host=db_config["host"],
        port=db_config["port"],
        user=db_config["user"],
        password=db_config["password"],
        db_name=db_config["db_name"],
    )
    if not db_reader.connect_db():
        return

    try:
        # 3. Load the rows and preview the first five.
        goods_data = db_reader.get_shop_and_goods()
        if goods_data:
            print("\n📌 数据预览(前5条):")
            for idx, item in enumerate(goods_data[:5], 1):
                # NULL columns arrive as None; slicing None would crash.
                shop_preview = (item.get("shop") or "")[:20]
                link_preview = (item.get("product_link") or "")[:50]
                print(f"第{idx}条 | 店铺:{shop_preview} | 链接:{link_preview}...")

        # 4. Start the browser; init_browser() cleans up after itself when it
        #    fails, so only the DB connection is left to release here.
        pdd_browser = PddLinkBrowser()
        if not pdd_browser.init_browser():
            return

        try:
            extracted_data = pdd_browser.open_links(goods_data, db_reader)
            print(f"\n📊 爬取任务结束,共成功提取 {len(extracted_data)} 条企业信息!")
            if extracted_data:
                # Show the first record as a sample of the collected data.
                print("💡 最终数据示例:", json.dumps(extracted_data[0], ensure_ascii=False, indent=2))
        finally:
            # Close the browser even when open_links raises or is interrupted.
            pdd_browser.close()
    finally:
        db_reader.close_db()
# Script entry point: run main() and turn Ctrl+C or any unexpected error into
# a short console message instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n⚠️ 程序被用户中断")
    except Exception as exc:
        print(f"\n❌ 程序运行出错:{exc}")
|