get_pdd_company_name.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515
  1. import os
  2. import json
  3. from dotenv import load_dotenv
  4. import pymysql
  5. from typing import List, Dict, Optional
  6. import time
  7. from playwright.sync_api import (
  8. sync_playwright,
  9. TimeoutError as PlaywrightTimeoutError,
  10. BrowserContext
  11. )
  12. import requests
  13. from urllib.parse import urlparse
  14. # 补充百度OCR所需依赖
  15. import base64
  16. from PIL import Image
  17. import io
  18. from pathlib import Path
  19. import asyncio
  20. env_path = Path(__file__).parent / ".env.process"
  21. if not env_path.is_file():
  22. raise FileNotFoundError(f"配置文件缺失: {env_path}")
  23. else:
  24. print(f"配置文件加载成功: {env_path}")
  25. # 加载环境变量
  26. load_dotenv(env_path)
  27. # ===================== 全局常量配置(集中管理,方便修改)=====================
  28. # 数据库默认配置
  29. DEFAULT_DB_CONFIG = {
  30. "host": "localhost",
  31. "port": 3306,
  32. "user": "root",
  33. "password": "",
  34. "db_name": "",
  35. "table_name": ""
  36. }
  37. # Playwright配置
  38. PLAYWRIGHT_CONFIG = {
  39. "headless": False,
  40. "slow_mo": 300,
  41. "browser_args": [
  42. "--start-maximized",
  43. "--disable-blink-features=AutomationControlled", # 核心防检测
  44. "--no-sandbox", # 兼容Windows/Linux
  45. "--disable-dev-shm-usage", # 解决内存不足
  46. "--disable-popup-blocking", # 禁用弹窗拦截
  47. "--disable-extensions", # 禁用扩展
  48. "--disable-gpu", # 禁用GPU加速
  49. "--lang=zh-CN,zh", # 中文语言
  50. "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
  51. ],
  52. "viewport": {"width": 2050, "height": 1200},
  53. "locale": "zh-CN",
  54. "timezone_id": "Asia/Shanghai",
  55. "default_timeout": 15000,
  56. "navigation_timeout": 30000,
  57. "login_state_path": "pdd_login_state.json", # 登录状态持久化文件
  58. "tianyancha_login_state": "tianyancha_login_state.json" # 天眼查登录状态保存路径
  59. }
  60. # 百度OCR配置
  61. BAIDU_OCR_CONFIG = {
  62. "api_key": os.getenv('APP_KEY'),
  63. "secret_key": os.getenv('APP_SECRET'),
  64. "scale": 1.5 # OCR图片放大倍数
  65. }
  66. # 图片保存配置
  67. IMAGE_CONFIG = {
  68. "save_dir": "pdd_goods_images", # 独立文件夹名(项目根目录下)
  69. "timeout": 10, # 图片下载超时时间(秒)
  70. "retry": 1 # 下载失败重试次数
  71. }
  72. # 1. 日期变量赋值SQL
  73. SET_DATE_SQL = "SET @date_constant = CURDATE();"
  74. # 2. 核心查询SQL
  75. # QUERY_SQL = """
  76. # SELECT
  77. # product,
  78. # shop,
  79. # product_link,
  80. # scrape_date,
  81. # business_license_company,
  82. # search_key
  83. # FROM (
  84. # SELECT
  85. # pd.product,
  86. # pd.shop,
  87. # pd.product_link,
  88. # pd.scrape_date,
  89. # psi.business_license_company,
  90. # pd.search_key,
  91. # ROW_NUMBER() OVER (PARTITION BY pd.shop ORDER BY pd.search_key ASC) AS rn
  92. # FROM pdd_drug_middle pd
  93. # LEFT JOIN pdd_shop_info_middle psi ON psi.shop = pd.shop
  94. # WHERE pd.scrape_date = @date_constant
  95. # AND psi.business_license_company IS NULL
  96. # ) AS sub
  97. # WHERE rn = 1
  98. # ORDER BY search_key;
  99. # """
  100. QUERY_SQL = """
  101. SELECT
  102. *
  103. FROM
  104. pdd_shop_info_middle
  105. WHERE scrape_date = @date_constant
  106. """
  107. # ===================== 百度OCR类(完整整合)=====================
  108. class BaiduOCR:
  109. """百度 OCR 文字识别封装类"""
  110. def __init__(self, api_key: str, secret_key: str):
  111. """
  112. 初始化百度 OCR
  113. :param api_key: 百度智能云应用的 API Key
  114. :param secret_key: 百度智能云应用的 Secret Key
  115. """
  116. self.api_key = api_key
  117. self.secret_key = secret_key
  118. self.access_token: Optional[str] = None
  119. # 获取 access_token(有效期30天,建议缓存)
  120. self._get_access_token()
  121. def _get_access_token(self) -> bool:
  122. """
  123. 获取百度 OCR 的 access_token(有效期30天)
  124. :return: 是否获取成功
  125. """
  126. url = "https://aip.baidubce.com/oauth/2.0/token"
  127. params = {
  128. "grant_type": "client_credentials",
  129. "client_id": self.api_key,
  130. "client_secret": self.secret_key
  131. }
  132. try:
  133. response = requests.post(url, params=params, timeout=10)
  134. response.raise_for_status()
  135. result = response.json()
  136. if "access_token" in result:
  137. self.access_token = result["access_token"]
  138. print(f"✅ 成功获取 access_token:{self.access_token[:20]}...")
  139. return True
  140. else:
  141. print(f"❌ 获取 access_token 失败:{result}")
  142. return False
  143. except Exception as e:
  144. print(f"❌ 获取 access_token 异常:{e}")
  145. return False
  146. def _enlarge_and_crop_image(self, image_path: str, scale: float=1.5, crop_ratio: float=0.5) -> bytes:
  147. """
  148. 先裁剪图片上半部分(保留有效内容),再放大图片(解决OCR尺寸错误)
  149. :param image_path: 原图路径
  150. :param scale: 放大倍数(推荐1.5~3.0)
  151. :param crop_ratio: 裁剪比例(0.5=保留上50%,0.6=保留上60%,可根据图片调整)
  152. :return: 处理后的图片二进制数据
  153. """
  154. try:
  155. with Image.open(image_path) as img:
  156. # ========== 步骤1:裁剪上半部分(核心修复) ==========
  157. # 计算裁剪区域:左=0,上=0,右=原图宽度,下=原图高度×裁剪比例
  158. crop_box = (
  159. 0, # 左边界
  160. 0, # 上边界
  161. img.width, # 右边界
  162. int(img.height * crop_ratio) # 下边界(只保留上半部分)
  163. )
  164. img_cropped = img.crop(crop_box) # 执行裁剪
  165. print(f"✅ 图片裁剪完成:保留上{int(crop_ratio * 100)}%区域,尺寸={img_cropped.size}")
  166. # ========== 步骤2:放大裁剪后的图片 ==========
  167. new_width = int(img_cropped.width * scale)
  168. new_height = int(img_cropped.height * scale)
  169. # 高质量放大(Lanczos算法,最清晰)
  170. img_resized = img_cropped.resize(
  171. (new_width, new_height),
  172. Image.Resampling.LANCZOS
  173. )
  174. # ========== 仅5行,强制缩到4096×4096以内(核心微调) ==========
  175. MAX_OCR_SIZE = 4096 # 百度OCR最大允许宽度/高度
  176. if img_resized.width > MAX_OCR_SIZE:
  177. ratio = MAX_OCR_SIZE / img_resized.width # 计算缩放比例
  178. img_resized = img_resized.resize(
  179. (MAX_OCR_SIZE, int(img_resized.height * ratio)),
  180. Image.Resampling.LANCZOS
  181. )
  182. if img_resized.mode == 'RGBA':
  183. # 创建白色背景的RGB画布
  184. rgb_img = Image.new('RGB', img_resized.size, (255, 255, 255))
  185. # 将RGBA图片粘贴到RGB画布(透明区域显示白色)
  186. rgb_img.paste(img_resized, mask=img_resized.split()[3]) # mask=alpha通道
  187. img_resized = rgb_img
  188. # # ========== 保存处理后图片到本地 ==========
  189. # # 1. 确保pdd_goods_images文件夹存在(不存在则创建)
  190. # save_dir = "pdd_goods_images"
  191. # if not os.path.exists(save_dir):
  192. # os.makedirs(save_dir)
  193. # # 2. 提取原图片文件名(比如从image_path中拿到"鸿祥堂大药房旗舰店_1773649991220.jpeg")
  194. # file_name = os.path.basename(image_path)
  195. # # 3. 拼接保存路径
  196. # save_path = os.path.join(save_dir, file_name)
  197. # # 4. 保存图片到本地(质量和OCR用的一致)
  198. # img_resized.save(save_path, format='JPEG', quality=95)
  199. # print(f"✅ 处理后图片已保存到:{save_path}")
  200. # # ======================================================
  201. # 保存到内存(不生成本地文件)
  202. img_byte_arr = io.BytesIO()
  203. # 保存为 JPG,保证清晰度
  204. img_resized.save(img_byte_arr, format='JPEG', quality=95)
  205. img_byte_arr = img_byte_arr.getvalue()
  206. # 校验文件大小(超4MB则再次压缩)
  207. file_size = len(img_byte_arr) / 1024 / 1024 # 转MB
  208. if file_size > 4:
  209. print(f"⚠️ 文件超4MB({file_size:.2f}MB),二次压缩...")
  210. img_byte_arr = io.BytesIO()
  211. img_resized.save(img_byte_arr, format='JPEG', quality=70, optimize=True)
  212. img_byte_arr = img_byte_arr.getvalue()
  213. # 打印最终尺寸(方便调试)
  214. print(f"✅ 图片放大完成:最终尺寸={img_resized.size}")
  215. return img_byte_arr
  216. except Exception as e:
  217. print(f"❌ 图片裁剪/放大失败:{str(e)}")
  218. return b''
  219. def general_ocr(self, image_path: str, scale: float = 1.5) -> Optional[Dict]:
  220. """
  221. 调用百度通用文字识别(支持图片放大)
  222. :param image_path: 本地图片路径
  223. :param scale: 放大倍数,默认2倍
  224. :return: OCR识别结果
  225. """
  226. if not self.access_token:
  227. print("❌ access_token 无效,请先初始化")
  228. return None
  229. try:
  230. """
  231. 百度OCR通用识别(整合裁剪+放大)
  232. """
  233. # 替换原放大逻辑为「裁剪+放大」
  234. image_data = self._enlarge_and_crop_image(image_path, scale=scale, crop_ratio=0.5)
  235. if not image_data:
  236. print("❌ 图片处理失败,无法识别")
  237. return {}
  238. image_base64 = base64.b64encode(image_data).decode("utf-8")
  239. except Exception as e:
  240. print(f"❌ 图片放大/读取失败:{e}")
  241. return None
  242. # 调用 OCR 接口
  243. url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
  244. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  245. data = {"image": image_base64}
  246. try:
  247. response = requests.post(url, headers=headers, data=data, timeout=10)
  248. response.raise_for_status()
  249. result = response.json()
  250. if "words_result" in result:
  251. print(f"✅ 识别成功,共识别到 {len(result['words_result'])} 行文字")
  252. return result
  253. else:
  254. print(f"❌ 识别失败:{result}")
  255. return None
  256. except Exception as e:
  257. print(f"❌ 调用 OCR 接口异常:{e}")
  258. return None
  259. def extract_enterprise_info(self, ocr_result: Dict) -> Dict:
  260. """
  261. 从OCR识别结果中提取企业名称和社会信用代码
  262. :param ocr_result: general_ocr 返回的识别结果字典
  263. :return: 包含企业名称和社会信用代码的JSON格式字典
  264. 格式:{"enterprise_name": "企业名称", "credit_code": "社会信用代码"}
  265. """
  266. # 初始化返回结果(默认空值)
  267. enterprise_info = {
  268. "enterprise_name": "",
  269. "credit_code": ""
  270. }
  271. if not ocr_result or "words_result" not in ocr_result:
  272. print("❌ OCR识别结果为空,无法提取企业信息")
  273. return enterprise_info
  274. all_text_lines = []
  275. # 遍历所有识别的文字行,匹配关键词
  276. for item in ocr_result["words_result"]:
  277. line_text = item["words"].strip() # 去除首尾空格
  278. if line_text and line_text not in all_text_lines: # 去空+去重
  279. all_text_lines.append(line_text)
  280. print(f"📝 OCR识别的有效行:{all_text_lines}")
  281. # ==================== 1. 提取并清洗企业名称 ====================
  282. enterprise_name = ""
  283. # 名称匹配关键词(覆盖所有场景)
  284. name_keywords = ["企业名称", "名称:", "名:", "称:"]
  285. # 常见错别字修正映射
  286. name_correction = {
  287. "人药房": "大药房",
  288. "有松司": "有限公司",
  289. "松司": "公司",
  290. "关药房": "大药房"
  291. }
  292. for idx, line_text in enumerate(all_text_lines):
  293. # 场景1:包含"企业名称"(处理括号冗余,如"企业名称(名称xxx")
  294. if "企业名称" in line_text:
  295. # 移除所有括号及内部内容,再提取名称
  296. import re
  297. # 正则移除括号(()/())及内容
  298. clean_line = re.sub(r'\([^)]*\)|\([^)]*\)', '', line_text)
  299. # 提取"企业名称"后的所有内容
  300. name_part = clean_line.split("企业名称")[-1].strip()
  301. # 若还有"名称"前缀,继续拆分
  302. if "名称" in name_part:
  303. name_part = name_part.split("名称")[-1].strip()
  304. enterprise_name = name_part
  305. break
  306. # 场景2:单行包含"名称:"/"名:"/"称:"
  307. elif any(key in line_text for key in ["名称:", "名:", "称:"]):
  308. name_part = line_text.split(":")[-1].strip() if ":" in line_text else line_text.split(":")[-1].strip()
  309. enterprise_name = name_part
  310. break
  311. # 场景3:跨行拆分(前一行是"名",当前行以"称:"开头)
  312. elif idx > 0 and all_text_lines[idx - 1] == "名" and line_text.startswith("称:"):
  313. name_part = line_text.split(":")[-1].strip()
  314. enterprise_name = name_part
  315. break
  316. # 清洗企业名称:修正错别字、移除多余空格
  317. if enterprise_name:
  318. for wrong, right in name_correction.items():
  319. enterprise_name = enterprise_name.replace(wrong, right)
  320. # 步骤2:移除开头/结尾的冒号(中文+英文)、空格、特殊符号
  321. enterprise_name = enterprise_name.strip(":: \t\n\r")
  322. # 步骤3:移除中间多余空格
  323. enterprise_name = enterprise_name.replace(" ", "") # 移除所有空格
  324. # ==================== 2. 提取并清洗社会信用代码 ====================
  325. credit_code = ""
  326. # 信用代码匹配关键词(兼容错别字+多格式)
  327. code_keywords = ["社会信用代码:", "统一社会信用代码:", "社会震用代码:"]
  328. for line_text in all_text_lines:
  329. # 匹配任意关键词
  330. for keyword in code_keywords:
  331. if keyword in line_text:
  332. code_part = line_text.split(keyword)[-1].strip()
  333. credit_code = code_part.replace(" ", "") # 移除空格(如"91360105 MAEGBDKMXF")
  334. break
  335. if credit_code: # 找到后跳出循环
  336. break
  337. # ==================== 赋值并打印结果 ====================
  338. enterprise_info["enterprise_name"] = enterprise_name
  339. enterprise_info["credit_code"] = credit_code
  340. # enterprise_info["address"] = address
  341. # 打印提取结果
  342. if enterprise_name:
  343. print(f"✅ 提取到企业名称:{enterprise_name}")
  344. else:
  345. print("⚠️ 未识别到企业名称字段")
  346. if credit_code:
  347. print(f"✅ 提取到社会信用代码:{credit_code}")
  348. else:
  349. print("⚠️ 未识别到社会信用代码字段")
  350. return enterprise_info
  351. # ===================== 数据库读取类 =====================
  352. class DBGoodsReader:
  353. """数据库商品链接读取器"""
  354. def __init__(
  355. self,
  356. host: str = DEFAULT_DB_CONFIG["host"],
  357. port: int = DEFAULT_DB_CONFIG["port"],
  358. user: str = DEFAULT_DB_CONFIG["user"],
  359. password: str = DEFAULT_DB_CONFIG["password"],
  360. db_name: str = DEFAULT_DB_CONFIG["db_name"],
  361. charset: str = "utf8mb4"
  362. ):
  363. self.host = host
  364. self.port = port
  365. self.user = user
  366. self.password = password
  367. self.db_name = db_name
  368. self.charset = charset
  369. self.conn: Optional[pymysql.connections.Connection] = None
  370. self.cursor: Optional[pymysql.cursors.DictCursor] = None
  371. def connect_db(self) -> bool:
  372. """连接数据库(带重试机制)"""
  373. max_retry = 2
  374. for retry in range(max_retry + 1):
  375. try:
  376. self.conn = pymysql.connect(
  377. host=self.host,
  378. port=self.port,
  379. user=self.user,
  380. password=self.password,
  381. database=self.db_name,
  382. charset=self.charset,
  383. cursorclass=pymysql.cursors.DictCursor,
  384. connect_timeout=10 # 连接超时
  385. )
  386. self.cursor = self.conn.cursor()
  387. print(f"✅ 成功连接数据库:{self.db_name}")
  388. return True
  389. except pymysql.MySQLError as e:
  390. if retry < max_retry:
  391. print(f"❌ 数据库连接失败(重试{retry + 1}/{max_retry}):{e}")
  392. time.sleep(1)
  393. continue
  394. print(f"❌ 数据库连接最终失败:{e}")
  395. return False
  396. def get_shop_and_goods(self) -> List[Dict]:
  397. """
  398. 读取待补充企业信息的店铺数据(每个店铺仅取1条)
  399. 返回:包含product/shop/store_url等字段的字典列表
  400. """
  401. if not self.conn or not self.cursor:
  402. print("❌ 未连接数据库,请先调用 connect_db()")
  403. return []
  404. try:
  405. # 步骤1:执行日期变量赋值
  406. self.cursor.execute(SET_DATE_SQL)
  407. # 步骤2:执行核心查询
  408. self.cursor.execute(QUERY_SQL)
  409. # 步骤3:获取结果(DictCursor返回字典格式,字段名对应SQL列名)
  410. results = self.cursor.fetchall()
  411. print(f"✅ 成功读取 {len(results)} 条待补充企业信息的店铺数据")
  412. return results
  413. except pymysql.MySQLError as e:
  414. print(f"❌ 读取数据失败:{e}")
  415. return []
  416. def _get_next_id(self) -> int:
  417. """获取表中最大ID并+1,用于生成新插入数据的ID(若ID非自增)"""
  418. try:
  419. sql = "SELECT IFNULL(MAX(id), 0) + 1 AS next_id FROM pdd_shop_info_middle"
  420. self.cursor.execute(sql)
  421. result = self.cursor.fetchone()
  422. next_id = result.get("next_id", 9078) # 默认初始值9078(兼容示例)
  423. print(f"✅ 获取到下一个可用ID:{next_id}")
  424. return next_id
  425. except pymysql.MySQLError as e:
  426. print(f"❌ 获取自增ID失败,使用默认值9078:{e}")
  427. return 9078
  428. def insert_enterprise_info(self, shop_name: str, enterprise_info: Dict) -> bool:
  429. """
  430. 向pdd_shop_info_middle表插入企业信息
  431. :param shop_name: 店铺名称(关联表的shop字段)
  432. :param enterprise_info: 包含tyc_company_name/tyc_company_code/tyc_company_address的字典
  433. :return: 插入是否成功
  434. """
  435. if not self.conn or not self.cursor:
  436. print("❌ 未连接数据库,请先调用 connect_db()")
  437. return False
  438. if not shop_name:
  439. print("❌ 店铺名称为空,无法更新")
  440. return False
  441. business_company_name = enterprise_info.get("tyc_company_name", "").strip()
  442. qualification_number = enterprise_info.get("tyc_company_code", "").strip()
  443. contact_address = enterprise_info.get("tyc_company_address", "").strip()
  444. business_license_address = contact_address # 两个地址字段都用同一个值
  445. # 空值校验提示
  446. empty_fields = []
  447. if not business_company_name:
  448. empty_fields.append("企业名称")
  449. if not qualification_number:
  450. empty_fields.append("统一信用代码")
  451. if not contact_address:
  452. empty_fields.append("企业地址")
  453. if empty_fields:
  454. print(f"⚠️ 店铺[{shop_name}]以下字段为空:{','.join(empty_fields)},仍继续插入(空值)")
  455. # 生成插入ID(若表ID为自增主键,可删除ID相关逻辑,SQL中也去掉id字段)
  456. next_id = self._get_next_id()
  457. insert_sql = """
  458. INSERT INTO `test2`.`pdd_shop_info_middle` (
  459. `id`,
  460. `shop`,
  461. `contact_address`,
  462. `qualification_number`,
  463. `business_license_company`,
  464. `business_license_address`,
  465. `store_url`,
  466. `scrape_date`,
  467. `platform`,
  468. `province`,
  469. `city`,
  470. `create_time`,
  471. `update_time`
  472. ) VALUES (%s, %s, %s, %s, %s, %s, %s, CURDATE(), '拼多多', '', '', NOW(), NOW())
  473. """
  474. # 组装插入参数
  475. insert_params = [
  476. next_id,
  477. shop_name,
  478. contact_address,
  479. qualification_number,
  480. business_company_name,
  481. business_license_address
  482. ]
  483. try:
  484. # 执行插入
  485. self.cursor.execute(insert_sql, insert_params)
  486. self.conn.commit()
  487. # 检查影响行数
  488. affected_rows = self.cursor.rowcount
  489. if affected_rows > 0:
  490. print(f"✅ 店铺[{shop_name}]成功插入1条数据(ID:{next_id})")
  491. print(f" 插入内容:企业名称={business_company_name} | 信用代码={qualification_number} | 地址={contact_address}")
  492. return True
  493. else:
  494. print(f"⚠️ 店铺[{shop_name}]插入0行数据,无数据变更")
  495. return False
  496. except pymysql.MySQLError as e:
  497. print(f"❌ 店铺[{shop_name}]插入失败:{e}")
  498. self.conn.rollback() # 回滚事务
  499. return False
  500. except Exception as e:
  501. print(f"❌ 店铺[{shop_name}]插入异常:{e}")
  502. self.conn.rollback()
  503. return False
  504. def close_db(self) -> None:
  505. """安全关闭数据库连接"""
  506. if self.cursor:
  507. try:
  508. self.cursor.close()
  509. except Exception:
  510. pass
  511. if self.conn:
  512. try:
  513. self.conn.close()
  514. print("✅ 数据库连接已关闭")
  515. except Exception:
  516. pass
  517. # ===================== 天眼查浏览器类 =====================
  518. class TianyanchaBrowser:
  519. """天眼查浏览器:自动打开、登录、搜索企业名"""
  520. def __init__(self):
  521. self.pw = None
  522. self.browser = None
  523. self.context: Optional[BrowserContext] = None
  524. self.page = None
  525. self.login_state_path = PLAYWRIGHT_CONFIG["tianyancha_login_state"]
  526. def check_scan_login_prompt(self):
  527. """
  528. 检查是否出现「扫码登录」提示,若出现则暂停并提示手动扫码
  529. """
  530. try:
  531. # 定位扫码登录提示文本(结合父div,避免误匹配其他页面文本)
  532. scan_prompt_locator = self.page.locator(
  533. "div.scan-title",
  534. has_text="扫码登录 更快 更安全"
  535. )
  536. scan_prompt_locator.wait_for(
  537. state="visible",
  538. timeout=10000 # 超时10秒,可根据网络调整
  539. )
  540. # 用账密登录
  541. login_btn_locator = self.page.locator('.login-toggle.-scan')
  542. login_btn_locator.wait_for(state="visible", timeout=5000)
  543. # 现在可以使用该定位器进行操作
  544. login_btn_locator.click()
  545. #密码登录
  546. enter_secret_locator = self.page.locator('.title-password')
  547. enter_secret_locator.click()
  548. #点击手机号,自定义输入手机号
  549. self.page.locator('input[placeholder="手机号"]').fill('13809841368')
  550. #点击密码,自定义输入密码
  551. self.page.locator('input[placeholder="登录密码"]').fill('')
  552. #登录
  553. self.page.get_by_text("登录").last.click()
  554. #同意协议
  555. self.page.get_by_text("我已阅读并同意").click()
  556. # 提示出现,暂停脚本让你手动扫码
  557. print("⚠️ 检测到天眼查扫码登录提示!,回车继续")
  558. input("请打开天眼查APP扫码完成登录后,按回车键继续执行脚本...")
  559. except PlaywrightTimeoutError:
  560. # 超时未出现,说明无需扫码,直接继续
  561. print("✅ 未检测到扫码登录提示,跳过扫码步骤")
  562. def _load_login_state(self) -> Optional[Dict]:
  563. """加载本地登录状态"""
  564. if os.path.exists(self.login_state_path):
  565. try:
  566. with open(self.login_state_path, "r", encoding="utf-8") as f:
  567. return json.load(f)
  568. except json.JSONDecodeError:
  569. print(f"⚠️ 天眼查登录状态文件损坏,将重新登录")
  570. os.remove(self.login_state_path)
  571. return None
  572. def _save_login_state(self) -> None:
  573. """保存登录状态"""
  574. if self.context:
  575. try:
  576. self.context.storage_state(path=self.login_state_path) # 同步保存
  577. print(f"✅ 天眼查登录状态已保存到:{self.login_state_path}")
  578. except Exception as e:
  579. print(f"⚠️ 天眼查登录状态保存失败:{e}")
  580. def init_browser(self, pw) -> bool: # ✅ 保持async
  581. """初始化天眼查浏览器"""
  582. try:
  583. self.pw = pw # ✅ 核心修改:直接使用传进来的 playwright 引擎,不再自己 start()
  584. # 启动防检测浏览器
  585. self.browser = self.pw.chromium.launch(
  586. headless=PLAYWRIGHT_CONFIG["headless"],
  587. slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
  588. args=PLAYWRIGHT_CONFIG["browser_args"],
  589. ignore_default_args=["--enable-automation"],
  590. timeout=60000
  591. )
  592. # 加载登录状态或手动登录
  593. login_state = self._load_login_state()
  594. if login_state:
  595. self.context = self.browser.new_context(
  596. viewport=None, # ✅ 设为None,适配最大化窗口
  597. locale=PLAYWRIGHT_CONFIG["locale"],
  598. timezone_id=PLAYWRIGHT_CONFIG["timezone_id"],
  599. ignore_https_errors=True,
  600. storage_state=login_state # ✅ 加载已保存的登录状态
  601. )
  602. print("✅ 已加载天眼查本地登录状态")
  603. else: # ✅ 无登录状态:提示手动登录
  604. self.context = self.browser.new_context(
  605. viewport=None,
  606. locale=PLAYWRIGHT_CONFIG["locale"],
  607. timezone_id=PLAYWRIGHT_CONFIG["timezone_id"],
  608. ignore_https_errors=True,
  609. )
  610. # 初始化页面
  611. self.page = self.context.new_page()
  612. # self.page.window_maximize() # ✅ 强制窗口最大化(兜底)
  613. self.page.set_default_timeout(PLAYWRIGHT_CONFIG['default_timeout'])
  614. self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG['navigation_timeout'])
  615. return True
  616. except Exception as e:
  617. print(f"❌ 天眼查浏览器初始化失败:{e}")
  618. self.close() # ✅ await关闭
  619. return False
  620. def search_enterprise(self, enterprise_name: str) -> bool:
  621. """
  622. 打开天眼查并搜索指定企业名
  623. :param enterprise_name: 从OCR提取的企业名称
  624. :return: 搜索是否成功
  625. """
  626. if not self.page:
  627. print("❌ 天眼查浏览器未初始化!")
  628. return False
  629. if not enterprise_name or enterprise_name.strip() == '':
  630. print("❌ 企业名称为空!无法搜索")
  631. return False
  632. try:
  633. #设置一个计数器,去往官网只运行一次
  634. # 1. 打开天眼查首页(替换你指定的链接)
  635. print(f"\n📌 打开天眼查:https://www.tianyancha.com/")
  636. self.page.goto(
  637. "https://www.tianyancha.com/",
  638. wait_until="networkidle",
  639. timeout=30000
  640. )
  641. # input("天眼查登录")
  642. # 2. 定位天眼查搜索框(适配最新页面结构)
  643. # 搜索框selector:优先用placeholder匹配,兼容不同版本
  644. # 先检查是否需要扫码登录
  645. # self.check_scan_login_prompt()
  646. # ========== 核心:自动检测并处理登录 ==========
  647. # 定位「登录/注册」按钮(完全匹配你提供的HTML结构)
  648. login_button = self.page.locator(
  649. "div.tyc-header-nav-item.tyc-nav-user span.tyc-nav-user-btn",
  650. has_text="登录/注册"
  651. ).nth(0)
  652. try:
  653. # 等待按钮出现(最多10秒),如果出现说明未登录
  654. login_button.wait_for(state="visible", timeout=10000)
  655. print("⚠️ 检测到未登录状态,正在点击「登录/注册」按钮...")
  656. login_button.click() # 点击按钮,唤起扫码登录弹窗
  657. # 提示你手动扫码登录
  658. print("\n🔔 请打开天眼查APP,扫描页面上的登录二维码,只有四十秒,登录完成后按回车键继续...")
  659. # 等待登录完成:等待「登录/注册」按钮消失(说明已成功登录)
  660. self.page.wait_for_selector(
  661. "div.tyc-header-nav-item.tyc-nav-user span.tyc-nav-user-btn",
  662. state="hidden", # 等待元素隐藏
  663. timeout=40000 # 最多等30秒,给足扫码时间
  664. )
  665. print("✅ 扫码登录成功!")
  666. except PlaywrightTimeoutError:
  667. # 10秒内没找到「登录/注册」按钮 → 说明已经处于登录状态
  668. print("✅ 检测到已登录状态,无需重复登录")
  669. print("\n⚠️ 请先完成天眼查登录!")
  670. # self.page = self.context.new_page() # ✅ await创建页面
  671. # self.page.goto("https://www.tianyancha.com", timeout=30000) # ✅ await跳转
  672. # input("请在浏览器中完成天眼查登录,登录后按回车继续...")
  673. self.context.storage_state(path=self.login_state_path) # ✅ await保存状态
  674. print(f"✅ 天眼查登录状态已保存到:{self.login_state_path}")
  675. search_locator = None
  676. try:
  677. # 优先定位:placeholder匹配
  678. search_locator = self.page.locator('input[placeholder="请输入公司名称、老板姓名、品牌名称等"]')
  679. if search_locator.count() > 1:
  680. search_locator = self.page.locator('input[placeholder="请输入公司名称、老板姓名、品牌名称等"]').nth(1)
  681. else:
  682. search_locator = self.page.locator('input[placeholder="请输入公司名称、老板姓名、品牌名称等"]')
  683. # # 备用定位:ID匹配
  684. # if not search_locator.count():
  685. # search_locator = self.page.locator('input#header-company-search')
  686. # 等待搜索框加载(超时会触发TimeoutError)
  687. search_locator.wait_for(timeout=10000, state="visible")
  688. print("✅ 定位到天眼查搜索框")
  689. except PlaywrightTimeoutError:
  690. print(f"❌ 搜索框定位超时:页面加载过慢或搜索框元素不存在")
  691. return False
  692. except Exception as e:
  693. print(f"❌ 搜索框定位失败:{str(e)}")
  694. return False
  695. # 3. 清空搜索框 + 输入企业名 + 回车搜索
  696. search_locator.click()
  697. search_locator.clear()
  698. print(f"📌 输入企业名:{enterprise_name}")
  699. # 模拟真人输入延迟
  700. search_locator.fill(enterprise_name)
  701. self.page.wait_for_timeout(1000)
  702. # 推荐:直接用键盘回车触发搜索,这在大部分前端框架中最稳定
  703. search_locator.press("Enter")
  704. print("🖱️ 已触发回车搜索")
  705. #点击搜索按钮
  706. # search_btn = self.page.locator("button.50ab4.tyc-header-suggest-button_52bf6")
  707. # await search_btn.click() # 回车搜索
  708. # 4. 等待搜索结果加载
  709. self.page.wait_for_load_state("networkidle", timeout=20000)
  710. print(f"✅ 天眼查搜索完成!已搜索:{enterprise_name}")
  711. return True
  712. except PlaywrightTimeoutError:
  713. print(f"❌ 天眼查搜索超时(企业名:{enterprise_name})")
  714. return False
  715. except Exception as e:
  716. print(f"❌ 天眼查搜索异常:{e}")
  717. return False
  def get_enterprise_info(self) -> Dict:
      """Read the first company hit from the current Tianyancha page.

      Three values are looked up on ``self.page``: the company name, the
      unified social credit code and the company address. Each value is
      extracted independently, so a failure on one field does not prevent
      the remaining fields from being read.

      Returns:
          A dict with the keys ``tyc_company_name``, ``tyc_company_code``
          and ``tyc_company_address``. A value that could not be read stays
          an empty string. The dict is returned in every case, including
          when the page is not initialised or the extraction fails.
      """
      enterprise_detail = {
          "tyc_company_name": "",  # company name
          "tyc_company_code": "",  # unified social credit code
          "tyc_company_address": "",  # company address
      }
      if not self.page:
          print("❌ 天眼查页面未初始化")
          return enterprise_detail

      # One row per field:
      # (result key, CSS selector, label used in the success message,
      #  label used in the not-found message, label used in the error
      #  message, whether to pause for manual inspection on error).
      # Several results may match a selector; only the first one is used.
      field_specs = (
          (
              "tyc_company_name",
              "div.index_name__qEdWi span",
              "公司名",
              "企业名",
              "企业名",
              True,
          ),
          (
              "tyc_company_code",
              "div.index_info-col__UVcZb.index_credit-code__kWuDZ span",
              "企业信用代码",
              "企业信用代码",
              "统一社会信用代码",
              False,
          ),
          (
              "tyc_company_address",
              "div.index_contact-col__7AboU.index_address__mHjQD .index_value__Pl0Nh",
              "企业地址",
              "企业地址",
              "企业地址",
              False,
          ),
      )

      try:
          # Fixed 4 s pause intended to let the page finish loading before
          # any element is queried.
          self.page.wait_for_timeout(timeout=4000)

          for key, selector, found_label, missing_label, error_label, pause in field_specs:
              try:
                  first_match = self.page.locator(selector).nth(0)
                  if first_match.count():
                      enterprise_detail[key] = first_match.inner_text().strip()
                      print(f"获取到{found_label}:{enterprise_detail[key]}")
                  else:
                      print(f"没有获取到{missing_label},网页路径有问题")
              except Exception as e:
                  enterprise_detail[key] = ""
                  # Show the error first so the operator knows what to
                  # look at before being asked to inspect the page.
                  print(f"提取{error_label}时发生异常:{str(e)},网页路径或元素定位异常")
                  if pause:
                      try:
                          input("提取企业元素发生问题,检查一下")
                      except (EOFError, OSError):
                          # No interactive stdin available: skip the pause
                          # instead of aborting the remaining fields.
                          pass

          print("\n📌 提取的企业核心信息:")
          print(f"公司名:{enterprise_detail['tyc_company_name']}")
          print(f"企业信用代码:{enterprise_detail['tyc_company_code']}")
          print(f"企业地址:{enterprise_detail['tyc_company_address']}")
          return enterprise_detail
      except Exception as e:
          print(f"❌ 提取企业信息失败:{e}")
          return enterprise_detail
  def close(self) -> None:
      """Close the Tianyancha page, context and browser, in that order.

      Closing is best-effort: a failure on one handle is reported and the
      remaining handles are still closed. Every handle is reset to ``None``
      afterwards, so calling ``close()`` a second time does nothing.
      """
      for attr in ("page", "context", "browser"):
          handle = getattr(self, attr, None)
          if not handle:
              continue
          try:
              handle.close()
              if attr == "browser":
                  print("✅ 天眼查浏览器已关闭")
          except Exception as e:
              # Keep going: the other handles still need to be released.
              print(f"⚠️ 关闭天眼查{attr}时出错(已忽略):{e}")
          finally:
              setattr(self, attr, None)
  class PddLinkBrowser:
      """Pinduoduo link browser (persistent login + image download + OCR).

      Owns the Playwright driver, a Pinduoduo browser/context/page and a
      ``TianyanchaBrowser`` that shares the same driver. ``open_links``
      visits product links, downloads an image from each page, runs OCR on
      it and enriches the OCR result with Tianyancha data.
      """

      def __init__(self, login_state_path: str = PLAYWRIGHT_CONFIG["login_state_path"]):
          """Prepare folders and clients; no browser is launched yet.

          :param login_state_path: file used to load/save the Playwright
              storage state (login session).
          """
          self.login_state_path = login_state_path
          # Playwright driver; created in init_browser() and stopped in close().
          self.pw = None
          self.browser = None
          self.context: Optional[BrowserContext] = None
          self.page = None
          # Make sure the image output folder exists.
          self._init_image_dir()
          # Baidu OCR client; stays None when no credentials are configured.
          self.ocr_client: Optional[BaiduOCR] = None
          self._init_ocr_client()
          # Companion browser used for Tianyancha look-ups.
          self.tyc_browser = TianyanchaBrowser()

      def _init_ocr_client(self):
          """Create the Baidu OCR client if both API credentials are configured."""
          api_key = BAIDU_OCR_CONFIG["api_key"]
          secret_key = BAIDU_OCR_CONFIG["secret_key"]
          if not api_key or not secret_key:
              print("⚠️ 未配置百度OCR的API_KEY/SECRET_KEY,将跳过OCR识别")
              return
          self.ocr_client = BaiduOCR(api_key=api_key, secret_key=secret_key)
          print("✅ 百度OCR客户端初始化完成")

      # ========== Phone-login box detection ==========
      def _check_login_box(self) -> bool:
          """Detect the phone-login box and wait for a manual login.

          Looks for ``div.phone-login span`` and checks that its text
          contains "手机登录". When found, the operator is asked to log in
          and press Enter, then the page is given time to settle.

          :return: True when the box was detected and the operator confirmed
              the login; False when no box was found or detection failed.
          """
          if not self.page:
              print("❌ 页面未初始化,无法检测登录框")
              return False
          try:
              login_locator = self.page.locator("div.phone-login span")
              # Wait up to 5 s for visibility; a timeout means "no login box".
              login_locator.wait_for(timeout=5000, state="visible")
              login_text = login_locator.inner_text().strip()
              if "手机登录" in login_text:
                  print("\n⚠️ 检测到【手机登录】框,请手动完成登录!")
                  input("登录完成后,请按回车键继续执行脚本...")
                  # Let the page reload after login so the session takes effect.
                  self.page.wait_for_load_state("networkidle", timeout=15000)
                  print("✅ 登录已完成,继续处理当前商品")
                  return True
              return False
          except PlaywrightTimeoutError:
              # The element never became visible: there is no login box.
              return False
          except Exception as e:
              print(f"⚠️ 检测登录框时发生异常:{str(e)[:60]},继续执行")
              return False

      # ========== Stepwise scrolling ==========
      def _scroll_down(self, distance: int = 500, step: int = 50, interval: int = 100):
          """Scroll the page down in small steps (used to trigger lazy loading).

          :param distance: total distance to scroll, in pixels (default 500)
          :param step: pixels per scroll step (default 50; smaller is smoother)
          :param interval: pause after each step, in milliseconds (default 100)
          """
          if not self.page:
              print("❌ 浏览器页面未初始化,无法滑动")
              return
          # Non-positive step or distance would never terminate / do nothing.
          if step <= 0 or distance <= 0:
              print(f"⚠️ 无效的滑动参数(总距离:{distance},步长:{step}),跳过滑动")
              return
          try:
              remaining = distance  # distance still to scroll
              print(f"📝 开始分步滑动:总距离{distance}像素,每次滑{step}像素,间隔{interval}ms")
              while remaining > 0:
                  current_step = min(step, remaining)
                  self.page.evaluate(f"window.scrollBy(0, {current_step})")
                  remaining -= current_step
                  self.page.wait_for_timeout(interval)
              # Extra pause after the last step before the caller queries the page.
              self.page.wait_for_timeout(2000)
              print(f"✅ 分步滑动完成,总滑动距离:{distance}像素")
          except Exception as e:
              print(f"⚠️ 分步滑动失败:{str(e)[:50]}")

      def _init_image_dir(self):
          """Create the image output folder when it does not exist yet."""
          if not os.path.exists(IMAGE_CONFIG["save_dir"]):
              os.makedirs(IMAGE_CONFIG['save_dir'])
              print(f"✅ 图片保存文件夹已创建:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")
          else:
              print(f"✅ 图片保存文件夹已存在:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")

      def _get_image_filename(self, img_src: str, shop_name: str) -> str:
          """Build a file name of the form ``<shop>_<millisecond timestamp><ext>``.

          :param img_src: image URL; its path supplies the extension
              (``.png`` is used when the path has none)
          :param shop_name: shop name; reduced to alphanumerics, ``_`` and
              ``-`` and truncated to 20 characters
          """
          parsed_url = urlparse(img_src)
          ext = os.path.splitext(parsed_url.path)[-1] or '.png'
          # Drop characters that are unsafe in file names.
          clean_shop = "".join([c for c in shop_name if c.isalnum() or c in ["_", "-"]])[:20]
          # NOTE: uniqueness relies on the millisecond timestamp only.
          timestamp = str(int(time.time() * 1000))
          filename = f"{clean_shop}_{timestamp}{ext}"
          return filename

      def _discard_partial_file(self, path: str) -> None:
          """Delete a possibly half-written download; ignore file-system errors."""
          try:
              if os.path.exists(path):
                  os.remove(path)
          except OSError:
              pass

      def _download_image(self, img_src: str, shop_name: str) -> Optional[str]:
          """Download an image into the configured image folder.

          The request is retried up to ``IMAGE_CONFIG["retry"]`` additional
          times. An HTTP 403 is not retried. A file left behind by a failed
          attempt is removed.

          :param img_src: image URL
          :param shop_name: shop name used to build the file name
          :return: path of the saved file, or None on failure
          """
          if not img_src:
              print("⚠️ 图片链接为空,跳过下载")
              return None
          filename = self._get_image_filename(img_src, shop_name)
          save_path = os.path.join(IMAGE_CONFIG["save_dir"], filename)
          for retry in range(IMAGE_CONFIG["retry"] + 1):
              try:
                  # Browser-like headers. The User-Agent is taken from the last
                  # browser argument; split once so a value that itself contains
                  # "=" is kept intact.
                  headers = {
                      "User-Agent": PLAYWRIGHT_CONFIG["browser_args"][-1].split("=", 1)[1],
                      "Referer": "https://www.pinduoduo.com/",
                      "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
                  }
                  # The context manager releases the streamed connection even
                  # when writing the file fails.
                  with requests.get(
                      img_src,
                      headers=headers,
                      timeout=IMAGE_CONFIG["timeout"],
                      stream=True,  # stream to disk instead of buffering in memory
                      allow_redirects=True,  # follow redirects explicitly
                  ) as response:
                      response.raise_for_status()  # raise on 4xx/5xx
                      with open(save_path, "wb") as f:
                          for chunk in response.iter_content(chunk_size=8192):
                              f.write(chunk)
                  # An empty file counts as a failed attempt.
                  if os.path.getsize(save_path) > 0:
                      print(f"✅ 图片下载成功:{save_path}")
                      return save_path
                  os.remove(save_path)
                  print(f"⚠️ 图片下载为空,重试{retry+1}/{IMAGE_CONFIG['retry']}")
              except requests.exceptions.HTTPError as e:
                  self._discard_partial_file(save_path)
                  if e.response.status_code == 403:
                      print(f"❌ 图片签名过期/无权限:{img_src[:50]}...")
                      return None  # retrying a 403 is pointless
                  elif retry < IMAGE_CONFIG["retry"]:
                      print(f"⚠️ HTTP错误(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{e}")
                      time.sleep(1)
                      continue
                  print(f"❌ 图片下载失败:{e}")
                  return None
              except Exception as e:
                  self._discard_partial_file(save_path)
                  if retry < IMAGE_CONFIG["retry"]:
                      print(f"⚠️ 下载失败(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{str(e)[:50]}")
                      time.sleep(1)
                      continue
                  print(f"❌ 图片最终下载失败:{str(e)[:50]}")
                  return None
          # Every attempt produced an empty file.
          return None

      def _process_ocr(self, image_path: str) -> Optional[Dict]:
          """Run OCR on an image, then enrich the result via Tianyancha.

          :param image_path: path of the image to recognise
          :return: the enterprise-info dict extracted from the OCR result,
              updated with the Tianyancha fields when the search succeeds;
              None when OCR is unavailable, the file is missing or OCR
              returns nothing
          """
          if not self.ocr_client:
              print("⚠️ OCR客户端未初始化,跳过识别")
              return None
          if not os.path.exists(image_path):
              print(f"❌ 图片文件不存在:{image_path}")
              return None
          ocr_result = self.ocr_client.general_ocr(
              image_path=image_path,
              scale=BAIDU_OCR_CONFIG["scale"]
          )
          print(f"识别结果{ocr_result}")
          if not ocr_result:
              return None
          enterprise_info = self.ocr_client.extract_enterprise_info(ocr_result)
          print("\n📌 提取的企业信息:")
          enterprise_name = enterprise_info.get("enterprise_name", "")
          if not enterprise_name:
              print("⚠️ 未提取到企业名称,跳过天眼查")
              return enterprise_info
          # Search Tianyancha by the recognised name and merge its fields in.
          if self.tyc_browser.search_enterprise(enterprise_name):
              tyc_info = self.tyc_browser.get_enterprise_info()
              enterprise_info.update(tyc_info)
              print("\n📌 最终整合结果:")
              print(json.dumps(enterprise_info, ensure_ascii=False, indent=4))
          return enterprise_info

      def _load_login_state(self) -> Optional[Dict]:
          """Load the saved login state; a corrupt file is deleted.

          :return: the parsed state, or None when the file is missing or
              is not valid JSON
          """
          if os.path.exists(self.login_state_path):
              try:
                  with open(self.login_state_path, "r", encoding="utf-8") as f:
                      return json.load(f)
              except json.JSONDecodeError:
                  print(f"⚠️ 登录状态文件损坏:{self.login_state_path},将重新登录")
                  os.remove(self.login_state_path)
          return None

      def _save_login_state(self) -> None:
          """Persist the current context's storage state (best-effort)."""
          if self.context:
              try:
                  self.context.storage_state(path=self.login_state_path)
                  print(f"✅ 登录状态已保存到:{self.login_state_path}")
              except Exception as e:
                  print(f"⚠️ 保存登录状态失败:{e}")

      def init_browser(self) -> bool:
          """Start Playwright, both browsers, and restore or establish a login.

          When no saved login state exists, the Pinduoduo home page is opened
          and the operator is asked to log in manually; the resulting state
          is then saved.

          :return: True on success; on failure everything opened so far is
              closed and False is returned
          """
          try:
              # Start the Playwright driver once and share it with the
              # Tianyancha browser.
              self.pw = sync_playwright().start()
              self.tyc_browser.init_browser(self.pw)
              self.browser = self.pw.chromium.launch(
                  headless=PLAYWRIGHT_CONFIG["headless"],
                  slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
                  args=PLAYWRIGHT_CONFIG["browser_args"],
                  ignore_default_args=["--enable-automation"],  # hide the automation flag
                  timeout=60000
              )
              # Options shared by both the "restored" and the "fresh" context.
              context_kwargs = {
                  "viewport": PLAYWRIGHT_CONFIG["viewport"],
                  "locale": PLAYWRIGHT_CONFIG["locale"],
                  "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
                  "ignore_https_errors": True,
              }
              login_state = self._load_login_state()
              if login_state:
                  self.context = self.browser.new_context(
                      storage_state=login_state,
                      **context_kwargs
                  )
                  print("✅ 已加载本地登录状态")
                  self.page = self.context.new_page()
              else:
                  self.context = self.browser.new_context(**context_kwargs)
                  print("\n⚠️ 未检测到登录状态,请先完成拼多多登录!")
                  # The login tab is kept as the working page instead of
                  # opening a second tab and leaving this one behind.
                  self.page = self.context.new_page()
                  self.page.goto("https://www.pinduoduo.com", timeout=30000)
                  input("请在浏览器中完成登录,登录后按回车继续...")
                  self._save_login_state()
              self.page.set_default_timeout(PLAYWRIGHT_CONFIG["default_timeout"])
              self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG["navigation_timeout"])
              return True
          except Exception as e:
              print(f"❌ 浏览器初始化失败:{e}")
              self.close()
              return False

      def open_links(self, goods_data: List[Dict], db_reader: DBGoodsReader) -> List[Dict]:
          """Visit each product link and collect enterprise information.

          For every item whose shop name contains "旗舰店": open the link,
          verify that the shop name on the page equals the database name,
          scroll to trigger lazy loading, download the second matching
          image, run OCR + Tianyancha on it and write the result back
          through ``db_reader.insert_enterprise_info``.

          :param goods_data: items with the keys ``shop`` and ``store_url``
          :param db_reader: receives each extracted result
          :return: every enterprise-info dict that was obtained
          """
          if not self.page:
              print("❌ 浏览器未初始化")
              return []
          total = len(goods_data)
          if total == 0:
              print("⚠️ 无商品链接可处理")
              return []
          print(f"\n📋 共待处理 {total} 条商品链接")
          all_results = []
          for idx, item in enumerate(goods_data, 1):
              # Both values may be missing or None (e.g. NULL columns).
              shop = (item.get("shop") or "未知店铺").strip()
              link = (item.get("store_url") or "").strip()
              if not link:
                  print(f"\n⚠️ 第{idx}/{total}条:店铺【{shop}】链接为空,跳过")
                  continue
              print(f"\n{'=' * 15} 第 {idx}/{total} 条 {'=' * 15}")
              print(f"🏪 数据库店名:{shop}")
              print(f"🔗 商品链接:{link}")
              # Only shops whose name contains "旗舰店" are processed.
              if "旗舰店" not in shop:
                  print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称不含“旗舰店”,跳过")
                  continue
              print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称包含“旗舰店”,打开商品链接")
              try:
                  # 1. Open the product link.
                  self.page.goto(
                      link,
                      wait_until="load",
                      timeout=PLAYWRIGHT_CONFIG["navigation_timeout"]
                  )
                  self.page.wait_for_load_state("networkidle", timeout=15000)
                  print(f"✅ 页面加载成功:{self.page.title()}...")
                  self._check_login_box()
                  # 2. The shop name shown on the page must match the database.
                  page_shop_locator = self.page.locator("div.BAq4Lzv7")
                  try:
                      page_shop_locator.wait_for(timeout=5000)
                      page_shop_text = page_shop_locator.inner_text().strip().lower()
                  except PlaywrightTimeoutError:
                      print("❌ 未找到页面店名元素,可能页面结构改变或被风控,跳过")
                      continue
                  db_shop_text = shop.lower()
                  print(f"🏪 页面元素店铺名:{page_shop_text}")
                  if page_shop_text != db_shop_text:
                      print(f"❌ 店名不匹配(数据库:{db_shop_text} | 页面:{page_shop_text}),跳过")
                      self.page.wait_for_timeout(2000)
                      continue
                  print("✅ 店名匹配成功!")
                  # Scroll so that lazily loaded images are fetched.
                  self._scroll_down(distance=2100)
                  # 3. Locate, download and recognise the target image.
                  final_enterprise_info = None
                  try:
                      img_locators = self.page.locator("img[role='img'][aria-label='查看图片']")
                      img_count = img_locators.count()
                      if img_count < 2:
                          # A fallback locator ("img.pdd-lazy-image.loaded") was
                          # prototyped here but is disabled; the operator is
                          # asked to inspect the page and the item is skipped.
                          print(f"⚠️ 原定位仅匹配到{img_count}个图片,尝试备用定位(拼多多懒加载图片)...")
                          input("请手动检查页面图片元素,按回车继续...")
                          continue
                      print(f"📸 匹配到图片元素:{img_count} 个")
                      # The second match is the image of interest.
                      target_img_locator = img_locators.nth(1)
                      target_img_locator.wait_for(timeout=5000, state="visible")
                      img_src = target_img_locator.get_attribute("src")
                      if img_src:
                          print(f"🖼️ 第2个图片 src:{img_src[:80]}...")
                          image_path = self._download_image(img_src, shop)
                          if image_path:
                              final_enterprise_info = self._process_ocr(image_path)
                      else:
                          print("⚠️ 第2个图片的src为空")
                  except Exception as e:
                      print(f"❌ 获取图片/识别失败:{str(e)[:100]}")
                  # 4. Collect the result and write it back to the database.
                  if final_enterprise_info:
                      print(f"天眼查---查出来的数据为{final_enterprise_info}")
                      all_results.append(final_enterprise_info)
                      update_success = db_reader.insert_enterprise_info(
                          shop_name=shop,
                          enterprise_info=final_enterprise_info,
                      )
                      if update_success:
                          print(f"✅ 店铺[{shop}]数据回填成功")
                      else:
                          print(f"❌ 店铺[{shop}]数据回填失败")
                      print("\n🎉 成功获取数据,准备进入下一条...")
                  else:
                      print("\n⚠️ 本条未获取到有效企业信息,准备进入下一条...")
                  # Pause between items.
                  self.page.wait_for_timeout(5000)
              except PlaywrightTimeoutError:
                  print(f"⏰ 页面加载/元素定位超时:{link}")
                  input("排查问题")
                  continue
              except Exception as e:
                  print(f"❌ 第{idx}条处理异常:{str(e)[:100]}...,跳过")
                  continue
          return all_results

      def close(self) -> None:
          """Release the Tianyancha browser, the Pinduoduo browser, then the driver.

          The Playwright driver is stopped last: once it is stopped the
          page/context/browser handles can no longer be closed cleanly.
          Closing is best-effort and every handle is reset to ``None``, so
          calling ``close()`` again does nothing.
          """
          tyc_browser = getattr(self, "tyc_browser", None)
          if tyc_browser:
              try:
                  tyc_browser.close()
              except Exception as e:
                  print(f"⚠️ 关闭天眼查浏览器时出错(已忽略):{e}")
          for attr in ("page", "context", "browser"):
              handle = getattr(self, attr, None)
              if not handle:
                  continue
              try:
                  handle.close()
                  if attr == "browser":
                      print("✅ 拼多多浏览器已关闭")
              except Exception as e:
                  print(f"⚠️ 关闭拼多多{attr}时出错(已忽略):{e}")
              finally:
                  setattr(self, attr, None)
          pw = getattr(self, "pw", None)
          if pw:
              try:
                  pw.stop()
                  print("✅ Playwright 驱动已彻底停止")
              except Exception as e:
                  print(f"⚠️ 停止 Playwright 驱动时出错(已忽略):{e}")
              finally:
                  self.pw = None
  def main():
      """Read shop links from the database and process them in the browser.

      Steps: build the DB configuration from environment variables (falling
      back to ``DEFAULT_DB_CONFIG``), connect, load the shop/link rows,
      preview the first five, then run ``PddLinkBrowser.open_links`` on
      them. The browser and the database connection are released on every
      exit path, including early returns and errors.
      """
      # 1. Environment variables override the defaults.
      db_config = {
          "host": os.getenv("DB_HOST", DEFAULT_DB_CONFIG["host"]),
          "port": int(os.getenv("DB_PORT", DEFAULT_DB_CONFIG["port"])),
          "user": os.getenv("DB_USERNAME", DEFAULT_DB_CONFIG["user"]),
          "password": os.getenv("DB_PASSWORD", DEFAULT_DB_CONFIG["password"]),
          "db_name": os.getenv("DB_DATABASE", DEFAULT_DB_CONFIG["db_name"]),
          # Not passed to DBGoodsReader in this function.
          "table_name": os.getenv("DB_TABLENAME", DEFAULT_DB_CONFIG["table_name"])
      }
      # 2. Connect to the database.
      db_reader = DBGoodsReader(
          host=db_config["host"],
          port=db_config["port"],
          user=db_config["user"],
          password=db_config["password"],
          db_name=db_config["db_name"]
      )
      if not db_reader.connect_db():
          return

      pdd_browser = None
      try:
          # 3. Load the rows to process.
          goods_data = db_reader.get_shop_and_goods()
          if not goods_data:
              print("\n📌 今天没有店铺数据:")
              return
          print("\n📌 数据预览(前5条):")
          for idx, item in enumerate(goods_data[:5], 1):
              # Either value may be None; preview it as an empty string.
              shop_preview = (item.get('shop') or '')[:20]
              link_preview = (item.get('store_url') or '')[:50]
              print(f"第{idx}条 | 店铺:{shop_preview} | 链接:{link_preview}...")

          # 4. Start the browser and process the links.
          browser = PddLinkBrowser()
          if not browser.init_browser():
              # init_browser() already closed whatever it had opened.
              return
          pdd_browser = browser
          extracted_data = pdd_browser.open_links(goods_data, db_reader)

          print(f"\n📊 爬取任务结束,共成功提取 {len(extracted_data)} 条企业信息!")
          if extracted_data:
              # Show the first record as a sample of the collected data.
              print("💡 最终数据示例:", json.dumps(extracted_data[0], ensure_ascii=False, indent=2))
      finally:
          if pdd_browser is not None:
              pdd_browser.close()
          db_reader.close_db()
  if __name__ == "__main__":
      # Script entry point: run the pipeline and turn any failure into a
      # one-line message instead of a traceback.
      try:
          main()
      except KeyboardInterrupt:
          # Ctrl+C from the operator.
          print("\n⚠️ 程序被用户中断")
      except Exception as exc:
          print(f"\n❌ 程序运行出错:{exc}")