"""Enrich Pinduoduo shop records with company data from Tianyancha.

Pipeline implemented by this script:

1. Read shops that still lack ``business_license_company`` from MySQL.
2. Open each shop's product link in a Playwright-driven Chromium browser.
3. Download the second "view image" picture on the page (presumably the
   business-licence image -- TODO confirm against the live page).
4. Run Baidu general OCR on the image and extract the enterprise name.
5. Search the name on Tianyancha and scrape name / credit code / address.
6. Insert the result into ``pdd_shop_info_middle``.
"""
import asyncio  # noqa: F401  (kept: was imported by the original file)
import base64
import io
import json
import os
import re
import time
from typing import Dict, List, Optional
from urllib.parse import urlparse

import pymysql
import requests
from dotenv import load_dotenv
from PIL import Image
from playwright.sync_api import (
    BrowserContext,
    TimeoutError as PlaywrightTimeoutError,
    sync_playwright,
)

# Load variables from a local .env file into the process environment.
load_dotenv()

# ===================== Global configuration =====================
# Default database settings (overridable through environment variables in main()).
DEFAULT_DB_CONFIG = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "",
    "db_name": "",
    "table_name": ""
}

# Single source of truth for the browser user agent; it is reused for the
# plain-HTTP image downloads so both request paths look alike.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)

# Playwright settings shared by both browsers.
PLAYWRIGHT_CONFIG = {
    "headless": False,
    "slow_mo": 300,
    "browser_args": [
        "--start-maximized",
        "--disable-blink-features=AutomationControlled",  # hide the automation flag
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-popup-blocking",
        "--disable-extensions",
        "--disable-gpu",
        "--lang=zh-CN,zh",
        f"--user-agent={USER_AGENT}"  # kept as the last entry, as before
    ],
    "viewport": {"width": 2050, "height": 1200},
    "locale": "zh-CN",
    "timezone_id": "Asia/Shanghai",
    "default_timeout": 15000,
    "navigation_timeout": 30000,
    "login_state_path": "pdd_login_state.json",  # persisted Pinduoduo login state
    "tianyancha_login_state": "tianyancha_login_state.json"  # persisted Tianyancha login state
}

# Baidu OCR settings.
BAIDU_OCR_CONFIG = {
    "api_key": os.getenv('APP_KEY'),
    "secret_key": os.getenv('APP_SECRET'),
    "scale": 1.5  # enlargement factor applied before OCR
}

# Limits enforced before sending an image to Baidu OCR.
OCR_MAX_SIDE_PX = 4096   # longest allowed side of the image
OCR_MAX_PAYLOAD_MB = 4   # limit applies to the base64-encoded payload

# Image download settings.
IMAGE_CONFIG = {
    "save_dir": "pdd_goods_images",  # folder relative to the working directory
    "timeout": 10,  # download timeout in seconds
    "retry": 1  # number of retries after the first failed attempt
}

# 1. Session variable holding today's date, consumed by QUERY_SQL.
SET_DATE_SQL = "SET @date_constant = CURDATE();"

# 2. One row per shop (lowest search_key first) scraped today or later that
#    has no matching row with a non-NULL business_license_company.
QUERY_SQL = """
SELECT product, shop, product_link, scrape_date, business_license_company, search_key
FROM (
    SELECT
        pd.product,
        pd.shop,
        pd.product_link,
        pd.scrape_date,
        psi.business_license_company,
        pd.search_key,
        ROW_NUMBER() OVER (PARTITION BY pd.shop ORDER BY pd.search_key ASC) AS rn
    FROM pdd_drug_middle pd
    LEFT JOIN pdd_shop_info_middle psi ON psi.shop = pd.shop
    WHERE pd.scrape_date >= @date_constant
      AND psi.business_license_company IS NULL
) AS sub
WHERE rn = 1
ORDER BY search_key;
"""

# Matches a parenthesised group written with ASCII or full-width brackets.
_PAREN_RE = re.compile(r"\([^)]*\)|（[^）]*）")


# ===================== Baidu OCR =====================
class BaiduOCR:
    """Thin wrapper around the Baidu general text-recognition HTTP API."""

    def __init__(self, api_key: str, secret_key: str):
        """
        Store the credentials and immediately request an access token.

        :param api_key: Baidu cloud application API key
        :param secret_key: Baidu cloud application secret key
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.access_token: Optional[str] = None
        # A failure here leaves access_token as None; general_ocr() checks it.
        self._get_access_token()

    def _get_access_token(self) -> bool:
        """
        Request an OAuth access token and store it on the instance.

        :return: True when a token was obtained, False otherwise
        """
        url = "https://aip.baidubce.com/oauth/2.0/token"
        params = {
            "grant_type": "client_credentials",
            "client_id": self.api_key,
            "client_secret": self.secret_key
        }
        try:
            response = requests.post(url, params=params, timeout=10)
            response.raise_for_status()
            result = response.json()
            if "access_token" in result:
                self.access_token = result["access_token"]
                print(f"✅ 成功获取 access_token:{self.access_token[:20]}...")
                return True
            print(f"❌ 获取 access_token 失败:{result}")
            return False
        except Exception as e:
            print(f"❌ 获取 access_token 异常:{e}")
            return False

    @staticmethod
    def _flatten_to_jpeg_mode(img: "Image.Image") -> "Image.Image":
        """Return *img* in a mode the JPEG encoder accepts, compositing alpha onto white."""
        has_alpha = img.mode in ("RGBA", "LA", "PA") or (
            img.mode == "P" and "transparency" in img.info
        )
        if has_alpha:
            rgba = img.convert("RGBA")
            canvas = Image.new('RGB', rgba.size, (255, 255, 255))
            # The alpha band is the paste mask, so transparent areas stay white.
            canvas.paste(rgba, mask=rgba.split()[3])
            return canvas
        if img.mode not in ("RGB", "L"):
            # e.g. palette ("P") or CMYK images
            return img.convert("RGB")
        return img

    @staticmethod
    def _base64_size_mb(raw: bytes) -> float:
        """Return the size in MB that *raw* will have once base64-encoded."""
        return 4 * ((len(raw) + 2) // 3) / 1024 / 1024

    def _enlarge_and_crop_image(self, image_path: str, scale: float = 1.5, crop_ratio: float = 0.5) -> bytes:
        """
        Keep the top part of the image, enlarge it and return it as JPEG bytes.

        The result is limited to OCR_MAX_SIDE_PX on its longest side and is
        re-encoded at a lower quality when its base64 form would exceed
        OCR_MAX_PAYLOAD_MB.

        :param image_path: path of the source image
        :param scale: enlargement factor
        :param crop_ratio: fraction of the height to keep, measured from the top
        :return: JPEG bytes, or ``b''`` when processing failed
        """
        try:
            with Image.open(image_path) as img:
                # Step 1: crop to the top `crop_ratio` of the picture.
                crop_box = (0, 0, img.width, int(img.height * crop_ratio))
                img_cropped = img.crop(crop_box)
                print(f"✅ 图片裁剪完成:保留上{int(crop_ratio * 100)}%区域,尺寸={img_cropped.size}")

                # Step 2: enlarge the cropped picture.
                new_width = int(img_cropped.width * scale)
                new_height = int(img_cropped.height * scale)
                img_resized = img_cropped.resize(
                    (new_width, new_height),
                    Image.Resampling.LANCZOS
                )

                # Step 3: shrink proportionally when EITHER side is too long
                # (the previous code only looked at the width).
                longest_side = max(img_resized.size)
                if longest_side > OCR_MAX_SIDE_PX:
                    ratio = OCR_MAX_SIDE_PX / longest_side
                    img_resized = img_resized.resize(
                        (
                            max(1, int(img_resized.width * ratio)),
                            max(1, int(img_resized.height * ratio)),
                        ),
                        Image.Resampling.LANCZOS
                    )

                # Step 4: make sure the mode can be written as JPEG.
                img_resized = self._flatten_to_jpeg_mode(img_resized)

                # Step 5: encode in memory (no file is written).
                buffer = io.BytesIO()
                img_resized.save(buffer, format='JPEG', quality=95)
                image_bytes = buffer.getvalue()

                # The size is measured on the base64 form, because that is
                # what general_ocr() posts.
                file_size = self._base64_size_mb(image_bytes)
                if file_size > OCR_MAX_PAYLOAD_MB:
                    print(f"⚠️ 文件超4MB({file_size:.2f}MB),二次压缩...")
                    buffer = io.BytesIO()
                    img_resized.save(buffer, format='JPEG', quality=70, optimize=True)
                    image_bytes = buffer.getvalue()

                print(f"✅ 图片放大完成:最终尺寸={img_resized.size}")
                return image_bytes
        except Exception as e:
            print(f"❌ 图片裁剪/放大失败:{str(e)}")
            return b''

    def general_ocr(self, image_path: str, scale: float = 1.5) -> Optional[Dict]:
        """
        Run Baidu general OCR on a local image (cropped and enlarged first).

        :param image_path: path of the local image
        :param scale: enlargement factor, 1.5 by default
        :return: the decoded API response containing ``words_result``,
                 or None on any failure
        """
        if not self.access_token:
            print("❌ access_token 无效,请先初始化")
            return None

        try:
            image_data = self._enlarge_and_crop_image(image_path, scale=scale, crop_ratio=0.5)
            if not image_data:
                print("❌ 图片处理失败,无法识别")
                # None, like every other failure path of this method.
                return None
            image_base64 = base64.b64encode(image_data).decode("utf-8")
        except Exception as e:
            print(f"❌ 图片放大/读取失败:{e}")
            return None

        url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {"image": image_base64}

        try:
            response = requests.post(url, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            result = response.json()
            if "words_result" in result:
                print(f"✅ 识别成功,共识别到 {len(result['words_result'])} 行文字")
                return result
            print(f"❌ 识别失败:{result}")
            return None
        except Exception as e:
            print(f"❌ 调用 OCR 接口异常:{e}")
            return None

    def extract_enterprise_info(self, ocr_result: Dict) -> Dict:
        """
        Extract the enterprise name and the social credit code from an OCR result.

        Full-width colons in the recognised lines are converted to ASCII
        colons before matching, so both spellings of the labels are found.

        :param ocr_result: dictionary returned by general_ocr()
        :return: ``{"enterprise_name": str, "credit_code": str}``; a value is
                 an empty string when the field was not found
        """
        enterprise_info = {
            "enterprise_name": "",
            "credit_code": ""
        }

        if not ocr_result or "words_result" not in ocr_result:
            print("❌ OCR识别结果为空,无法提取企业信息")
            return enterprise_info

        # Collect non-empty, de-duplicated lines in their original order.
        all_text_lines = []
        for item in ocr_result["words_result"]:
            line_text = item.get("words", "").replace("：", ":").strip()
            if line_text and line_text not in all_text_lines:
                all_text_lines.append(line_text)
        print(f"📝 OCR识别的有效行:{all_text_lines}")

        # ---------- 1. Enterprise name ----------
        enterprise_name = ""
        colon_keywords = ("名称:", "名:", "称:")
        # Frequent OCR misreadings and their corrections.
        name_correction = {
            "人药房": "大药房",
            "有松司": "有限公司",
            "松司": "公司",
            "关药房": "大药房"
        }

        for line_text in all_text_lines:
            if "企业名称" in line_text:
                # Drop bracketed remarks, then take what follows the label.
                clean_line = _PAREN_RE.sub('', line_text)
                name_part = clean_line.split("企业名称")[-1].strip()
                # A second "名称" prefix may remain after the label.
                if "名称" in name_part:
                    name_part = name_part.split("名称")[-1].strip()
                enterprise_name = name_part
                break
            if any(key in line_text for key in colon_keywords):
                # Also covers a label split across two lines ("名" / "称:…"),
                # because the second line contains "称:".
                enterprise_name = line_text.split(":")[-1].strip()
                break

        if enterprise_name:
            for wrong, right in name_correction.items():
                enterprise_name = enterprise_name.replace(wrong, right)
            # Strip leading/trailing colons (both widths) and whitespace.
            enterprise_name = enterprise_name.strip(":： \t\n\r")
            # Remove every inner space.
            enterprise_name = enterprise_name.replace(" ", "")

        # ---------- 2. Social credit code ----------
        credit_code = ""
        # The last keyword is a known OCR misreading of the label.
        code_keywords = ["社会信用代码:", "统一社会信用代码:", "社会震用代码:"]
        for line_text in all_text_lines:
            for keyword in code_keywords:
                if keyword in line_text:
                    code_part = line_text.split(keyword)[-1].strip()
                    credit_code = code_part.replace(" ", "")
                    break
            if credit_code:
                break

        enterprise_info["enterprise_name"] = enterprise_name
        enterprise_info["credit_code"] = credit_code

        if enterprise_name:
            print(f"✅ 提取到企业名称:{enterprise_name}")
        else:
            print("⚠️ 未识别到企业名称字段")
        if credit_code:
            print(f"✅ 提取到社会信用代码:{credit_code}")
        else:
            print("⚠️ 未识别到社会信用代码字段")

        return enterprise_info


# ===================== Database access =====================
class DBGoodsReader:
    """Reads pending shops from MySQL and writes the enriched rows back."""

    def __init__(
            self,
            host: str = DEFAULT_DB_CONFIG["host"],
            port: int = DEFAULT_DB_CONFIG["port"],
            user: str = DEFAULT_DB_CONFIG["user"],
            password: str = DEFAULT_DB_CONFIG["password"],
            db_name: str = DEFAULT_DB_CONFIG["db_name"],
            charset: str = "utf8mb4"
    ):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db_name = db_name
        self.charset = charset
        self.conn: Optional[pymysql.connections.Connection] = None
        self.cursor: Optional[pymysql.cursors.DictCursor] = None

    def connect_db(self) -> bool:
        """
        Connect to the database, retrying up to two more times on failure.

        :return: True when connected, False after the last failed attempt
        """
        max_retry = 2
        for retry in range(max_retry + 1):
            try:
                self.conn = pymysql.connect(
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    password=self.password,
                    database=self.db_name,
                    charset=self.charset,
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=10
                )
                self.cursor = self.conn.cursor()
                print(f"✅ 成功连接数据库:{self.db_name}")
                return True
            except pymysql.MySQLError as e:
                if retry < max_retry:
                    print(f"❌ 数据库连接失败(重试{retry + 1}/{max_retry}):{e}")
                    time.sleep(1)
                    continue
                print(f"❌ 数据库连接最终失败:{e}")
                return False
        return False

    def get_shop_and_goods(self) -> List[Dict]:
        """
        Read the shops that still need company data (one row per shop).

        :return: list of row dictionaries with the columns selected by
                 QUERY_SQL; an empty list on error or when not connected
        """
        if not self.conn or not self.cursor:
            print("❌ 未连接数据库,请先调用 connect_db()")
            return []

        try:
            # The session variable must be set before QUERY_SQL references it.
            self.cursor.execute(SET_DATE_SQL)
            self.cursor.execute(QUERY_SQL)
            # fetchall() may hand back a tuple; callers are promised a list.
            results = list(self.cursor.fetchall())
            print(f"✅ 成功读取 {len(results)} 条待补充企业信息的店铺数据")
            return results
        except pymysql.MySQLError as e:
            print(f"❌ 读取数据失败:{e}")
            return []

    def _get_next_id(self) -> int:
        """
        Return ``MAX(id) + 1`` of pdd_shop_info_middle, or 9078 on failure.

        NOTE(review): MAX(id)+1 is not safe against concurrent writers, and the
        9078 fallback can collide with an existing row; an AUTO_INCREMENT
        column would remove the need for this method.
        """
        try:
            sql = "SELECT IFNULL(MAX(id), 0) + 1 AS next_id FROM pdd_shop_info_middle"
            self.cursor.execute(sql)
            result = self.cursor.fetchone()
            # fetchone() returns None when no row is available.
            next_id = result.get("next_id", 9078) if result else 9078
            print(f"✅ 获取到下一个可用ID:{next_id}")
            return next_id
        except pymysql.MySQLError as e:
            print(f"❌ 获取自增ID失败,使用默认值9078:{e}")
            return 9078

    def insert_enterprise_info(self, shop_name: str, enterprise_info: Dict) -> bool:
        """
        Insert one row with the company data of a shop.

        :param shop_name: shop name, stored in the ``shop`` column
        :param enterprise_info: dictionary read for the keys
               ``tyc_company_name``, ``tyc_company_code`` and
               ``tyc_company_address``; missing or None values become ""
        :return: True when one row was inserted, False otherwise
        """
        if not self.conn or not self.cursor:
            print("❌ 未连接数据库,请先调用 connect_db()")
            return False
        if not shop_name:
            print("❌ 店铺名称为空,无法更新")
            return False

        # `or ""` guards against keys that are present but hold None.
        business_company_name = str(enterprise_info.get("tyc_company_name") or "").strip()
        qualification_number = str(enterprise_info.get("tyc_company_code") or "").strip()
        contact_address = str(enterprise_info.get("tyc_company_address") or "").strip()
        # Both address columns receive the same value.
        business_license_address = contact_address

        empty_fields = []
        if not business_company_name:
            empty_fields.append("企业名称")
        if not qualification_number:
            empty_fields.append("统一信用代码")
        if not contact_address:
            empty_fields.append("企业地址")
        if empty_fields:
            print(f"⚠️ 店铺[{shop_name}]以下字段为空:{','.join(empty_fields)},仍继续插入(空值)")

        next_id = self._get_next_id()

        # NOTE(review): the schema `test2` is hard-coded here, whereas
        # QUERY_SQL and _get_next_id() use the connected database -- confirm
        # they are meant to be the same database.
        insert_sql = """
        INSERT INTO `test2`.`pdd_shop_info_middle` (
            `id`, `shop`, `contact_address`, `qualification_number`,
            `business_license_company`, `business_license_address`,
            `scrape_date`, `platform`, `province`, `city`,
            `create_time`, `update_time`
        ) VALUES (%s, %s, %s, %s, %s, %s, CURDATE(), '拼多多', '', '', NOW(), NOW())
        """
        insert_params = [
            next_id,
            shop_name,
            contact_address,
            qualification_number,
            business_company_name,
            business_license_address
        ]

        try:
            self.cursor.execute(insert_sql, insert_params)
            self.conn.commit()
            affected_rows = self.cursor.rowcount
            if affected_rows > 0:
                print(f"✅ 店铺[{shop_name}]成功插入1条数据(ID:{next_id})")
                print(f" 插入内容:企业名称={business_company_name} | 信用代码={qualification_number} | 地址={contact_address}")
                return True
            print(f"⚠️ 店铺[{shop_name}]插入0行数据,无数据变更")
            return False
        except pymysql.MySQLError as e:
            print(f"❌ 店铺[{shop_name}]插入失败:{e}")
            self.conn.rollback()
            return False
        except Exception as e:
            print(f"❌ 店铺[{shop_name}]插入异常:{e}")
            self.conn.rollback()
            return False

    def close_db(self) -> None:
        """Close cursor and connection; errors during closing are ignored."""
        if self.cursor:
            try:
                self.cursor.close()
            except Exception:
                pass
            self.cursor = None
        if self.conn:
            try:
                self.conn.close()
                print("✅ 数据库连接已关闭")
            except Exception:
                pass
            self.conn = None


# ===================== Tianyancha browser =====================
class TianyanchaBrowser:
    """Drives a Chromium window on tianyancha.com: login, search, scrape."""

    # Header button shown while the visitor is not logged in.
    _LOGIN_BUTTON_SELECTOR = "div.tyc-header-nav-item.tyc-nav-user span.tyc-nav-user-btn"
    _SEARCH_INPUT_SELECTOR = 'input[placeholder="请输入公司名称、老板姓名、品牌名称等"]'

    def __init__(self):
        """Create an uninitialised browser; call init_browser() before use."""
        self.pw = None
        self.browser = None
        self.context: Optional[BrowserContext] = None
        self.page = None
        self.login_state_path = PLAYWRIGHT_CONFIG["tianyancha_login_state"]

    def check_scan_login_prompt(self):
        """
        Wait up to 10 s for the scan-login prompt; if it shows up, block on
        input() until the operator confirms the login.
        """
        try:
            scan_prompt_locator = self.page.locator(
                "div.scan-title",
                has_text="扫码登录 更快 更安全"
            )
            scan_prompt_locator.wait_for(
                state="visible",
                timeout=10000
            )
            print("⚠️ 检测到天眼查扫码登录提示!")
            input("请打开天眼查APP扫码完成登录后,按回车键继续执行脚本...")
        except PlaywrightTimeoutError:
            # The prompt never appeared: nothing to do.
            print("✅ 未检测到扫码登录提示,跳过扫码步骤")

    def _load_login_state(self) -> Optional[Dict]:
        """Return the saved storage state, or None when absent or corrupt."""
        if os.path.exists(self.login_state_path):
            try:
                with open(self.login_state_path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except json.JSONDecodeError:
                print("⚠️ 天眼查登录状态文件损坏,将重新登录")
                # A corrupt file is deleted so the next login recreates it.
                os.remove(self.login_state_path)
        return None

    def _save_login_state(self) -> None:
        """Write the context's storage state to disk; failures are only logged."""
        if self.context:
            try:
                self.context.storage_state(path=self.login_state_path)
                print(f"✅ 天眼查登录状态已保存到:{self.login_state_path}")
            except Exception as e:
                print(f"⚠️ 天眼查登录状态保存失败:{e}")

    def init_browser(self, pw) -> bool:
        """
        Launch the browser with an already started Playwright instance.

        :param pw: started (synchronous) Playwright driver, owned by the caller
        :return: True on success; on failure the browser is closed and False
                 is returned
        """
        try:
            # The driver is shared with the caller and is not started here.
            self.pw = pw
            self.browser = self.pw.chromium.launch(
                headless=PLAYWRIGHT_CONFIG["headless"],
                slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
                args=PLAYWRIGHT_CONFIG["browser_args"],
                ignore_default_args=["--enable-automation"],
                timeout=60000
            )

            context_options = {
                # no_viewport=True (not viewport=None) is what disables the
                # fixed viewport so the page follows the maximised window.
                "no_viewport": True,
                "locale": PLAYWRIGHT_CONFIG["locale"],
                "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
                "ignore_https_errors": True,
            }
            login_state = self._load_login_state()
            if login_state:
                context_options["storage_state"] = login_state
            self.context = self.browser.new_context(**context_options)
            if login_state:
                print("✅ 已加载天眼查本地登录状态")

            self.page = self.context.new_page()
            self.page.set_default_timeout(PLAYWRIGHT_CONFIG['default_timeout'])
            self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG['navigation_timeout'])
            return True
        except Exception as e:
            print(f"❌ 天眼查浏览器初始化失败:{e}")
            self.close()
            return False

    def _ensure_logged_in(self) -> None:
        """Trigger the QR-code login when the login button is visible, then persist the state."""
        login_button = self.page.locator(
            self._LOGIN_BUTTON_SELECTOR,
            has_text="登录/注册"
        ).nth(0)

        try:
            login_button.wait_for(state="visible", timeout=10000)
        except PlaywrightTimeoutError:
            # No login button within 10 s: treated as "already logged in".
            print("✅ 检测到已登录状态,无需重复登录")
            self._save_login_state()
            return

        print("⚠️ 检测到未登录状态,正在点击「登录/注册」按钮...")
        login_button.click()
        print("\n🔔 请打开天眼查APP,扫描页面上的登录二维码,只有四十秒,登录完成后脚本将自动继续...")

        try:
            # The login counts as done once the login button is hidden.
            self.page.wait_for_selector(
                self._LOGIN_BUTTON_SELECTOR,
                state="hidden",
                timeout=40000
            )
        except PlaywrightTimeoutError:
            # Reported as what it is, instead of "already logged in"; the
            # search is still attempted, as before.
            print("⚠️ 扫码登录等待超时(40秒),将以未登录状态继续搜索")
            return

        print("✅ 扫码登录成功!")
        self._save_login_state()

    def _locate_search_box(self):
        """Return a locator for the visible search input, or None when it cannot be found."""
        try:
            search_locator = self.page.locator(self._SEARCH_INPUT_SELECTOR)
            # When the page has several matching inputs the second one is used.
            if search_locator.count() > 1:
                search_locator = search_locator.nth(1)
            search_locator.wait_for(timeout=10000, state="visible")
            print("✅ 定位到天眼查搜索框")
            return search_locator
        except PlaywrightTimeoutError:
            print("❌ 搜索框定位超时:页面加载过慢或搜索框元素不存在")
            return None
        except Exception as e:
            print(f"❌ 搜索框定位失败:{str(e)}")
            return None

    def search_enterprise(self, enterprise_name: str) -> bool:
        """
        Open the Tianyancha home page and search for an enterprise name.

        :param enterprise_name: name to search for (taken from the OCR result)
        :return: True when the search was submitted and the result page
                 became idle, False otherwise
        """
        if not self.page:
            print("❌ 天眼查浏览器未初始化!")
            return False
        if not enterprise_name or enterprise_name.strip() == '':
            print("❌ 企业名称为空!无法搜索")
            return False

        try:
            # 1. Open the home page.
            print("\n📌 打开天眼查:https://www.tianyancha.com/")
            self.page.goto(
                "https://www.tianyancha.com/",
                wait_until="networkidle",
                timeout=30000
            )

            # 2. Log in when necessary.
            self._ensure_logged_in()

            # 3. Find the search box.
            search_locator = self._locate_search_box()
            if search_locator is None:
                return False

            # 4. Type the name and submit with Enter.
            search_locator.click()
            search_locator.clear()
            print(f"📌 输入企业名:{enterprise_name}")
            search_locator.fill(enterprise_name)
            self.page.wait_for_timeout(1000)
            search_locator.press("Enter")
            print("🖱️ 已触发回车搜索")

            # 5. Wait for the result page.
            self.page.wait_for_load_state("networkidle", timeout=20000)
            print(f"✅ 天眼查搜索完成!已搜索:{enterprise_name}")
            return True
        except PlaywrightTimeoutError:
            print(f"❌ 天眼查搜索超时(企业名:{enterprise_name})")
            return False
        except Exception as e:
            print(f"❌ 天眼查搜索异常:{e}")
            return False

    def _first_text(self, selector: str) -> Optional[str]:
        """Return the stripped inner text of the first match of *selector*, or None without a match."""
        locator = self.page.locator(selector).nth(0)
        if locator.count():
            return locator.inner_text().strip()
        return None

    def get_enterprise_info(self) -> Dict:
        """
        Scrape name, credit code and address of the first search result.

        :return: dictionary with the keys ``tyc_company_name``,
                 ``tyc_company_code`` and ``tyc_company_address``; a value is
                 "" when the element was not found
        """
        enterprise_detail = {
            "tyc_company_name": "",     # company name
            "tyc_company_code": "",     # unified social credit code
            "tyc_company_address": ""   # company address
        }

        if not self.page:
            print("❌ 天眼查页面未初始化")
            return enterprise_detail

        try:
            # Fixed pause to let the result list render.
            self.page.wait_for_timeout(timeout=4000)

            try:
                # Several results may be listed; the first one is used.
                company_name = self._first_text("div.index_name__qEdWi span")
                if company_name is not None:
                    enterprise_detail['tyc_company_name'] = company_name
                    print(f"获取到公司名:{enterprise_detail['tyc_company_name']}")
                else:
                    print("没有获取到企业名,网页路径有问题")
            except Exception as e:
                # Deliberate pause so the operator can inspect the page.
                input("提取企业元素发生问题,检查一下")
                print(f"提取企业名时发生异常:{str(e)},网页路径或元素定位异常")
                enterprise_detail['tyc_company_name'] = ""

            try:
                code = self._first_text("div.index_info-col__UVcZb.index_credit-code__kWuDZ span")
                if code is not None:
                    enterprise_detail['tyc_company_code'] = code
                    print(f"获取到企业信用代码:{enterprise_detail['tyc_company_code']}")
                else:
                    print("没有获取到企业信用代码,网页路径有问题")
            except Exception as e:
                print(f"提取统一社会信用代码时发生异常:{str(e)},网页路径或元素定位异常")
                enterprise_detail['tyc_company_code'] = ""

            try:
                address = self._first_text(
                    "div.index_contact-col__7AboU.index_address__mHjQD .index_value__Pl0Nh"
                )
                if address is not None:
                    enterprise_detail['tyc_company_address'] = address
                    print(f"获取到企业地址:{enterprise_detail['tyc_company_address']}")
                else:
                    print("没有获取到企业地址,网页路径有问题")
            except Exception as e:
                print(f"提取企业地址时发生异常:{str(e)},网页路径或元素定位异常")
                enterprise_detail['tyc_company_address'] = ""

            print("\n📌 提取的企业核心信息:")
            print(f"公司名:{enterprise_detail['tyc_company_name']}")
            print(f"企业信用代码:{enterprise_detail['tyc_company_code']}")
            print(f"企业地址:{enterprise_detail['tyc_company_address']}")
            return enterprise_detail
        except Exception as e:
            print(f"❌ 提取企业信息失败:{e}")
            return enterprise_detail

    def close(self) -> None:
        """Close page, context and browser (best effort); safe to call repeatedly."""
        if self.page:
            try:
                self.page.close()
            except Exception:
                pass
            self.page = None
        if self.context:
            try:
                self.context.close()
            except Exception:
                pass
            self.context = None
        if self.browser:
            try:
                self.browser.close()
                print("✅ 天眼查浏览器已关闭")
            except Exception:
                pass
            self.browser = None


class PddLinkBrowser:
    """Pinduoduo browser: persistent login, image download, OCR and Tianyancha lookup."""

    def __init__(self, login_state_path: str = PLAYWRIGHT_CONFIG["login_state_path"]):
        """
        Prepare the image folder, the OCR client and the Tianyancha browser.

        :param login_state_path: file used to persist the Pinduoduo login state
        """
        self.login_state_path = login_state_path
        self.pw = None  # Playwright driver, started in init_browser()
        self.browser = None
        self.context: Optional[BrowserContext] = None
        self.page = None
        self._init_image_dir()
        self.ocr_client: Optional[BaiduOCR] = None
        self._init_ocr_client()
        self.tyc_browser = TianyanchaBrowser()

    def _init_ocr_client(self):
        """Create the Baidu OCR client when both credentials are configured."""
        api_key = BAIDU_OCR_CONFIG["api_key"]
        secret_key = BAIDU_OCR_CONFIG["secret_key"]
        if not api_key or not secret_key:
            print("⚠️ 未配置百度OCR的API_KEY/SECRET_KEY,将跳过OCR识别")
            return
        self.ocr_client = BaiduOCR(api_key=api_key, secret_key=secret_key)
        print("✅ 百度OCR客户端初始化完成")

    def _check_login_box(self) -> bool:
        """
        Detect the phone-login box and wait for the operator to log in.

        :return: True when the box was found and the login confirmed,
                 False when no login box appeared (or on error)
        """
        if not self.page:
            print("❌ 页面未初始化,无法检测登录框")
            return False

        try:
            login_locator = self.page.locator("div.phone-login span")
            # A timeout here means the page shows no login box.
            login_locator.wait_for(timeout=5000, state="visible")
            login_text = login_locator.inner_text().strip()

            if "手机登录" in login_text:
                print("\n⚠️ 检测到【手机登录】框,请手动完成登录!")
                input("登录完成后,请按回车键继续执行脚本...")
                # Let the page settle so the new login state is in effect.
                self.page.wait_for_load_state("networkidle", timeout=15000)
                print("✅ 登录已完成,继续处理当前商品")
                return True
            return False
        except PlaywrightTimeoutError:
            return False
        except Exception as e:
            print(f"⚠️ 检测登录框时发生异常:{str(e)[:60]},继续执行")
            return False

    def _scroll_down(self, distance: int = 500, step: int = 50, interval: int = 100):
        """
        Scroll down in small steps to trigger lazy-loaded images.

        :param distance: total distance in pixels
        :param step: pixels per step
        :param interval: pause after each step in milliseconds
        """
        if not self.page:
            print("❌ 浏览器页面未初始化,无法滑动")
            return
        if step <= 0 or distance <= 0:
            print(f"⚠️ 无效的滑动参数(总距离:{distance},步长:{step}),跳过滑动")
            return

        try:
            remaining = distance
            print(f"📝 开始分步滑动:总距离{distance}像素,每次滑{step}像素,间隔{interval}ms")
            while remaining > 0:
                current_step = min(step, remaining)
                self.page.evaluate(f"window.scrollBy(0, {current_step})")
                remaining -= current_step
                self.page.wait_for_timeout(interval)
            # Extra pause so images that entered the viewport can load.
            self.page.wait_for_timeout(2000)
            print(f"✅ 分步滑动完成,总滑动距离:{distance}像素")
        except Exception as e:
            print(f"⚠️ 分步滑动失败:{str(e)[:50]}")

    def _init_image_dir(self):
        """Create the image folder when it does not exist yet."""
        if not os.path.exists(IMAGE_CONFIG["save_dir"]):
            os.makedirs(IMAGE_CONFIG['save_dir'])
            print(f"✅ 图片保存文件夹已创建:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")
        else:
            print(f"✅ 图片保存文件夹已存在:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")

    def _get_image_filename(self, img_src: str, shop_name: str) -> str:
        """
        Build a file name from the cleaned shop name and a millisecond timestamp.

        :param img_src: image URL; its path suffix becomes the extension
                        ('.png' when the URL has none)
        :param shop_name: shop name; only alphanumerics, '_' and '-' are kept,
                          truncated to 20 characters
        """
        parsed_url = urlparse(img_src)
        ext = os.path.splitext(parsed_url.path)[-1] or '.png'
        clean_shop = "".join([c for c in shop_name if c.isalnum() or c in ["_", "-"]])[:20]
        timestamp = str(int(time.time() * 1000))
        return f"{clean_shop}_{timestamp}{ext}"

    def _download_image(self, img_src: str, shop_name: str) -> Optional[str]:
        """
        Download an image into the configured folder.

        :param img_src: image URL
        :param shop_name: shop name, used for the file name
        :return: path of the saved file, or None when the download failed
        """
        if not img_src:
            print("⚠️ 图片链接为空,跳过下载")
            return None

        filename = self._get_image_filename(img_src, shop_name)
        save_path = os.path.join(IMAGE_CONFIG["save_dir"], filename)
        headers = {
            "User-Agent": USER_AGENT,
            "Referer": "https://www.pinduoduo.com/",
            "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8"
        }

        for retry in range(IMAGE_CONFIG["retry"] + 1):
            try:
                # The context manager releases the streamed connection.
                with requests.get(
                    img_src,
                    headers=headers,
                    timeout=IMAGE_CONFIG["timeout"],
                    stream=True,
                    allow_redirects=True
                ) as response:
                    response.raise_for_status()
                    with open(save_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)

                if os.path.getsize(save_path) > 0:
                    print(f"✅ 图片下载成功:{save_path}")
                    return save_path
                os.remove(save_path)  # drop the empty file
                print(f"⚠️ 图片下载为空,重试{retry+1}/{IMAGE_CONFIG['retry']}")
            except requests.exceptions.HTTPError as e:
                if e.response is not None and e.response.status_code == 403:
                    print(f"❌ 图片签名过期/无权限:{img_src[:50]}...")
                    return None  # a retry cannot fix a 403
                elif retry < IMAGE_CONFIG["retry"]:
                    print(f"⚠️ HTTP错误(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{e}")
                    time.sleep(1)
                    continue
                print(f"❌ 图片下载失败:{e}")
                return None
            except Exception as e:
                if retry < IMAGE_CONFIG["retry"]:
                    print(f"⚠️ 下载失败(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{str(e)[:50]}")
                    time.sleep(1)
                    continue
                print(f"❌ 图片最终下载失败:{str(e)[:50]}")
                return None
        # Every attempt produced an empty file.
        return None

    def _process_ocr(self, image_path: str) -> Optional[Dict]:
        """
        OCR the image, then look the extracted enterprise name up on Tianyancha.

        :param image_path: path of the downloaded image
        :return: dictionary with ``enterprise_name`` and ``credit_code`` from
                 OCR, extended with the ``tyc_*`` fields when the Tianyancha
                 search succeeded; None when OCR could not run
        """
        if not self.ocr_client:
            print("⚠️ OCR客户端未初始化,跳过识别")
            return None
        if not os.path.exists(image_path):
            print(f"❌ 图片文件不存在:{image_path}")
            return None

        ocr_result = self.ocr_client.general_ocr(
            image_path=image_path,
            scale=BAIDU_OCR_CONFIG["scale"]
        )
        print(f"识别结果{ocr_result}")
        if not ocr_result:
            return None

        enterprise_info = self.ocr_client.extract_enterprise_info(ocr_result)
        print("\n📌 提取的企业信息:")
        enterprise_name = enterprise_info.get("enterprise_name", "")
        if not enterprise_name:
            print("⚠️ 未提取到企业名称,跳过天眼查")
            return enterprise_info

        if self.tyc_browser.search_enterprise(enterprise_name):
            tyc_info = self.tyc_browser.get_enterprise_info()
            # Merge the Tianyancha fields into the OCR result.
            enterprise_info.update(tyc_info)

        print("\n📌 最终整合结果:")
        print(json.dumps(enterprise_info, ensure_ascii=False, indent=4))
        return enterprise_info

    def _load_login_state(self) -> Optional[Dict]:
        """Return the saved storage state, or None when absent or corrupt."""
        if os.path.exists(self.login_state_path):
            try:
                with open(self.login_state_path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except json.JSONDecodeError:
                print(f"⚠️ 登录状态文件损坏:{self.login_state_path},将重新登录")
                os.remove(self.login_state_path)
        return None

    def _save_login_state(self) -> None:
        """Write the context's storage state to disk; failures are only logged."""
        if self.context:
            try:
                self.context.storage_state(path=self.login_state_path)
                print(f"✅ 登录状态已保存到:{self.login_state_path}")
            except Exception as e:
                print(f"⚠️ 保存登录状态失败:{e}")

    def init_browser(self) -> bool:
        """
        Start Playwright once, then launch the Tianyancha and Pinduoduo browsers.

        Without a saved login state the operator is asked to log in manually
        and the resulting state is saved.

        :return: True on success; on failure everything is closed and False
                 is returned
        """
        try:
            # One Playwright driver is shared by both browsers.
            self.pw = sync_playwright().start()
            if not self.tyc_browser.init_browser(self.pw):
                # Not fatal: search_enterprise() refuses to run without a page.
                print("⚠️ 天眼查浏览器初始化失败,后续将跳过天眼查查询")

            self.browser = self.pw.chromium.launch(
                headless=PLAYWRIGHT_CONFIG["headless"],
                slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
                args=PLAYWRIGHT_CONFIG["browser_args"],
                ignore_default_args=["--enable-automation"],
                timeout=60000
            )

            context_options = {
                "viewport": PLAYWRIGHT_CONFIG["viewport"],
                "locale": PLAYWRIGHT_CONFIG["locale"],
                "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
                "ignore_https_errors": True,
            }
            login_state = self._load_login_state()
            if login_state:
                context_options["storage_state"] = login_state
                self.context = self.browser.new_context(**context_options)
                print("✅ 已加载本地登录状态")
            else:
                self.context = self.browser.new_context(**context_options)
                print("\n⚠️ 未检测到登录状态,请先完成拼多多登录!")
                self.page = self.context.new_page()
                self.page.goto("https://www.pinduoduo.com", timeout=30000)
                input("请在浏览器中完成登录,登录后按回车继续...")
                self._save_login_state()

            # Reuse the login page when there is one, so no tab is orphaned.
            if self.page is None:
                self.page = self.context.new_page()
            self.page.set_default_timeout(PLAYWRIGHT_CONFIG["default_timeout"])
            self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG["navigation_timeout"])
            return True
        except Exception as e:
            print(f"❌ 浏览器初始化失败:{e}")
            self.close()
            return False

    def open_links(self, goods_data: List[Dict], db_reader: DBGoodsReader) -> List[Dict]:
        """
        Visit every product link, extract the company data and store it.

        Shops whose name does not contain "旗舰店" and pages whose shop name
        differs from the database value are skipped. The method may block on
        input() when operator attention is needed.

        :param goods_data: rows as returned by DBGoodsReader.get_shop_and_goods()
        :param db_reader: connected reader used to insert the results
        :return: every non-empty result dictionary that was extracted
        """
        if not self.page:
            print("❌ 浏览器未初始化")
            return []

        total = len(goods_data)
        if total == 0:
            print("⚠️ 无商品链接可处理")
            return []

        print(f"\n📋 共待处理 {total} 条商品链接")
        all_results = []

        for idx, item in enumerate(goods_data, 1):
            # `or` also covers columns that exist but hold NULL.
            shop = (item.get("shop") or "未知店铺").strip()
            link = (item.get("product_link") or "").strip()

            if not link:
                print(f"\n⚠️ 第{idx}/{total}条:店铺【{shop}】链接为空,跳过")
                continue

            print(f"\n{'=' * 15} 第 {idx}/{total} 条 {'=' * 15}")
            print(f"🏪 数据库店名:{shop}")
            print(f"🔗 商品链接:{link}")

            # Only flagship stores are processed.
            if "旗舰店" not in shop:
                print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称不含“旗舰店”,跳过")
                continue
            print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称包含“旗舰店”,打开商品链接")

            try:
                # 1. Open the product page.
                self.page.goto(
                    link,
                    wait_until="load",
                    timeout=PLAYWRIGHT_CONFIG["navigation_timeout"]
                )
                self.page.wait_for_load_state("networkidle", timeout=15000)
                print(f"✅ 页面加载成功:{self.page.title()}...")

                self._check_login_box()

                # 2. The shop name on the page must equal the database value.
                page_shop_locator = self.page.locator("div.BAq4Lzv7")
                try:
                    page_shop_locator.wait_for(timeout=5000)
                    page_shop_text = page_shop_locator.inner_text().strip().lower()
                except PlaywrightTimeoutError:
                    print("❌ 未找到页面店名元素,可能页面结构改变或被风控,跳过")
                    continue

                db_shop_text = shop.lower()
                print(f"🏪 页面元素店铺名:{page_shop_text}")
                if page_shop_text != db_shop_text:
                    print(f"❌ 店名不匹配(数据库:{db_shop_text} | 页面:{page_shop_text}),跳过")
                    self.page.wait_for_timeout(2000)
                    continue
                print("✅ 店名匹配成功!")

                # Scroll to trigger lazy loading of the images.
                self._scroll_down(distance=2100)

                # 3. Download the second image and run OCR + Tianyancha.
                final_enterprise_info = None
                try:
                    img_locators = self.page.locator("img[role='img'][aria-label='查看图片']")
                    img_count = img_locators.count()

                    if img_count < 2:
                        print(f"⚠️ 原定位仅匹配到{img_count}个图片,尝试备用定位(拼多多懒加载图片)...")
                        input("请手动检查页面图片元素,按回车继续...")
                        continue

                    print(f"📸 匹配到图片元素:{img_count} 个")
                    target_img_locator = img_locators.nth(1)
                    target_img_locator.wait_for(timeout=5000, state="visible")

                    img_src = target_img_locator.get_attribute("src")
                    if img_src:
                        print(f"🖼️ 第2个图片 src:{img_src[:80]}...")
                        image_path = self._download_image(img_src, shop)
                        if image_path:
                            final_enterprise_info = self._process_ocr(image_path)
                    else:
                        print("⚠️ 第2个图片的src为空")
                except Exception as e:
                    print(f"❌ 获取图片/识别失败:{str(e)[:100]}")

                # 4. Collect and persist the result.
                if final_enterprise_info:
                    print(f"天眼查---查出来的数据为{final_enterprise_info}")
                    all_results.append(final_enterprise_info)

                    has_tyc_data = any(
                        final_enterprise_info.get(key)
                        for key in ("tyc_company_name", "tyc_company_code", "tyc_company_address")
                    )
                    if has_tyc_data:
                        update_success = db_reader.insert_enterprise_info(
                            shop_name=shop,
                            enterprise_info=final_enterprise_info,
                        )
                        if update_success:
                            print(f"✅ 店铺[{shop}]数据回填成功")
                        else:
                            print(f"❌ 店铺[{shop}]数据回填失败")
                    else:
                        # An all-empty row would make QUERY_SQL treat the shop
                        # as done (its company column would be '' rather than
                        # NULL), so it is left for a later run instead.
                        print(f"⚠️ 店铺[{shop}]未获取到天眼查数据,跳过入库")
                    print("\n🎉 成功获取数据,准备进入下一条...")
                else:
                    print("\n⚠️ 本条未获取到有效企业信息,准备进入下一条...")

                # Pause between two shops.
                self.page.wait_for_timeout(5000)

            except PlaywrightTimeoutError:
                print(f"⏰ 页面加载/元素定位超时:{link}")
                # Deliberate pause so the operator can inspect the page.
                input("排查问题")
                continue
            except Exception as e:
                print(f"❌ 第{idx}条处理异常:{str(e)[:100]}...,跳过")
                continue

        return all_results

    def close(self) -> None:
        """
        Close both browsers, then stop the Playwright driver.

        Closing is best effort and may be called more than once.
        """
        tyc_browser = getattr(self, "tyc_browser", None)
        if tyc_browser:
            tyc_browser.close()

        if self.page:
            try:
                self.page.close()
            except Exception:
                pass
            self.page = None
        if self.context:
            try:
                self.context.close()
            except Exception:
                pass
            self.context = None
        if self.browser:
            try:
                self.browser.close()
                print("✅ 拼多多浏览器已关闭")
            except Exception:
                pass
            self.browser = None

        # The driver is stopped last: once it is gone the browsers above can
        # no longer be closed through it.
        pw = getattr(self, "pw", None)
        if pw:
            try:
                pw.stop()
                print("✅ Playwright 驱动已彻底停止")
            except Exception:
                pass
            self.pw = None


def main():
    """Read pending shops from the database and process their product links."""
    # 1. Environment variables override the defaults.
    db_config = {
        "host": os.getenv("DB_HOST", DEFAULT_DB_CONFIG["host"]),
        "port": int(os.getenv("DB_PORT", DEFAULT_DB_CONFIG["port"])),
        "user": os.getenv("DB_USERNAME", DEFAULT_DB_CONFIG["user"]),
        "password": os.getenv("DB_PASSWORD", DEFAULT_DB_CONFIG["password"]),
        "db_name": os.getenv("DB_DATABASE", DEFAULT_DB_CONFIG["db_name"]),
        "table_name": os.getenv("DB_TABLENAME", DEFAULT_DB_CONFIG["table_name"])
    }

    # 2. Connect to the database.
    db_reader = DBGoodsReader(
        host=db_config["host"],
        port=db_config["port"],
        user=db_config["user"],
        password=db_config["password"],
        db_name=db_config["db_name"]
    )
    if not db_reader.connect_db():
        return

    pdd_browser = None
    try:
        # 3. Read the pending shops and preview the first five.
        goods_data = db_reader.get_shop_and_goods()
        if goods_data:
            print("\n📌 数据预览(前5条):")
            for idx, item in enumerate(goods_data[:5], 1):
                shop_preview = (item.get('shop') or '')[:20]
                link_preview = (item.get('product_link') or '')[:50]
                print(f"第{idx}条 | 店铺:{shop_preview} | 链接:{link_preview}...")

        # 4. Start the browsers and process the links.
        pdd_browser = PddLinkBrowser()
        if not pdd_browser.init_browser():
            return

        extracted_data = pdd_browser.open_links(goods_data, db_reader)

        print(f"\n📊 爬取任务结束,共成功提取 {len(extracted_data)} 条企业信息!")
        if extracted_data:
            print("💡 最终数据示例:", json.dumps(extracted_data[0], ensure_ascii=False, indent=2))
    finally:
        # Release browsers and the connection on every exit path.
        if pdd_browser is not None:
            pdd_browser.close()
        db_reader.close_db()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n⚠️ 程序被用户中断")
    except Exception as e:
        print(f"\n❌ 程序运行出错:{e}")