get_pdd_shop_name.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477
  1. import os
  2. import json
  3. from dotenv import load_dotenv
  4. import pymysql
  5. from typing import List, Dict, Optional
  6. import time
  7. from playwright.sync_api import (
  8. sync_playwright,
  9. TimeoutError as PlaywrightTimeoutError,
  10. BrowserContext
  11. )
  12. import requests
  13. from urllib.parse import urlparse
  14. # 补充百度OCR所需依赖
  15. import base64
  16. from PIL import Image
  17. import io
  18. import asyncio
  19. # 加载环境变量
  20. load_dotenv()
  # ===================== Global configuration (kept in one place for easy tuning) =====================
  # Default database connection settings.
  DEFAULT_DB_CONFIG = dict(
      host="localhost",
      port=3306,
      user="root",
      password="",
      db_name="",
      table_name="",
  )

  # Playwright launch / context settings.
  PLAYWRIGHT_CONFIG = dict(
      headless=False,
      slow_mo=300,
      browser_args=[
          "--start-maximized",
          "--disable-blink-features=AutomationControlled",  # core anti-detection switch
          "--no-sandbox",  # Windows/Linux compatibility
          "--disable-dev-shm-usage",  # work around small /dev/shm
          "--disable-popup-blocking",  # do not block pop-ups
          "--disable-extensions",  # no extensions
          "--disable-gpu",  # no GPU acceleration
          "--lang=zh-CN,zh",  # Chinese UI language
          "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
      ],
      viewport=dict(width=2050, height=1200),
      locale="zh-CN",
      timezone_id="Asia/Shanghai",
      default_timeout=15000,
      navigation_timeout=30000,
      login_state_path="pdd_login_state.json",  # persisted Pinduoduo login state
      tianyancha_login_state="tianyancha_login_state.json",  # persisted Tianyancha login state
  )

  # Baidu OCR credentials (read from the environment) and tuning.
  BAIDU_OCR_CONFIG = dict(
      api_key=os.getenv('APP_KEY'),
      secret_key=os.getenv('APP_SECRET'),
      scale=1.5,  # enlargement factor applied before OCR
  )

  # Image download settings.
  IMAGE_CONFIG = dict(
      save_dir="pdd_goods_images",  # dedicated folder name (under the project root)
      timeout=10,  # download timeout in seconds
      retry=1,  # retries after a failed download
  )

  # 1. Session variable holding today's date, consumed by QUERY_SQL.
  SET_DATE_SQL = "SET @date_constant = CURDATE();"

  # 2. Main query: one row per shop (lowest search_key) scraped since
  #    @date_constant whose shop has no business_license_company yet.
  QUERY_SQL = """
  SELECT
  product,
  shop,
  product_link,
  scrape_date,
  business_license_company,
  search_key
  FROM (
  SELECT
  pd.product,
  pd.shop,
  pd.product_link,
  pd.scrape_date,
  psi.business_license_company,
  pd.search_key,
  ROW_NUMBER() OVER (PARTITION BY pd.shop ORDER BY pd.search_key ASC) AS rn
  FROM pdd_drug_middle pd
  LEFT JOIN pdd_shop_info_middle psi ON psi.shop = pd.shop
  WHERE pd.scrape_date >= @date_constant
  AND psi.business_license_company IS NULL
  ) AS sub
  WHERE rn = 1
  ORDER BY search_key;
  """
  94. # ===================== 百度OCR类(完整整合)=====================
  class BaiduOCR:
      """Wrapper around the Baidu general-text OCR REST endpoint.

      The constructor immediately requests an access token; check
      ``access_token`` (``None`` on failure) before relying on the instance.
      """

      # Longest side, in pixels, allowed for an image sent to the OCR endpoint.
      MAX_OCR_SIZE = 4096
      # Size limit (MB) applied to the base64-encoded image payload.
      MAX_ENCODED_MB = 4
      # Typical OCR misreads in company names -> corrected text. Order matters:
      # the longer "有松司" must be replaced before the shorter "松司".
      NAME_CORRECTIONS = (
          ("人药房", "大药房"),
          ("有松司", "有限公司"),
          ("松司", "公司"),
          ("关药房", "大药房"),
      )
      # Labels that precede the credit code (second entry is a known misread).
      # Matched without the trailing colon so both ":" and "：" are accepted.
      CODE_KEYWORDS = ("社会信用代码", "社会震用代码")

      def __init__(self, api_key: str, secret_key: str):
          """Store the credentials and fetch an access token.

          :param api_key: API Key of the Baidu AI Cloud application
          :param secret_key: Secret Key of the Baidu AI Cloud application
          """
          self.api_key = api_key
          self.secret_key = secret_key
          self.access_token: Optional[str] = None
          self._get_access_token()

      def _get_access_token(self) -> bool:
          """Request an OAuth access token and store it on the instance.

          :return: True when a token was obtained, False otherwise
          """
          url = "https://aip.baidubce.com/oauth/2.0/token"
          params = {
              "grant_type": "client_credentials",
              "client_id": self.api_key,
              "client_secret": self.secret_key
          }
          try:
              response = requests.post(url, params=params, timeout=10)
              response.raise_for_status()
              result = response.json()
          except Exception as e:
              print(f"❌ 获取 access_token 异常:{e}")
              return False
          if "access_token" not in result:
              print(f"❌ 获取 access_token 失败:{result}")
              return False
          self.access_token = result["access_token"]
          # The token is a credential: confirm success without echoing it.
          print("✅ 成功获取 access_token")
          return True

      @staticmethod
      def _to_rgb(img):
          """Return *img* as an RGB image, flattening transparency onto white.

          JPEG cannot store palette or alpha modes, so every non-RGB input is
          converted before any further processing.
          """
          if img.mode == "RGB":
              return img
          if img.mode in ("RGBA", "LA", "PA") or "transparency" in img.info:
              rgba = img.convert("RGBA")
              canvas = Image.new("RGB", rgba.size, (255, 255, 255))
              canvas.paste(rgba, mask=rgba.split()[3])  # alpha channel as mask
              return canvas
          return img.convert("RGB")

      @staticmethod
      def _encode_jpeg(img, **save_options) -> bytes:
          """Serialize *img* to JPEG bytes in memory (no file is written)."""
          buffer = io.BytesIO()
          img.save(buffer, format='JPEG', **save_options)
          return buffer.getvalue()

      def _enlarge_and_crop_image(self, image_path: str, scale: float = 1.5, crop_ratio: float = 0.5) -> bytes:
          """Crop the top part of an image, enlarge it and return JPEG bytes.

          :param image_path: path of the source image
          :param scale: enlargement factor applied after cropping
          :param crop_ratio: fraction of the height kept, measured from the top
                             (0.5 keeps the upper half)
          :return: processed JPEG bytes, or ``b''`` when processing fails
          """
          try:
              with Image.open(image_path) as img:
                  source = self._to_rgb(img)
                  # Step 1: keep only the top `crop_ratio` of the picture.
                  crop_box = (0, 0, source.width, int(source.height * crop_ratio))
                  img_cropped = source.crop(crop_box)
                  print(f"✅ 图片裁剪完成:保留上{int(crop_ratio * 100)}%区域,尺寸={img_cropped.size}")
                  # Step 2: enlarge with Lanczos resampling.
                  img_resized = img_cropped.resize(
                      (int(img_cropped.width * scale), int(img_cropped.height * scale)),
                      Image.Resampling.LANCZOS
                  )
                  # Step 3: shrink proportionally when EITHER side exceeds the
                  # limit (checking only the width let tall images through).
                  longest_side = max(img_resized.size)
                  if longest_side > self.MAX_OCR_SIZE:
                      ratio = self.MAX_OCR_SIZE / longest_side
                      img_resized = img_resized.resize(
                          (
                              max(1, int(img_resized.width * ratio)),
                              max(1, int(img_resized.height * ratio)),
                          ),
                          Image.Resampling.LANCZOS
                      )
                  image_bytes = self._encode_jpeg(img_resized, quality=95)
                  # The request carries the image base64-encoded, which is 4/3
                  # of the raw size, so measure the encoded length.
                  encoded_mb = (len(image_bytes) + 2) // 3 * 4 / 1024 / 1024
                  if encoded_mb > self.MAX_ENCODED_MB:
                      print(f"⚠️ 文件超4MB({encoded_mb:.2f}MB),二次压缩...")
                      image_bytes = self._encode_jpeg(img_resized, quality=70, optimize=True)
                  print(f"✅ 图片放大完成:最终尺寸={img_resized.size}")
                  return image_bytes
          except Exception as e:
              print(f"❌ 图片裁剪/放大失败:{str(e)}")
              return b''

      def general_ocr(self, image_path: str, scale: float = 1.5) -> Optional[Dict]:
          """Run general text recognition on a local image.

          The image is cropped to its upper half and enlarged before upload.

          :param image_path: path of the local image
          :param scale: enlargement factor, 1.5 by default
          :return: the parsed API response when it contains ``words_result``;
                   ``{}`` when the image could not be prepared; ``None`` when
                   the token is missing or the request fails
          """
          if not self.access_token:
              print("❌ access_token 无效,请先初始化")
              return None
          try:
              image_data = self._enlarge_and_crop_image(image_path, scale=scale, crop_ratio=0.5)
              if not image_data:
                  print("❌ 图片处理失败,无法识别")
                  return {}
              image_base64 = base64.b64encode(image_data).decode("utf-8")
          except Exception as e:
              print(f"❌ 图片放大/读取失败:{e}")
              return None
          url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
          headers = {"Content-Type": "application/x-www-form-urlencoded"}
          data = {"image": image_base64}
          try:
              response = requests.post(url, headers=headers, data=data, timeout=10)
              response.raise_for_status()
              result = response.json()
          except Exception as e:
              print(f"❌ 调用 OCR 接口异常:{e}")
              return None
          if "words_result" not in result:
              print(f"❌ 识别失败:{result}")
              return None
          print(f"✅ 识别成功,共识别到 {len(result['words_result'])} 行文字")
          return result

      @staticmethod
      def _find_enterprise_name(text_lines: List[str]) -> str:
          """Return the raw company name from the first line carrying a name label.

          Both ASCII and full-width colons/brackets are recognised, since OCR
          of Chinese documents commonly yields the full-width forms.
          """
          import re  # function-scope import: the file does not import re at top level

          bracket_pattern = re.compile(r"[(（][^)）]*[)）]")
          for line_text in text_lines:
              if "企业名称" in line_text:
                  # Drop bracketed annotations, then keep what follows the label.
                  clean_line = bracket_pattern.sub("", line_text)
                  name_part = clean_line.split("企业名称")[-1].strip()
                  if "名称" in name_part:
                      name_part = name_part.split("名称")[-1].strip()
                  return name_part
              normalized = line_text.replace("：", ":")
              # "称:" also covers a label split over two lines ("名" / "称:…").
              if any(label in normalized for label in ("名称:", "名:", "称:")):
                  return normalized.split(":")[-1].strip()
          return ""

      def _clean_enterprise_name(self, raw_name: str) -> str:
          """Fix known OCR misreads and strip colons and whitespace from a name."""
          if not raw_name:
              return ""
          cleaned = raw_name
          for wrong, right in self.NAME_CORRECTIONS:
              cleaned = cleaned.replace(wrong, right)
          cleaned = cleaned.strip(":： \t\n\r")
          return cleaned.replace(" ", "")

      def _find_credit_code(self, text_lines: List[str]) -> str:
          """Return the text following a credit-code label, without spaces."""
          for line_text in text_lines:
              for keyword in self.CODE_KEYWORDS:
                  if keyword in line_text:
                      code_part = line_text.split(keyword)[-1].strip(":： \t\n\r")
                      code_part = code_part.replace(" ", "")
                      if code_part:
                          return code_part
                      break  # label without a value: try the next line
          return ""

      def extract_enterprise_info(self, ocr_result: Dict) -> Dict:
          """Extract the company name and credit code from an OCR response.

          :param ocr_result: dictionary returned by ``general_ocr``
          :return: ``{"enterprise_name": str, "credit_code": str}``; a field is
                   an empty string when it could not be found
          """
          enterprise_info = {
              "enterprise_name": "",
              "credit_code": ""
          }
          if not ocr_result or "words_result" not in ocr_result:
              print("❌ OCR识别结果为空,无法提取企业信息")
              return enterprise_info
          # Collect non-empty, de-duplicated lines in reading order.
          all_text_lines: List[str] = []
          for item in ocr_result["words_result"]:
              line_text = (item.get("words") or "").strip()
              if line_text and line_text not in all_text_lines:
                  all_text_lines.append(line_text)
          print(f"📝 OCR识别的有效行:{all_text_lines}")

          enterprise_name = self._clean_enterprise_name(
              self._find_enterprise_name(all_text_lines)
          )
          credit_code = self._find_credit_code(all_text_lines)
          enterprise_info["enterprise_name"] = enterprise_name
          enterprise_info["credit_code"] = credit_code

          if enterprise_name:
              print(f"✅ 提取到企业名称:{enterprise_name}")
          else:
              print("⚠️ 未识别到企业名称字段")
          if credit_code:
              print(f"✅ 提取到社会信用代码:{credit_code}")
          else:
              print("⚠️ 未识别到社会信用代码字段")
          return enterprise_info
  338. # ===================== 数据库读取类 =====================
  class DBGoodsReader:
      """MySQL helper: reads shops lacking company details and inserts results.

      All statements run against the database selected by ``db_name``; table
      names are deliberately not schema-qualified so that the ID lookup, the
      read query and the insert always address the same table.
      """

      # Table holding per-shop company details.
      SHOP_INFO_TABLE = "pdd_shop_info_middle"
      # ID used when the MAX(id) lookup fails.
      FALLBACK_ID = 9078

      def __init__(
          self,
          host: str = DEFAULT_DB_CONFIG["host"],
          port: int = DEFAULT_DB_CONFIG["port"],
          user: str = DEFAULT_DB_CONFIG["user"],
          password: str = DEFAULT_DB_CONFIG["password"],
          db_name: str = DEFAULT_DB_CONFIG["db_name"],
          charset: str = "utf8mb4"
      ):
          """Store connection parameters; no connection is opened here."""
          self.host = host
          self.port = port
          self.user = user
          self.password = password
          self.db_name = db_name
          self.charset = charset
          self.conn: Optional[pymysql.connections.Connection] = None
          self.cursor: Optional[pymysql.cursors.DictCursor] = None

      def connect_db(self) -> bool:
          """Open the connection, retrying up to two more times on failure.

          :return: True once connected, False after the final failed attempt
          """
          max_retry = 2
          for retry in range(max_retry + 1):
              try:
                  self.conn = pymysql.connect(
                      host=self.host,
                      port=self.port,
                      user=self.user,
                      password=self.password,
                      database=self.db_name,
                      charset=self.charset,
                      cursorclass=pymysql.cursors.DictCursor,
                      connect_timeout=10  # seconds
                  )
                  self.cursor = self.conn.cursor()
                  print(f"✅ 成功连接数据库:{self.db_name}")
                  return True
              except pymysql.MySQLError as e:
                  if retry < max_retry:
                      print(f"❌ 数据库连接失败(重试{retry + 1}/{max_retry}):{e}")
                      time.sleep(1)
                      continue
                  print(f"❌ 数据库连接最终失败:{e}")
          return False

      def get_shop_and_goods(self) -> List[Dict]:
          """Return one row per shop that still lacks company details.

          Runs ``SET_DATE_SQL`` followed by ``QUERY_SQL`` on the open cursor.

          :return: list of row dictionaries keyed by the selected column names;
                   empty when not connected or when the query fails
          """
          if not self.conn or not self.cursor:
              print("❌ 未连接数据库,请先调用 connect_db()")
              return []
          try:
              # The session variable must be set before the main query uses it.
              self.cursor.execute(SET_DATE_SQL)
              self.cursor.execute(QUERY_SQL)
              results = self.cursor.fetchall()
              print(f"✅ 成功读取 {len(results)} 条待补充企业信息的店铺数据")
              return results
          except pymysql.MySQLError as e:
              print(f"❌ 读取数据失败:{e}")
              return []

      def _get_next_id(self) -> int:
          """Return ``MAX(id) + 1`` of the shop-info table.

          Falls back to ``FALLBACK_ID`` when the lookup fails.
          NOTE(review): MAX(id)+1 is not safe under concurrent writers; if the
          column is AUTO_INCREMENT the explicit ID can be dropped entirely.
          """
          try:
              sql = f"SELECT IFNULL(MAX(id), 0) + 1 AS next_id FROM `{self.SHOP_INFO_TABLE}`"
              self.cursor.execute(sql)
              result = self.cursor.fetchone() or {}
              next_id = result.get("next_id", self.FALLBACK_ID)
              print(f"✅ 获取到下一个可用ID:{next_id}")
              return next_id
          except pymysql.MySQLError as e:
              print(f"❌ 获取自增ID失败,使用默认值9078:{e}")
              return self.FALLBACK_ID

      @staticmethod
      def _clean_text(value) -> str:
          """Return *value* as a stripped string; ``None`` becomes ``""``."""
          return "" if value is None else str(value).strip()

      def insert_enterprise_info(self, shop_name: str, enterprise_info: Dict) -> bool:
          """Insert one company-details row for a shop.

          :param shop_name: shop name, written to the ``shop`` column
          :param enterprise_info: dictionary with ``tyc_company_name``,
                                  ``tyc_company_code`` and ``tyc_company_address``;
                                  missing or ``None`` values are stored as ``""``
          :return: True when a row was inserted, False otherwise
          """
          if not self.conn or not self.cursor:
              print("❌ 未连接数据库,请先调用 connect_db()")
              return False
          if not shop_name:
              print("❌ 店铺名称为空,无法更新")
              return False
          details = enterprise_info or {}
          business_company_name = self._clean_text(details.get("tyc_company_name"))
          qualification_number = self._clean_text(details.get("tyc_company_code"))
          contact_address = self._clean_text(details.get("tyc_company_address"))
          business_license_address = contact_address  # both address columns share one value

          # Report empty fields; the row is still inserted.
          labelled_values = (
              ("企业名称", business_company_name),
              ("统一信用代码", qualification_number),
              ("企业地址", contact_address),
          )
          empty_fields = [label for label, value in labelled_values if not value]
          if empty_fields:
              print(f"⚠️ 店铺[{shop_name}]以下字段为空:{','.join(empty_fields)},仍继续插入(空值)")

          next_id = self._get_next_id()
          # Unqualified table name: same database as _get_next_id and QUERY_SQL.
          insert_sql = f"""
          INSERT INTO `{self.SHOP_INFO_TABLE}` (
          `id`,
          `shop`,
          `contact_address`,
          `qualification_number`,
          `business_license_company`,
          `business_license_address`,
          `scrape_date`,
          `platform`,
          `province`,
          `city`,
          `create_time`,
          `update_time`
          ) VALUES (%s, %s, %s, %s, %s, %s, CURDATE(), '拼多多', '', '', NOW(), NOW())
          """
          insert_params = [
              next_id,
              shop_name,
              contact_address,
              qualification_number,
              business_company_name,
              business_license_address
          ]
          try:
              self.cursor.execute(insert_sql, insert_params)
              self.conn.commit()
              affected_rows = self.cursor.rowcount
              if affected_rows > 0:
                  print(f"✅ 店铺[{shop_name}]成功插入1条数据(ID:{next_id})")
                  print(f" 插入内容:企业名称={business_company_name} | 信用代码={qualification_number} | 地址={contact_address}")
                  return True
              print(f"⚠️ 店铺[{shop_name}]插入0行数据,无数据变更")
              return False
          except pymysql.MySQLError as e:
              print(f"❌ 店铺[{shop_name}]插入失败:{e}")
              self.conn.rollback()
              return False
          except Exception as e:
              print(f"❌ 店铺[{shop_name}]插入异常:{e}")
              self.conn.rollback()
              return False

      def close_db(self) -> None:
          """Close cursor and connection; errors while closing are ignored."""
          if self.cursor:
              try:
                  self.cursor.close()
              except Exception:
                  pass  # best-effort cleanup
          if self.conn:
              try:
                  self.conn.close()
                  print("✅ 数据库连接已关闭")
              except Exception:
                  pass  # best-effort cleanup
  503. # ===================== 天眼查浏览器类 =====================
  class TianyanchaBrowser:
      """Drives a Tianyancha browser session: login, company search, scraping.

      Uses the synchronous Playwright API throughout.
      """

      LOGIN_BUTTON_SELECTOR = "div.tyc-header-nav-item.tyc-nav-user span.tyc-nav-user-btn"
      SEARCH_INPUT_SELECTOR = 'input[placeholder="请输入公司名称、老板姓名、品牌名称等"]'
      # (result key, CSS selector, label for "found", label for "missing",
      #  label for "error"). The hashed class names come from the site's build
      #  output and will need updating when the page changes.
      RESULT_FIELDS = (
          (
              "tyc_company_name",
              "div.index_name__qEdWi span",
              "公司名", "企业名", "企业名",
          ),
          (
              "tyc_company_code",
              "div.index_info-col__UVcZb.index_credit-code__kWuDZ span",
              "企业信用代码", "企业信用代码", "统一社会信用代码",
          ),
          (
              "tyc_company_address",
              "div.index_contact-col__7AboU.index_address__mHjQD .index_value__Pl0Nh",
              "企业地址", "企业地址", "企业地址",
          ),
      )

      def __init__(self):
          """Create an uninitialised wrapper; call ``init_browser`` before use."""
          self.pw = None
          self.browser = None
          self.context: Optional[BrowserContext] = None
          self.page = None
          self.login_state_path = PLAYWRIGHT_CONFIG["tianyancha_login_state"]

      def check_scan_login_prompt(self):
          """Pause for a manual QR scan when the scan-login prompt is visible.

          Waits up to 10 seconds for the prompt; when it does not appear the
          method returns without pausing.
          """
          try:
              scan_prompt_locator = self.page.locator(
                  "div.scan-title",
                  has_text="扫码登录 更快 更安全"
              )
              scan_prompt_locator.wait_for(state="visible", timeout=10000)
              print("⚠️ 检测到天眼查扫码登录提示!")
              input("请打开天眼查APP扫码完成登录后,按回车键继续执行脚本...")
          except PlaywrightTimeoutError:
              print("✅ 未检测到扫码登录提示,跳过扫码步骤")

      def _load_login_state(self) -> Optional[Dict]:
          """Load the saved login state.

          :return: the parsed state, or ``None`` when the file is missing or
                   unreadable (a corrupt file is deleted so login starts fresh)
          """
          if not os.path.exists(self.login_state_path):
              return None
          try:
              with open(self.login_state_path, "r", encoding="utf-8") as f:
                  return json.load(f)
          except (json.JSONDecodeError, UnicodeDecodeError):
              print(f"⚠️ 天眼查登录状态文件损坏,将重新登录")
              os.remove(self.login_state_path)
          return None

      def _save_login_state(self) -> None:
          """Persist the context's storage state; a failure is only reported."""
          if not self.context:
              return
          try:
              self.context.storage_state(path=self.login_state_path)
              print(f"✅ 天眼查登录状态已保存到:{self.login_state_path}")
          except Exception as e:
              print(f"⚠️ 天眼查登录状态保存失败:{e}")

      def init_browser(self, pw) -> bool:
          """Launch Chromium on the given Playwright instance and open a page.

          :param pw: an already started synchronous Playwright object; it is
                     used as-is and never stopped by this class
          :return: True when browser, context and page are ready
          """
          try:
              self.pw = pw
              self.browser = self.pw.chromium.launch(
                  headless=PLAYWRIGHT_CONFIG["headless"],
                  slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
                  args=PLAYWRIGHT_CONFIG["browser_args"],
                  ignore_default_args=["--enable-automation"],
                  timeout=60000
              )
              context_options = {
                  # In Playwright for Python `viewport=None` means "use the
                  # default 1280x720"; `no_viewport=True` is what lets the page
                  # follow the maximised window.
                  "no_viewport": True,
                  "locale": PLAYWRIGHT_CONFIG["locale"],
                  "timezone_id": PLAYWRIGHT_CONFIG["timezone_id"],
                  "ignore_https_errors": True,
              }
              login_state = self._load_login_state()
              if login_state:
                  context_options["storage_state"] = login_state
              self.context = self.browser.new_context(**context_options)
              if login_state:
                  print("✅ 已加载天眼查本地登录状态")
              self.page = self.context.new_page()
              self.page.set_default_timeout(PLAYWRIGHT_CONFIG['default_timeout'])
              self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG['navigation_timeout'])
              return True
          except Exception as e:
              print(f"❌ 天眼查浏览器初始化失败:{e}")
              self.close()
              return False

      def _ensure_logged_in(self) -> bool:
          """Detect the login button and, if present, wait for a QR-code login.

          :return: True when the session is (or becomes) logged in, False when
                   the QR login was not completed within 40 seconds
          """
          login_button = self.page.locator(
              self.LOGIN_BUTTON_SELECTOR,
              has_text="登录/注册"
          ).nth(0)
          try:
              login_button.wait_for(state="visible", timeout=10000)
          except PlaywrightTimeoutError:
              # No login button within 10 s: treated as already logged in.
              print("✅ 检测到已登录状态,无需重复登录")
              return True
          print("⚠️ 检测到未登录状态,正在点击「登录/注册」按钮...")
          login_button.click()
          print("\n🔔 请打开天眼查APP,扫描页面上的登录二维码(限时40秒),登录完成后脚本会自动继续...")
          try:
              # Handled separately from the wait above so that a login timeout
              # is not mistaken for "already logged in".
              login_button.wait_for(state="hidden", timeout=40000)
          except PlaywrightTimeoutError:
              print("❌ 扫码登录超时,未检测到登录完成")
              return False
          print("✅ 扫码登录成功!")
          return True

      def _locate_search_box(self):
          """Return the visible search input locator, or ``None`` on failure."""
          try:
              search_locator = self.page.locator(self.SEARCH_INPUT_SELECTOR)
              # When the placeholder matches several inputs the second is used.
              if search_locator.count() > 1:
                  search_locator = search_locator.nth(1)
              search_locator.wait_for(timeout=10000, state="visible")
              print("✅ 定位到天眼查搜索框")
              return search_locator
          except PlaywrightTimeoutError:
              print(f"❌ 搜索框定位超时:页面加载过慢或搜索框元素不存在")
          except Exception as e:
              print(f"❌ 搜索框定位失败:{str(e)}")
          return None

      def search_enterprise(self, enterprise_name: str) -> bool:
          """Open the Tianyancha home page and search for a company name.

          :param enterprise_name: company name to search for
          :return: True when the search was submitted and the page settled
          """
          if not self.page:
              print("❌ 天眼查浏览器未初始化!")
              return False
          if not enterprise_name or enterprise_name.strip() == '':
              print("❌ 企业名称为空!无法搜索")
              return False
          try:
              print(f"\n📌 打开天眼查:https://www.tianyancha.com/")
              self.page.goto(
                  "https://www.tianyancha.com/",
                  wait_until="networkidle",
                  timeout=30000
              )
              if not self._ensure_logged_in():
                  return False
              self._save_login_state()

              search_locator = self._locate_search_box()
              if search_locator is None:
                  return False
              search_locator.click()
              search_locator.clear()
              print(f"📌 输入企业名:{enterprise_name}")
              search_locator.fill(enterprise_name)
              self.page.wait_for_timeout(1000)
              search_locator.press("Enter")
              print("🖱️ 已触发回车搜索")
              self.page.wait_for_load_state("networkidle", timeout=20000)
              print(f"✅ 天眼查搜索完成!已搜索:{enterprise_name}")
              return True
          except PlaywrightTimeoutError:
              print(f"❌ 天眼查搜索超时(企业名:{enterprise_name})")
              return False
          except Exception as e:
              print(f"❌ 天眼查搜索异常:{e}")
              return False

      def get_enterprise_info(self) -> Dict:
          """Read name, credit code and address of the first search result.

          :return: dictionary with ``tyc_company_name``, ``tyc_company_code``
                   and ``tyc_company_address``; a field stays ``""`` when its
                   element is missing or reading it fails
          """
          enterprise_detail = {
              "tyc_company_name": "",  # company name
              "tyc_company_code": "",  # unified social credit code
              "tyc_company_address": ""  # company address
          }
          if not self.page:
              print("❌ 天眼查页面未初始化")
              return enterprise_detail
          try:
              # Fixed pause to let the result list render.
              self.page.wait_for_timeout(timeout=4000)
              for key, selector, found, missing, failed in self.RESULT_FIELDS:
                  # Each field is read independently so one failure does not
                  # block the others, and no handler waits for keyboard input.
                  try:
                      field_locator = self.page.locator(selector).nth(0)
                      if field_locator.count():
                          enterprise_detail[key] = field_locator.inner_text().strip()
                          print(f"获取到{found}:{enterprise_detail[key]}")
                      else:
                          print(f"没有获取到{missing},网页路径有问题")
                  except Exception as e:
                      print(f"提取{failed}时发生异常:{str(e)},网页路径或元素定位异常")
                      enterprise_detail[key] = ""
              print("\n📌 提取的企业核心信息:")
              print(f"公司名:{enterprise_detail['tyc_company_name']}")
              print(f"企业信用代码:{enterprise_detail['tyc_company_code']}")
              print(f"企业地址:{enterprise_detail['tyc_company_address']}")
              return enterprise_detail
          except Exception as e:
              print(f"❌ 提取企业信息失败:{e}")
              return enterprise_detail

      def close(self) -> None:
          """Close page, context and browser, then clear the handles.

          Errors while closing are ignored (best-effort cleanup). The handles
          are reset to ``None`` so later calls report "not initialised"
          instead of operating on closed objects.
          """
          for attr in ("page", "context", "browser"):
              handle = getattr(self, attr, None)
              if not handle:
                  continue
              try:
                  handle.close()
                  if attr == "browser":
                      print("✅ 天眼查浏览器已关闭")
              except Exception:
                  pass
              setattr(self, attr, None)
  767. class PddLinkBrowser:
  768. """拼多多链接浏览器(支持登录持久化+图片下载+OCR识别)"""
  def __init__(self, login_state_path: str = PLAYWRIGHT_CONFIG["login_state_path"]):
      """Prepare the Pinduoduo browser wrapper.

      Records where the login state is persisted, clears the browser
      handles, prepares the image folder, builds the OCR client and creates
      the Tianyancha browser wrapper.

      :param login_state_path: file used to persist the login state
      """
      self.login_state_path = login_state_path
      # Browser handles stay empty until the browser is started.
      self.browser = None
      self.context: Optional[BrowserContext] = None
      self.page = None
      # Image output folder.
      self._init_image_dir()
      # OCR client: stays None when no credentials are configured.
      self.ocr_client: Optional[BaiduOCR] = None
      self._init_ocr_client()
      # Companion browser used for Tianyancha look-ups.
      self.tyc_browser = TianyanchaBrowser()
  781. def _init_ocr_client(self):
  782. """初始化百度OCR客户端"""
  783. api_key = BAIDU_OCR_CONFIG["api_key"]
  784. secret_key = BAIDU_OCR_CONFIG["secret_key"]
  785. if not api_key or not secret_key:
  786. print("⚠️ 未配置百度OCR的API_KEY/SECRET_KEY,将跳过OCR识别")
  787. return
  788. self.ocr_client = BaiduOCR(api_key=api_key, secret_key=secret_key)
  789. print("✅ 百度OCR客户端初始化完成")
  790. # ========== 检测手机登录框并等待手动登录 ==========
  791. def _check_login_box(self) -> bool:
  792. """
  793. 检测是否出现「手机登录」框(div.phone-login 包含"手机登录"文本)
  794. :return: True=检测到并完成登录;False=未检测到登录框
  795. """
  796. if not self.page:
  797. print("❌ 页面未初始化,无法检测登录框")
  798. return False
  799. try:
  800. # 精准定位登录框元素:div.phone-login 下的 span(包含"手机登录"文本)
  801. login_locator = self.page.locator("div.phone-login span")
  802. # 等待元素可见(最多5秒,超时则认为无登录框)
  803. login_locator.wait_for(timeout=5000, state="visible")
  804. # 获取元素文本(包含伪元素的"手机登录")
  805. login_text = login_locator.inner_text().strip()
  806. if "手机登录" in login_text:
  807. print("\n⚠️ 检测到【手机登录】框,请手动完成登录!")
  808. input("登录完成后,请按回车键继续执行脚本...")
  809. # 登录后等待页面重新加载完成(确保登录状态生效)
  810. self.page.wait_for_load_state("networkidle", timeout=15000)
  811. print("✅ 登录已完成,继续处理当前商品")
  812. return True
  813. return False
  814. except PlaywrightTimeoutError:
  815. # 超时说明页面没有登录框,直接返回False
  816. return False
  817. except Exception as e:
  818. print(f"⚠️ 检测登录框时发生异常:{str(e)[:60]},继续执行")
  819. return False
  820. # ========== 登录检测方法结束 ==========
  821. # ========== 自定义向下滑动方法 ==========
  822. def _scroll_down(self, distance: int = 500, step: int = 50, interval: int =100):
  823. """
  824. 分步向下滑动指定距离(解决图片懒加载,避免一次性跳转)
  825. :param distance: 总滑动距离(像素,默认500)
  826. :param step: 每次滑动的步长(像素,默认50,越小越平缓)
  827. :param interval: 每次滑动后的间隔时间(毫秒,默认100)
  828. """
  829. if not self.page:
  830. print("❌ 浏览器页面未初始化,无法滑动")
  831. return
  832. # 容错处理:步长/总距离为非正数时直接返回
  833. if step <= 0 or distance <= 0:
  834. print(f"⚠️ 无效的滑动参数(总距离:{distance},步长:{step}),跳过滑动")
  835. return
  836. try:
  837. remaining = distance # 剩余未滑动的距离
  838. print(f"📝 开始分步滑动:总距离{distance}像素,每次滑{step}像素,间隔{interval}ms")
  839. while remaining > 0:
  840. current_step = min(step, remaining)
  841. self.page.evaluate(f"window.scrollBy(0, {current_step})")
  842. remaining -= current_step
  843. self.page.wait_for_timeout(interval)
  844. self.page.wait_for_timeout(2000)
  845. print(f"✅ 分步滑动完成,总滑动距离:{distance}像素")
  846. except Exception as e:
  847. print(f"⚠️ 分步滑动失败:{str(e)[:50]}")
  848. # ========== 滑动方法结束 ==========
  849. def _init_image_dir(self):
  850. """创建图片保存文件夹(不存在则创建)"""
  851. if not os.path.exists(IMAGE_CONFIG["save_dir"]):
  852. os.makedirs(IMAGE_CONFIG['save_dir'])
  853. print(f"✅ 图片保存文件夹已创建:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")
  854. else:
  855. print(f"✅ 图片保存文件夹已存在:{os.path.abspath(IMAGE_CONFIG['save_dir'])}")
  856. def _get_image_filename(self, img_src: str, shop_name: str) -> str:
  857. """生成唯一的图片文件名(避免重复)"""
  858. # 提取原始文件名后缀(如.png/.jpg)
  859. parsed_url = urlparse(img_src)
  860. ext = os.path.splitext(parsed_url.path)[-1] or '.png'
  861. # 清洗店铺名(避免特殊字符)
  862. clean_shop = "".join([c for c in shop_name if c.isalnum() or c in ["_", "-"]])[:20]
  863. # 时间戳+店铺名+随机数,确保唯一
  864. timestamp = str(int(time.time() * 1000))
  865. filename = f"{clean_shop}_{timestamp}{ext}"
  866. return filename
  867. def _download_image(self, img_src: str, shop_name: str) -> Optional[str]:
  868. """
  869. 下载图片到指定文件夹
  870. :return: 成功返回保存路径,失败返回None
  871. """
  872. if not img_src:
  873. print("⚠️ 图片链接为空,跳过下载")
  874. return None
  875. # 生成唯一文件名
  876. filename = self._get_image_filename(img_src, shop_name)
  877. save_path = os.path.join(IMAGE_CONFIG["save_dir"], filename)
  878. # 下载重试逻辑
  879. for retry in range(IMAGE_CONFIG["retry"] + 1):
  880. try:
  881. # 发送请求下载图片(添加headers模拟浏览器)
  882. headers = {
  883. "User-Agent": PLAYWRIGHT_CONFIG["browser_args"][-1].split("=")[1],
  884. "Referer": "https://www.pinduoduo.com/",
  885. "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8" # 新增:支持jpeg格式
  886. }
  887. response = requests.get(
  888. img_src,
  889. headers=headers,
  890. timeout=IMAGE_CONFIG["timeout"],
  891. stream=True, # 流式下载,避免内存溢出
  892. allow_redirects=True # 显式开启重定向(拼多多签名链接可能302)
  893. )
  894. response.raise_for_status() # 抛出HTTP错误(4xx/5xx)
  895. # 保存图片到文件
  896. with open(save_path, "wb") as f:
  897. for chunk in response.iter_content(chunk_size=8192):
  898. f.write(chunk)
  899. # 校验文件是否保存成功
  900. if os.path.getsize(save_path) > 0:
  901. print(f"✅ 图片下载成功:{save_path}")
  902. return save_path
  903. else:
  904. os.remove(save_path) # 删除空文件
  905. print(f"⚠️ 图片下载为空,重试{retry+1}/{IMAGE_CONFIG['retry']}")
  906. except requests.exceptions.HTTPError as e:
  907. if e.response.status_code == 403:
  908. print(f"❌ 图片签名过期/无权限:{img_src[:50]}...")
  909. return None # 403无需重试,直接跳过
  910. elif retry < IMAGE_CONFIG["retry"]:
  911. print(f"⚠️ HTTP错误(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{e}")
  912. time.sleep(1)
  913. continue
  914. print(f"❌ 图片下载失败:{e}")
  915. return None
  916. except Exception as e:
  917. if retry < IMAGE_CONFIG["retry"]:
  918. print(f"⚠️ 下载失败(重试{retry + 1}/{IMAGE_CONFIG['retry']}):{str(e)[:50]}")
  919. time.sleep(1)
  920. continue
  921. print(f"❌ 图片最终下载失败:{str(e)[:50]}")
  922. return None
  923. def _process_ocr(self, image_path: str) -> Optional[Dict]:
  924. """OCR识别后立即调用天眼查搜索+提取企业信息"""
  925. """
  926. 调用OCR识别并提取企业信息
  927. :param image_path: 图片路径
  928. :return: 企业信息字典
  929. """
  930. if not self.ocr_client:
  931. print("⚠️ OCR客户端未初始化,跳过识别")
  932. return None
  933. if not os.path.exists(image_path):
  934. print(f"❌ 图片文件不存在:{image_path}")
  935. return None
  936. # 调用OCR识别
  937. ocr_result = self.ocr_client.general_ocr(
  938. image_path=image_path,
  939. scale=BAIDU_OCR_CONFIG["scale"]
  940. )
  941. print(f"识别结果{ocr_result}")
  942. if not ocr_result:
  943. return None
  944. # 提取企业信息
  945. enterprise_info = self.ocr_client.extract_enterprise_info(ocr_result)
  946. print("\n📌 提取的企业信息:")
  947. enterprise_name = enterprise_info.get("enterprise_name", "")
  948. if not enterprise_name:
  949. print("⚠️ 未提取到企业名称,跳过天眼查")
  950. return enterprise_info
  951. # 2. 调用天眼查搜索+提取字段
  952. if self.tyc_browser.search_enterprise(enterprise_name):
  953. # 提取三个核心字段
  954. tyc_info = self.tyc_browser.get_enterprise_info()
  955. # 合并OCR结果和天眼查字段
  956. enterprise_info.update(tyc_info)
  957. print("\n📌 最终整合结果:")
  958. print(json.dumps(enterprise_info, ensure_ascii=False, indent=4))
  959. return enterprise_info
  960. def _load_login_state(self) -> Optional[Dict]:
  961. """加载本地登录状态"""
  962. if os.path.exists(self.login_state_path):
  963. try:
  964. with open(self.login_state_path, "r", encoding="utf-8") as f:
  965. return json.load(f)
  966. except json.JSONDecodeError:
  967. print(f"⚠️ 登录状态文件损坏:{self.login_state_path},将重新登录")
  968. os.remove(self.login_state_path)
  969. return None
  970. def _save_login_state(self) -> None:
  971. """保存登录状态到本地"""
  972. if self.context:
  973. try:
  974. self.context.storage_state(path=self.login_state_path)
  975. print(f"✅ 登录状态已保存到:{self.login_state_path}")
  976. except Exception as e:
  977. print(f"⚠️ 保存登录状态失败:{e}")
  978. def init_browser(self) -> bool:
  979. """初始化浏览器(加载登录状态/提示登录)"""
  980. try:
  981. # ✅ 核心修改 1:全局只启动【一次】 Playwright 引擎!存到 self.pw 中
  982. self.pw = sync_playwright().start()
  983. # ✅ 核心修改 2:把启动好的引擎传给天眼查去用
  984. self.tyc_browser.init_browser(self.pw)
  985. # 启动优化后的浏览器
  986. # ✅ 核心修改 3:拼多多也用这同一个引擎启动浏览器
  987. self.browser = self.pw.chromium.launch(
  988. headless=PLAYWRIGHT_CONFIG["headless"],
  989. slow_mo=PLAYWRIGHT_CONFIG["slow_mo"],
  990. args=PLAYWRIGHT_CONFIG["browser_args"],
  991. ignore_default_args=["--enable-automation"], # 隐藏自动化标识
  992. timeout=60000
  993. )
  994. # 加载登录状态或创建新上下文
  995. login_state = self._load_login_state()
  996. if login_state:
  997. self.context = self.browser.new_context(
  998. viewport=PLAYWRIGHT_CONFIG["viewport"],
  999. locale=PLAYWRIGHT_CONFIG["locale"],
  1000. timezone_id=PLAYWRIGHT_CONFIG["timezone_id"],
  1001. ignore_https_errors=True,
  1002. storage_state=login_state # 加载登录状态
  1003. )
  1004. print("✅ 已加载本地登录状态")
  1005. else:
  1006. self.context = self.browser.new_context(
  1007. viewport=PLAYWRIGHT_CONFIG["viewport"],
  1008. locale=PLAYWRIGHT_CONFIG["locale"],
  1009. timezone_id=PLAYWRIGHT_CONFIG["timezone_id"],
  1010. ignore_https_errors=True
  1011. )
  1012. print("\n⚠️ 未检测到登录状态,请先完成拼多多登录!")
  1013. self.page = self.context.new_page()
  1014. self.page.goto("https://www.pinduoduo.com", timeout=30000)
  1015. input("请在浏览器中完成登录,登录后按回车继续...")
  1016. self.context.storage_state(path=self.login_state_path) # ✅ 加await保存状态
  1017. # 初始化页面
  1018. self.page = self.context.new_page()
  1019. self.page.set_default_timeout(PLAYWRIGHT_CONFIG["default_timeout"])
  1020. self.page.set_default_navigation_timeout(PLAYWRIGHT_CONFIG["navigation_timeout"])
  1021. return True
  1022. except Exception as e:
  1023. print(f"❌ 浏览器初始化失败:{e}")
  1024. self.close() # ✅ 核心修复:加await
  1025. return False
  1026. def open_links(self, goods_data: List[Dict], db_reader: DBGoodsReader) -> List[Dict]:
  1027. """依次打开商品链接(支持店名过滤+图片下载+OCR识别+天眼查搜索提取)"""
  1028. if not self.page:
  1029. print("❌ 浏览器未初始化")
  1030. return []
  1031. total = len(goods_data)
  1032. if total == 0:
  1033. print("⚠️ 无商品链接可处理")
  1034. return []
  1035. print(f"\n📋 共待处理 {total} 条商品链接")
  1036. # 收集所有抓取到的结果(可选,如果想最后统一保存的话)
  1037. all_results = []
  1038. for idx, item in enumerate(goods_data, 1):
  1039. shop = item.get("shop", "未知店铺").strip()
  1040. link = (item.get("product_link") or "").strip()
  1041. if not link:
  1042. print(f"\n⚠️ 第{idx}/{total}条:店铺【{shop}】链接为空,跳过")
  1043. continue
  1044. print(f"\n{'=' * 15} 第 {idx}/{total} 条 {'=' * 15}")
  1045. print(f"🏪 数据库店名:{shop}")
  1046. print(f"🔗 商品链接:{link}")
  1047. # ========== 判断店名是否包含“旗舰店” ==========
  1048. if "旗舰店" not in shop:
  1049. print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称不含“旗舰店”,跳过")
  1050. #涉及突破滑块验证的部分了。
  1051. continue
  1052. else:
  1053. print(f"⚠️ 第{idx}/{total}条:店铺【{shop}】名称包含“旗舰店”,打开商品链接")
  1054. try:
  1055. # 1. 打开商品链接
  1056. self.page.goto(
  1057. link,
  1058. wait_until="load",
  1059. timeout=PLAYWRIGHT_CONFIG["navigation_timeout"]
  1060. )
  1061. self.page.wait_for_load_state("networkidle", timeout=15000)
  1062. print(f"✅ 页面加载成功:{self.page.title()}...")
  1063. # 检测登录框
  1064. self._check_login_box()
  1065. #如果已售罄,不跳过。
  1066. # sold_out_locator = self.page.locator("")
  1067. # if sold_out_locator.count() > 0 :
  1068. # print("该商品已售罄,跳过这次采集")
  1069. #
  1070. # 2. 店名匹配判断
  1071. page_shop_locator = self.page.locator("div.BAq4Lzv7")
  1072. try:
  1073. page_shop_locator.wait_for(timeout=5000)
  1074. page_shop_text = (page_shop_locator.inner_text()).strip().lower()
  1075. except PlaywrightTimeoutError:
  1076. print(f"❌ 未找到页面店名元素,可能页面结构改变或被风控,跳过")
  1077. continue
  1078. db_shop_text = shop.lower()
  1079. print(f"🏪 页面元素店铺名:{page_shop_text}")
  1080. if page_shop_text != db_shop_text:
  1081. print(f"❌ 店名不匹配(数据库:{db_shop_text} | 页面:{page_shop_text}),跳过")
  1082. self.page.wait_for_timeout(2000)
  1083. continue
  1084. print(f"✅ 店名匹配成功!")
  1085. # 自定义滑动距离,触发图片懒加载
  1086. self._scroll_down(distance=2100)
  1087. # ========== 获取图片src并下载 ==========
  1088. # shop_name = 'pdd_shop_info_middle_back'
  1089. final_enterprise_info = None
  1090. try:
  1091. # ========== 原定位策略(优先使用) ==========
  1092. img_locators = self.page.locator("img[role='img'][aria-label='查看图片']")
  1093. img_count = img_locators.count()
  1094. # ========== 原定位不足时,切换到备用定位 ==========
  1095. if img_count < 2:
  1096. print(f"⚠️ 原定位仅匹配到{img_count}个图片,尝试备用定位(拼多多懒加载图片)...")
  1097. input("请手动检查页面图片元素,按回车继续...")
  1098. continue
  1099. # 备用定位:匹配截图里的「pdd-lazy-image」类资质图片(带水印的营业执照)
  1100. # backup_img_locators = self.page.locator(
  1101. # "img.pdd-lazy-image.loaded" # 精准匹配已加载的懒加载图片
  1102. # )
  1103. #
  1104. # backup_count = backup_img_locators.count()
  1105. #
  1106. # if backup_count >= 2:
  1107. # img_locators = backup_img_locators
  1108. # img_count = backup_count
  1109. # print(f"✅ 备用定位生效,匹配到图片元素:{img_count} 个")
  1110. # else:
  1111. # print(f"⚠️ 原定位({img_count}个) + 备用定位({backup_count}个)均不足2个,跳过下载")
  1112. # input("请手动检查页面图片元素,按回车继续...")
  1113. # continue # 跳过当前店铺,避免卡死
  1114. print(f"📸 匹配到图片元素:{img_count} 个")
  1115. # 3. 定位第二个元素
  1116. target_img_locator = img_locators.nth(1)
  1117. target_img_locator.wait_for(timeout=5000, state="visible")
  1118. # 4. 获取第二个图片的src
  1119. img_src = target_img_locator.get_attribute("src")
  1120. if img_src:
  1121. print(f"🖼️ 第2个图片 src:{img_src[:80]}...")
  1122. image_path = self._download_image(img_src, shop)
  1123. if image_path:
  1124. # ========== 核心:调用OCR并获取最终的天眼查数据 ==========
  1125. final_enterprise_info = self._process_ocr(image_path)
  1126. else:
  1127. print(f"⚠️ 第2个图片的src为空")
  1128. except Exception as e:
  1129. print(f"❌ 获取图片/识别失败:{str(e)[:100]}")
  1130. # 3. 收集数据并自动循环
  1131. if final_enterprise_info:
  1132. # 将原数据库的店名也塞进去,方便后续入库对比
  1133. print(f"天眼查---查出来的数据为{final_enterprise_info}")
  1134. # final_enterprise_info['pdd_shop_name'] = shop
  1135. all_results.append(final_enterprise_info)
  1136. # 获取到的数据回填数据库
  1137. update_success = db_reader.insert_enterprise_info(
  1138. shop_name=shop,
  1139. enterprise_info=final_enterprise_info, # 直接传入天眼查返回的字典
  1140. )
  1141. if update_success:
  1142. print(f"✅ 店铺[{shop}]数据回填成功")
  1143. else:
  1144. print(f"❌ 店铺[{shop}]数据回填失败")
  1145. print(f"\n🎉 成功获取数据,准备进入下一条...")
  1146. else:
  1147. print(f"\n⚠️ 本条未获取到有效企业信息,准备进入下一条...")
  1148. self.page.wait_for_timeout(5000)
  1149. except PlaywrightTimeoutError:
  1150. print(f"⏰ 页面加载/元素定位超时:{link}")
  1151. input("排查问题")
  1152. continue
  1153. except Exception as e:
  1154. print(f"❌ 第{idx}条处理异常:{str(e)[:100]}...,跳过")
  1155. continue
  1156. return all_results
  1157. def close(self) -> None:
  1158. """关闭浏览器(异步版,补全所有await)"""
  1159. # 先关闭天眼查浏览器
  1160. if hasattr(self, 'tyc_browser') and self.tyc_browser:
  1161. self.tyc_browser.close()
  1162. if hasattr(self, 'pw') and self.pw:
  1163. try:
  1164. self.pw.stop()
  1165. print("✅ Playwright 驱动已彻底停止")
  1166. except Exception:
  1167. pass
  1168. # 关闭拼多多浏览器
  1169. if self.page:
  1170. try:
  1171. self.page.close()
  1172. except Exception:
  1173. pass
  1174. if self.context:
  1175. try:
  1176. self.context.close()
  1177. except Exception:
  1178. pass
  1179. if self.browser:
  1180. try:
  1181. self.browser.close()
  1182. print("✅ 拼多多浏览器已关闭")
  1183. except Exception:
  1184. pass
  1185. def main():
  1186. """主函数:整合数据库读取+链接浏览"""
  1187. # 1. 读取环境变量并补全默认值
  1188. db_config = {
  1189. "host": os.getenv("DB_HOST", DEFAULT_DB_CONFIG["host"]),
  1190. "port": int(os.getenv("DB_PORT", DEFAULT_DB_CONFIG["port"])),
  1191. "user": os.getenv("DB_USERNAME", DEFAULT_DB_CONFIG["user"]),
  1192. "password": os.getenv("DB_PASSWORD", DEFAULT_DB_CONFIG["password"]),
  1193. "db_name": os.getenv("DB_DATABASE", DEFAULT_DB_CONFIG["db_name"]),
  1194. "table_name": os.getenv("DB_TABLENAME", DEFAULT_DB_CONFIG["table_name"])
  1195. }
  1196. # 2. 初始化数据库读取器
  1197. db_reader = DBGoodsReader(
  1198. host=db_config["host"],
  1199. port=db_config["port"],
  1200. user=db_config["user"],
  1201. password=db_config["password"],
  1202. db_name=db_config["db_name"]
  1203. )
  1204. if not db_reader.connect_db():
  1205. return
  1206. # 3. 读取商品链接
  1207. goods_data = db_reader.get_shop_and_goods()
  1208. # 预览前5条数据
  1209. if goods_data:
  1210. print("\n📌 数据预览(前5条):")
  1211. for idx, item in enumerate(goods_data[:5], 1):
  1212. print(f"第{idx}条 | 店铺:{item['shop'][:20]} | 链接:{item['product_link'][:50]}...")
  1213. # 4. 初始化浏览器并打开链接
  1214. # 初始化拼多多浏览器
  1215. pdd_browser = PddLinkBrowser()
  1216. if not pdd_browser.init_browser():
  1217. return
  1218. # 接收返回的所有提取结果
  1219. extracted_data = pdd_browser.open_links(goods_data, db_reader)
  1220. # 打印最终统计
  1221. print(f"\n📊 爬取任务结束,共成功提取 {len(extracted_data)} 条企业信息!")
  1222. if extracted_data:
  1223. # 这里你可以将 extracted_data 写入数据库,或者存为 json/csv
  1224. # 例如打印第一条看看:
  1225. print("💡 最终数据示例:", json.dumps(extracted_data[0], ensure_ascii=False, indent=2))
  1226. pdd_browser.close()
  1227. db_reader.close_db()
  1228. if __name__ == "__main__":
  1229. try:
  1230. main()
  1231. except KeyboardInterrupt:
  1232. print("\n⚠️ 程序被用户中断")
  1233. except Exception as e:
  1234. print(f"\n❌ 程序运行出错:{e}")