from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError from logger_config import logger from datetime import datetime import random import csv import os import time import json import pymysql from pymysql.err import OperationalError, ProgrammingError, DataError from config import * import re import uuid import requests import base64 from io import BytesIO from PIL import Image import traceback # ===================== 工具函数:获取当前时间字符串 ===================== def get_current_time(): """统一日志时间格式""" return datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 代理IP池 PROXY_POOL_URL ="" PROXY_VALIDATION_URL = "" # 用于验证代理有效性的URL PROXY_TIMEOUT = 10 # 代理验证超时时间(秒) def get_random_proxy(): """从代理池获取随机代理IP""" try: response = requests.get(PROXY_POOL_URL, timeout=10) if response.status_code == 200: proxy = response.text.strip() if validate_proxy(proxy): logger.info(f"获取到有效代理: {proxy}") return proxy logger.warning(f"代理无效: {proxy}") except Exception as e: logger.error(f"获取代理失败: {str(e)}") return None def validate_proxy(proxy): """验证代理IP有效性""" try: proxies = { "http": f"http://{proxy}", "https": f"https://{proxy}" } response = requests.get( PROXY_VALIDATION_URL, proxies=proxies, timeout=PROXY_TIMEOUT ) return response.status_code == 200 except: return False def init_browser_with_proxy(playwright): proxy = get_random_proxy() proxy_config = None if proxy: proxy_server, proxy_port = proxy.split(":") proxy_config = { "server": f"http://{proxy_server}:{proxy_port}", # "username": "your_proxy_username", # "password": "your_proxy_password" } logger.info(f"使用代理: {proxy_server}:{proxy_port}") else: logger.warning("未获取到有效代理,将使用本地IP") # 启动浏览器(保留原有反爬配置) return playwright.chromium.launch( headless=False, # 非无头模式 channel="chrome", # 使用Chrome内核 slow_mo=random.randint(100, 300), # 随机操作延迟 proxy=proxy_config, # 代理配置(None则不使用代理) args=[ "--disable-blink-features=AutomationControlled", # 核心反检测 "--enable-automation=false", "--disable-infobars", "--remote-debugging-port=0", "--start-maximized", "--disable-extensions", "--disable-plugins-discovery", "--no-sandbox", "--disable-dev-shm-usage", # 随机Chrome版本UA f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36" ] ) # ==================== 2. 反爬工具函数 ==================== def random_delay(min_seconds, max_seconds): """生成随机延迟(核心反爬:避免固定间隔)""" delay = random.uniform(min_seconds, max_seconds) time.sleep(delay) return delay def simulate_human_typing(page, locator, text): """模拟真人打字(逐个字符输入,带随机间隔)""" try: locator.click() locator.clear() for char in text: locator.type(char, delay=random.uniform(MIN_INPUT_DELAY, MAX_INPUT_DELAY)) random_delay(0.05, 0.1) # 字符间额外小延迟 logger.info(f" 模拟真人输入完成:{text}") except Exception as e: logger.error(f"模拟打字失败:{e}") locator.fill(text) # 兜底:直接填充 def save_cookies(context, cookie_path=COOKIE_FILE_PATH): """保存Cookie到本地JSON文件""" try: cookies = context.cookies() with open(cookie_path, "w", encoding="utf-8") as f: json.dump(cookies, f, ensure_ascii=False, indent=2) logger.info(f"Cookie已保存到:{cookie_path}") return True except Exception as e: logger.error(f" 保存Cookie失败:{e}") return False def load_cookies(context, cookie_path=COOKIE_FILE_PATH): """从本地JSON文件加载Cookie到浏览器上下文""" if not os.path.exists(cookie_path): logger.warning(f" Cookie文件不存在:{cookie_path}") return False try: with open(cookie_path, "r", encoding="utf-8") as f: cookies = json.load(f) context.add_cookies(cookies) logger.info(f"✅ 已从{cookie_path}加载Cookie") return True except Exception as e: logger.error(f" 加载Cookie失败:{e}") return False def is_login(page): """验证是否已登录(核心:检测登录态)""" try: # 访问需要登录的页面 page.goto(LOGIN_VALIDATE_URL, timeout=300000) page.wait_for_load_state("networkidle") # 检测是否跳转到登录页(URL包含login则未登录) if "login" in page.url.lower(): logger.warning(" Cookie失效,需要重新登录") return False # 可选:检测登录后的专属元素(比如用户名、个人中心等) # if page.locator("用户中心选择器").count() > 0: # return True logger.info(" Cookie有效,已保持登录状态") return True except Exception as e: logger.error(f" 验证登录状态失败:{e}") return False # ==================== 滚动函数重构(核心修改) ==================== def slow_scroll_400px(page,scroll_distance1=400): """ 慢速滚动400px±50px(模拟真人滑动) :param page: 页面对象 :return: 滚动是否成功 """ try: # 生成400±50px的随机滚动距离 scroll_distance = random.randint( scroll_distance1 - SCROLL_OFFSET_RANGE, scroll_distance1 + SCROLL_OFFSET_RANGE ) remaining_distance = scroll_distance total_steps = int(scroll_distance / SCROLL_STEP) logger.info( f"📜 开始慢速滚动(目标距离:{scroll_distance}px,总步数:{total_steps},总时长约{total_steps*SCROLL_INTERVAL:.2f}秒)" ) # 渐进式滚动(每步50px,间隔0.05秒) for _ in range(total_steps): step = min(SCROLL_STEP, remaining_distance) page.evaluate(f"window.scrollBy(0, {step});") remaining_distance -= step time.sleep(SCROLL_INTERVAL) # 处理剩余不足一步的距离 if remaining_distance > 0: page.evaluate(f"window.scrollBy(0, {remaining_distance});") time.sleep(SCROLL_INTERVAL) # 滚动后等待懒加载完成 page.wait_for_load_state("networkidle", timeout=8000) random_delay(2.0, 3.0) # 滚动后额外停顿,模拟真人 logger.info(f" 慢速滚动完成,实际滚动距离:{scroll_distance - remaining_distance}px") return True except Exception as e: logger.warning(f" 慢速滚动失败:{e}") return False # def check_anti_crawl(page): # """检测反爬弹窗/验证码(核心:提前识别反爬)""" # anti_crawl_selectors = [ # "//div[contains(text(), '验证')]", # "//div[contains(text(), '人机验证')]", # "//div[contains(text(), '访问过于频繁')]", # "//button[contains(text(), '验证')]" # ] # for selector in anti_crawl_selectors: # if page.locator(selector).count() > 0: # logger.error("❌ 检测到反爬验证弹窗!请手动完成验证后按回车继续...") # input() # 暂停等待手动验证 # return True # return False # CSV配置 CSV_FILE_PATH = f"ybm_collect_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" # CSV保存路径 CSV_HEADERS = [ "商品标题", "商品采购价格", "商品折扣价格", "规格", "盒数", "店铺名称", "公司名称", "有效日期", "生产日期", "批准文号", "采集时间" ] #表头 # ==================== 登录函数 ==================== def login_operation(page, username, password): """登录操作函数""" try: # 输入手机号(直接用单个变量) page.wait_for_selector(USERNAME_SELECTOR, timeout=ELEMENT_TIMEOUT, state="visible") page.wait_for_timeout(timeout=3000) page.fill(USERNAME_SELECTOR, username) logger.info(" 已输入登录账号") # 输入密码 page.wait_for_selector(PASSWORD_SELECTOR, timeout=ELEMENT_TIMEOUT, state="visible") page.wait_for_timeout(timeout=3000) page.fill(PASSWORD_SELECTOR, password) logger.info(" 已输入登录密码") random_delay(1, 2) agree_btn = page.locator('span.el-checkbox__inner') agree_btn.click() # 点击登录按钮 page.wait_for_selector(LOGIN_BTN_SELECTOR, timeout=ELEMENT_TIMEOUT) page.wait_for_timeout(timeout=3000) page.click(LOGIN_BTN_SELECTOR) logger.info(" 已点击登录按钮") page.wait_for_timeout(LOGIN_AFTER_CLICK) return True except PlaywrightTimeoutError as e: logger.error(f" 登录失败:元素定位超时 - {str(e)}") return False except Exception as e: logger.error(f" 登录异常:{str(e)}") return False def kill_masks(page): """ 强制清理残留遮罩层/覆盖层,并恢复 body 可滚动、可点击状态 """ page.evaluate(r""" () => { const removed = []; const hidden = []; // 1) 先处理已知常见遮罩 const knownSelectors = [ '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper', '.el-message-box__wrapper', '.el-loading-mask', '.el-popup-parent--hidden' ]; for (const sel of knownSelectors) { document.querySelectorAll(sel).forEach(el => { // v-modal / overlay 直接 remove 最省事 removed.push(sel); el.remove(); }); } // 2) 再做一次“泛化兜底”:全屏 fixed/absolute + 高 z-index 的覆盖层 // 注意:不要误删页面正常的固定导航,所以加上“近似全屏”的判断 const all = Array.from(document.querySelectorAll('body *')); for (const el of all) { const s = window.getComputedStyle(el); if (!s) continue; const z = parseInt(s.zIndex || '0', 10); const pos = s.position; const pe = s.pointerEvents; if ((pos === 'fixed' || pos === 'absolute') && z >= 1000 && pe !== 'none') { const r = el.getBoundingClientRect(); const nearFullScreen = r.width >= window.innerWidth * 0.8 && r.height >= window.innerHeight * 0.8 && r.left <= window.innerWidth * 0.1 && r.top <= window.innerHeight * 0.1; // 常见遮罩是半透明背景色,或者透明但拦截点击 const bg = s.backgroundColor || ''; const looksLikeMask = nearFullScreen && (bg.includes('rgba') || bg.includes('rgb') || s.opacity !== '1'); if (nearFullScreen) { // 不管透明不透明,只要近似全屏且高 z-index,就先让它不拦截点击 el.style.pointerEvents = 'none'; el.style.display = 'none'; hidden.push(el.tagName + '.' + (el.className || '')); } } } // 3) 恢复 body / html 的滚动与交互(很多弹窗会锁滚动) document.documentElement.style.overflow = 'auto'; document.body.style.overflow = 'auto'; document.body.style.position = 'static'; document.body.style.width = 'auto'; document.body.style.paddingRight = '0px'; // 4) 去掉 Element-UI 常见的锁定 class document.body.classList.remove('el-popup-parent--hidden'); return { removed, hiddenCount: hidden.length, hidden }; } """) def force_close_popup(page): """关闭新手引导/遮罩(多步:下一步/完成/我知道了),并兜底移除遮罩层""" try: # 1) 尝试连续点“下一步/完成/我知道了/关闭” for _ in range(5): # 最多点5次,足够覆盖多步引导 btn = page.locator( "//button[normalize-space()='下一步' or normalize-space()='完成' or normalize-space()='我知道了' or normalize-space()='关闭']" ).first if btn.count() > 0 and btn.is_visible(): btn.click(timeout=1500) page.wait_for_timeout(300) continue # 有些引导是右上角 X(如果存在就点) close_icon = page.locator( "xpath=//*[contains(@class,'close') or contains(@class,'el-icon-close') or name()='svg' or name()='i'][1]" ).first if close_icon.count() > 0 and close_icon.is_visible(): close_icon.click(timeout=1000) page.wait_for_timeout(300) continue break # 2) 兜底:移除常见遮罩层(element-ui / 通用 mask/overlay) page.evaluate(""" const selectors = [ '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper', '[class*="mask"]', '[class*="overlay"]', '[style*="z-index"]' ]; for (const sel of selectors) { document.querySelectorAll(sel).forEach(el => { const s = window.getComputedStyle(el); // 只移除“覆盖层”倾向的元素:fixed/absolute 且 z-index 很高 if ((s.position === 'fixed' || s.position === 'absolute') && parseInt(s.zIndex || '0', 10) >= 1000) { el.remove(); } }); } """) except Exception: pass # 调用方式和方案1一致:在搜索后、采集前执行 # force_close_popup(page) def pick_search_input(page): """优先选可见且可用的搜索输入框;第一个不行就尝试第二个""" inputs = page.locator(SEARCH_INPUT_SELECTOR) cnt = inputs.count() # 优先检查前两个(你说只有两个) for i in range(min(cnt, 2)): candidate = inputs.nth(i) try: candidate.wait_for(state="visible", timeout=1500) # 小超时快速试探 if candidate.is_enabled(): return candidate except PlaywrightTimeoutError: continue # 兜底:直接找任意可见的(避免命中 hidden 模板) candidate = page.locator(f"{SEARCH_INPUT_SELECTOR}:visible").first candidate.wait_for(state="visible", timeout=ELEMENT_TIMEOUT) return candidate def type_slow(locator, text: str, min_delay=0.06, max_delay=0.18): """逐字输入,模拟真人打字""" for ch in text: locator.type(ch, delay=int(random.uniform(min_delay, max_delay) * 1000)) # ==================== 搜索操作函数 ==================== def search_operation(page, keyword, is_first_search: bool = True): """ 搜索框填充+提交搜索 :param page: 页面对象 :param keyword: 搜索关键词 :param is_first_search: 是否是首次搜索(首次开新页面,后续原页面跳转) :return: (detail_page, 是否成功) """ try: # 1) 找到“可用”的搜索框(第一个不行就用第二个) search_locator = page.locator(SEARCH_INPUT_SELECTOR) # 清空并填充搜索框 search_locator.wait_for(timeout=ELEMENT_TIMEOUT) # 2. 清空搜索框(双重保障:先调用locator的clear,再手动全选删除) search_locator.click(force=True) # 聚焦 search_locator.fill("") page.keyboard.down("Control") # 按住Control键 page.keyboard.press("a") # 按a键 page.keyboard.up("Control") # 松开Control键 page.keyboard.press("Backspace") # 删除选中内容 # 3) 逐字输入 type_slow(search_locator, keyword, min_delay=0.06, max_delay=0.18) # 3. 输入搜索关键词 # search_locator.fill(keyword) logger.info(f"📝 已输入搜索关键词:{keyword}") # 3) 搜索按钮也建议点可见的那个 btn = page.locator(f"{SEARCH_BTN_SELECTOR}") btn.wait_for(state="visible", timeout=SEARCH_BTN_TIMEOUT) # btn.click() page.wait_for_timeout(3000) detail_page = page if is_first_search: #获取新页面对象 try: # 先开始监听新页面事件(在点击前) with page.context.expect_page(timeout=60000) as new_page_info: # 再执行点击操作 btn.click() # 点击后获取新页面 detail_page = new_page_info.value detail_page.wait_for_load_state("networkidle", timeout=20000) # #点击出现的按钮 # test_btn = detail_page.locator("div[data-v-c65c36bc].first-time-highlight-message-btn button") # btn_count = test_btn.count() # logger.info(f"✅ 匹配到的元素数量:{btn_count}") # test_btn.wait_for(state="attached", timeout=5000) # test_btn.click() except PlaywrightTimeoutError: logger.warning(f"{get_current_time()} 未检测到新标签页") return None, False except Exception as e: logger.warning(f"{get_current_time()} 等待新标签页异常:{e}") return None, False else: btn.click() # 等待原页面跳转并加载完成(替代新页面监听) page.wait_for_load_state("networkidle", timeout=20000) # 详情页就是原页面,无需新建 detail_page = page logger.info("✅ 后续搜索:已在原页面完成跳转加载") test_btn = detail_page.locator("div[data-v-c65c36bc].first-time-highlight-message-btn button") btn_count = test_btn.count() logger.info(f"✅ 匹配到的元素数量:{btn_count}") if btn_count > 0: test_btn.wait_for(state="attached", timeout=5000) test_btn.click() force_close_popup(detail_page) kill_masks(detail_page) logger.info("✅ 已触发搜索") return detail_page, True # 搜索后等待结果加载 # page.wait_for_timeout(COLLECT_DELAY) # return True except PlaywrightTimeoutError as e: logger.error(f" 搜索失败:元素定位超时 - {str(e)}") return None, False # 失败时返回 (None, False) except Exception as e: logger.error(f" 搜索异常:{str(e)}") return None, False # 失败时返回 (None, False) #翻下一页 def goto_next_page(page) -> bool: """ 基于 button.btn-next 的 aria-disabled 属性判断是否有下一页 :param page: 搜索结果页面对象(detail_page) :return: True=翻页成功,False=无下一页/翻页失败 """ try: next_btn = page.locator("button.btn-next").first # 2. 先等待按钮加载(确保元素存在) next_btn.wait_for(state="attached", timeout=3000) # 3. 获取 aria-disabled 属性值(核心判断依据) aria_disabled = next_btn.get_attribute("aria-disabled") logger.info(f"下一页按钮 aria-disabled 属性值:{aria_disabled}") # 4. 判断是否有下一页:aria-disabled="true" 表示无下一页 if aria_disabled == "true": logger.warning("⚠️ 下一页按钮 aria-disabled=true,已无更多页面") return False page.wait_for_timeout(500) # 6. 确保按钮可见且可点击(强制点击兜底) if next_btn.is_visible() and next_btn.is_enabled(): next_btn.click(timeout=5000) else: # 兜底:强制点击(避免元素不可见但实际可点击的情况) next_btn.click(force=True, timeout=5000) logger.info("✅ 翻页成功,下一页按钮 aria-disabled=false") return True except PlaywrightTimeoutError: logger.warning("⚠️ 下一页按钮加载超时,判定无更多页面") return False except Exception as e: logger.warning(f"⚠️ 翻页操作异常:{e},判定无更多页面") return False def popup_guard(page, tag=""): """ 全局弹窗/遮罩守卫:多步引导 + 关闭按钮 + 遮罩清理 + 恢复滚动 tag 仅用于日志区分调用位置 """ try: # 给弹窗一点出现时间 page.wait_for_timeout(300) # 1) 连续点“下一步/完成/我知道了/关闭” for _ in range(6): btn = page.locator( "xpath=//button[normalize-space()='下一步' or normalize-space()='完成' or normalize-space()='我知道了' or normalize-space()='关闭']" ).first if btn.count() > 0 and btn.is_visible(): btn.click(timeout=1500) page.wait_for_timeout(250) continue # 2) 常见的 close icon close_btn = page.locator( "css=.el-dialog__headerbtn, .el-message-box__headerbtn, .close, .icon-close, .el-icon-close" ).first if close_btn.count() > 0 and close_btn.is_visible(): close_btn.click(timeout=1200) page.wait_for_timeout(250) continue break # 3) 清遮罩 + 恢复滚动/交互 page.evaluate(r""" () => { // 第一步:精准清理已知的遮罩/弹窗类名(Element UI框架常用) const selectors = [ '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper', '.el-message-box__wrapper', '.el-loading-mask' ]; selectors.forEach(sel => document.querySelectorAll(sel).forEach(e => e.remove())); // 泛化兜底:近似全屏 + 高 z-index 的层直接屏蔽 const all = Array.from(document.querySelectorAll('body *')); for (const el of all) { const s = getComputedStyle(el); // 获取元素的实际样式(含CSS生效的样式) const z = parseInt(s.zIndex || '0', 10); // 取元素的层级(z-index),默认0 // 条件1:元素是固定/绝对定位(弹窗/遮罩常见定位方式)+ 层级≥1000(高优先级遮挡)+ 能拦截鼠标事件 if ((s.position === 'fixed' || s.position === 'absolute') && z >= 1000 && s.pointerEvents !== 'none') { const r = el.getBoundingClientRect(); // 获取元素的尺寸和位置 // 条件2:元素宽度/高度≥屏幕80%(近似全屏遮罩) const nearFull = r.width >= innerWidth * 0.8 && r.height >= innerHeight * 0.8; if (nearFull) { el.style.pointerEvents = 'none'; // 让元素不拦截鼠标点击 el.style.display = 'none'; // 隐藏元素 } } } // 第三步:恢复页面滚动功能(弹窗常把页面设为不可滚动) document.documentElement.style.overflow = 'auto'; // html标签恢复滚动 document.body.style.overflow = 'auto'; // body标签恢复滚动 document.body.classList.remove('el-popup-parent--hidden'); // 移除Element UI的滚动禁用类 } """) logger.info("杀除弹窗成功") except Exception: pass def open_detail_page(list_page, item, keyword, idx, *, timeout=15000): """ 点击商品进入详情页,兼容: 1) 新开 tab(返回 detail_page != list_page, opened_new_tab=True) 2) 同 tab 跳转(detail_page == list_page, opened_new_tab=False) """ ctx = list_page.context list_url = list_page.url detail_page = None opened_new_tab = False try: # 期望新开 tab(很多站点会这样) with ctx.expect_page(timeout=timeout) as p: item.click(delay=random.uniform(0.1, 0.3)) detail_page = p.value opened_new_tab = True logger.info(f" 「{keyword}」第{idx}个商品 - 新开标签页进入详情") except PlaywrightTimeoutError: # 兜底:没新开 tab,大概率是同页跳转/弹层 detail_page = list_page opened_new_tab = False logger.info(f" 「{keyword}」第{idx}个商品 - 未新开标签页,按同页进入详情处理") return detail_page, opened_new_tab, list_url def return_to_list(list_page, detail_page, opened_new_tab, list_url, keyword, idx): """ 从详情页返回列表页: - 新 tab:关闭 tab,然后 bring_to_front 切回 - 同 tab:尽量 go_back 回到 list_url;如果没跳转而是弹层,尝试 ESC """ # 如果浏览器/页面已经被关了,直接退出,避免二次异常 if list_page is None or list_page.is_closed(): logger.warning(f" 「{keyword}」第{idx}个商品 - 列表页已关闭,无法切回") return if opened_new_tab: # 只关“新开的详情 tab”,绝不关 list_page try: if detail_page and (detail_page is not list_page) and (not detail_page.is_closed()): detail_page.close() logger.info(f"📌 「{keyword}」第{idx}个商品 - 已关闭详情页标签页") except Exception as e: logger.warning(f" 「{keyword}」第{idx}个商品 - 关闭详情页失败:{e}") # 切回列表页 try: list_page.bring_to_front() list_page.mouse.move(random.randint(100, 300), random.randint(200, 400)) random_delay(0.3, 0.8) list_page.wait_for_load_state("networkidle") logger.info(f" 「{keyword}」第{idx}个商品 - 已切回列表页(新tab模式)") except Exception as e: logger.warning(f" 「{keyword}」第{idx}个商品 - 切回列表页失败:{e}") return # 同 tab:detail_page == list_page try: # 1) 如果 URL 变了,说明确实跳转了 → go_back 回去 if list_page.url != list_url: for _ in range(3): # 最多退 3 次,防止死循环 list_page.go_back(timeout=15000) list_page.wait_for_load_state("domcontentloaded", timeout=15000) random_delay(0.2, 0.5) if list_page.url == list_url: break logger.info(f" 「{keyword}」第{idx}个商品 - 已返回列表页(同tab跳转模式)") else: # 2) URL 没变:可能是弹层详情 → 尝试 ESC 关闭弹层 list_page.keyboard.press("Escape") random_delay(0.2, 0.5) logger.info(f" 「{keyword}」第{idx}个商品 - 已尝试关闭弹层并留在列表页(同tab弹层模式)") list_page.bring_to_front() list_page.wait_for_load_state("networkidle") except Exception as e: logger.warning(f" 「{keyword}」第{idx}个商品 - 同tab返回列表页失败:{e}") #判断店名是否已经在数据库 def shop_is_exists_database(shop): try: conn = pymysql.connect(**MYSQL_CONFIG) cursor = conn.cursor(pymysql.cursors.DictCursor) # 改为字典游标 query_sql = """ SELECT province, city, business_license_company, qualification_number FROM ybm_shop_info_middle WHERE shop = %s """ cursor.execute(query_sql, (shop,)) result = cursor.fetchone() # 正确的调试方式(替代cursor._last_executed) print(f"【调试】传入的店铺名:{repr(shop)}") # repr能显示空格/隐藏字符 print(f"【调试】查询参数:{shop}") print(f"【调试】查询结果:{result} → 函数返回:{bool(result)}") is_exists = bool(result) if is_exists: logger.info(f"【店铺存在校验】店铺已存在 | 店铺名:{repr(shop)} | 结果:存在(True)不要执行采集店铺") else: logger.info(f"【店铺存在校验】店铺不存在 | 店铺名:{repr(shop)} | 结果:不存在(False)") return is_exists, result except Exception as e: logger.error(f"查询店铺失败:{e}") return False, None # 异常时明确返回False,避免返回None finally: # 修复:关闭游标和连接,避免泄露 if cursor: cursor.close() if conn: conn.close() def insert_shop_info_to_db(shop,contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform, province, city, create_time, update_time): """ 把字段插入到ybm_shop_info_middle表 :param 各参数: 你要插入的字段值(空字符串也可) :return: bool - 插入成功返回True,失败返回False """ # 1. 初始化数据库连接和游标 conn = None cursor = None try: conn = pymysql.connect(**MYSQL_CONFIG) cursor = conn.cursor() # 2. 构造INSERT SQL语句(参数化查询,防止SQL注入) # 注意:请确认ybm_shop_info_middle表的字段名和以下%s的顺序对应! # 若表字段名不同,修改INSERT后的字段列表(比如你的表字段是credit_code而非qualification_number,要对应改) sql = """ INSERT INTO ybm_shop_info_middle ( shop, contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform, province, city, create_time, update_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE contact_address = VALUES(contact_address), # 重复时更新联系地址 qualification_number = VALUES(qualification_number), # 更新社会信用代码 business_license_company = VALUES(business_license_company), # 更新公司名 business_license_address = VALUES(business_license_address), # 更新地址 scrape_date = VALUES(scrape_date), platform = VALUES(platform), province = VALUES(province), city = VALUES(city), update_time = VALUES(update_time) # 重复时更新update_time """ # 3. 构造插入的参数(顺序必须和SQL中的%s一一对应) params = ( shop, # 店铺名称 contact_address, # 联系地址 qualification_number, # 社会信用代码 business_license_company, # 营业执照公司名 business_license_address, # 营业执照地址 scrape_date, # 爬取日期 platform, # 平台名称(药九九) province, # 省份 city, # 城市 create_time, # create_time(当前时间) update_time ) # 4. 执行SQL并提交事务 cursor.execute(sql, params) conn.commit() print(f"✅ 数据插入成功!店铺:{shop} | 公司:{business_license_company}") return True except pymysql.MySQLError as e: # 数据库相关错误(连接失败、SQL语法错误、字段不匹配等) print(f"MySQL插入失败:{e}") print(f"详细异常信息:{traceback.format_exc()}") # 打印详细堆栈,方便排查 if conn: conn.rollback() # 插入失败回滚事务 return False except Exception as e: # 其他未知错误 print(f"插入数据时发生未知错误:{e}") print(f"详细异常信息:{traceback.format_exc()}") if conn: conn.rollback() return False finally: # 5. 无论成功/失败,都关闭游标和连接(释放资源) if cursor: cursor.close() if conn: conn.close() def insert_single_to_mysql(single_data): """ 逐条插入单条数据到MySQL数据库 :param single_data: 单条商品数据元组 :return: 插入是否成功 """ conn = None cursor = None try: conn = pymysql.connect(**MYSQL_CONFIG) cursor = conn.cursor() # 2. 确保表存在(兼容表未创建的情况) # cursor.execute(CREATE_TABLE_SQL) insert_sql = """ INSERT INTO ybm_drug_middle ( product, my_good_price, min_price, manufacture_date, expiry_date, shop, business_license_company, province, city, manufacturer, specification, approval_number, product_link, scrape_date, scrape_province, availability, credit_code, platform, search_key, number, is_sold_out, sales, inventory, snapshot_url, update_time, create_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ # 字段值(与SQL占位符顺序严格对应) values = ( single_data["product"], single_data["my_good_price"], single_data["min_price"], single_data["manufacture_date"], single_data["expiry_date"], single_data["shop"], single_data["business_license_company"], single_data["province"], single_data["city"], single_data["manufacturer"], single_data["specification"], single_data["approval_number"], single_data["product_link"], single_data["scrape_date"], single_data["scrape_province"], single_data["availability"], single_data["credit_code"], single_data["platform"], single_data["search_key"], single_data["number"], single_data["is_sold_out"], single_data["sales"], single_data["inventory"], single_data["snapshot_url"], single_data["update_time"], single_data["create_time"] ) cursor.execute(insert_sql, values) conn.commit() logger.info(f" 单条数据插入成功:...") # 仅打印标题前20字 return True except OperationalError as e: logger.error(f" MySQL连接失败:{str(e)}") if conn: conn.rollback() return False except ProgrammingError as e: logger.error(f" SQL语法错误:{str(e)}") if conn: conn.rollback() return False except Exception as e: logger.error(f" 单条数据插入失败:{str(e)}") if conn: conn.rollback() return False finally: # 关闭游标和连接 if cursor: cursor.close() if conn: conn.close() def clean_shop_name(raw_shop_name): """ 清洗店铺名称:移除无关前缀(如【xx截单】)、多余空格/特殊符号,提取核心店名 :param raw_shop_name: 原始采集的店铺名称字符串 :return: 清洗后的纯店铺名称 """ if not raw_shop_name: #处理空值 return '' # 步骤1:移除【】/()/[]包裹的所有内容(如【2月13日11点截单】) # 正则解释:匹配【任意字符】、(任意字符)、[任意字符],并替换为空 pattern = r'【.*?】|\(.*?\)|\[.*?\]' cleaned = re.sub(pattern, '', raw_shop_name) # 步骤2:移除首尾空格、换行符,替换中间多余空格为单个空格 cleaned = cleaned.strip().replace('\n', '').replace('\r', '') cleaned = re.sub(r'\s+', ' ', cleaned) # 步骤3:兜底处理(若清洗后为空,返回原始值避免空字符串) return cleaned if cleaned else raw_shop_name def check_dup_in_biz_db(product_link, discount_price_val, scrape_date): """直接查询业务表是否存在该商品链接+价格""" conn = None cursor = None log_context = ( f"【去重校验】商品链接:{product_link.strip()} | 价格:{discount_price_val} " f"采集日期:{scrape_date.strip()}" ) try: conn = pymysql.connect(**MYSQL_CONFIG) cursor = conn.cursor() sql = """ SELECT * FROM ybm_drug_middle WHERE product_link = %s AND min_price = %s AND scrape_date=%s """ # 先执行查询 cursor.execute(sql, (product_link.strip(), discount_price_val, scrape_date.strip())) # 再判断是否有结果 # 如果 fetchone() 返回元组(比如(1,))→ (1,) is not None → 结果为 True; # 如果 fetchone() 返回 None → None is not None → 结果为 False。 is_dup = cursor.fetchone() is not None if is_dup: logger.warning(f"{log_context} - 表中已存在重复记录,跳过本次采集") else: logger.info(f"{log_context} - 表中无重复记录,正常采集") return is_dup except Exception as e: logger.error(f"查询业务表去重失败:{str(e)}") return False finally: if cursor: cursor.close() if conn: conn.close() # 压缩图片函数 def compress_image(image_data, max_size=4*1024*1024): # 4MB上限 try: img = Image.open(BytesIO(image_data)) # 将RGBA模式转为RGB(兼容JPEG) if img.mode in ('RGBA', 'P'): # P是PNG的调色板模式,也需转换 # 新建白色背景的RGB图片,把透明图贴上去(避免透明区域变黑) bg_img = Image.new('RGB', img.size, (255, 255, 255)) bg_img.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None) img = bg_img # 缩小分辨率(按比例缩到宽≤1000px) if img.width > 1000: ratio = 1000 / img.width new_size = (int(img.width*ratio), int(img.height*ratio)) img = img.resize(new_size, Image.Resampling.LANCZOS) # 降低质量(JPG)/压缩(PNG) output = BytesIO() img.save(output, format='JPEG', quality=80) # quality越小体积越小 compressed_data = output.getvalue() # 若仍超限,继续降质量 if len(compressed_data) > max_size: img.save(output, format='JPEG', quality=60) compressed_data = output.getvalue() return compressed_data except Exception as e: logger.debug(f"图片压缩失败:{e}") return image_data # 压缩失败返回原始数据 def download_image_to_base64(image_url, save_dir = "./download_images"): """下载网络图片,返回图片二进制数据(BytesIO)""" try: if not os.path.exists(save_dir): os.makedirs(save_dir) # 创建多级目录(比如a/b/c) print(f"创建本地保存目录:{save_dir}") except Exception as e: print(f"创建保存目录失败:{str(e)}") return None try: # 模拟浏览器请求头,避免被服务器拦截 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } response = requests.get(image_url, headers=headers, timeout=15) response.raise_for_status() compressed_data = compress_image(response.content) image_base64 = base64.b64encode(compressed_data).decode("utf-8") image_data = compressed_data # 步骤3:提取图片文件名(从URL中截取,避免重复) # 示例URL:https://xxx.com/123.jpg → 文件名:123.jpg file_name = image_url.split("/")[-1] # 处理特殊字符(避免文件名非法) file_name = file_name.replace("?", "").replace("&", "").replace("=", "") save_path = os.path.join(save_dir, file_name) # 完整保存路径 # 步骤4:保存图片到本地 with open(save_path, "wb") as f: f.write(image_data) print(f"图片已保存到本地:{save_path}") return image_base64 except requests.exceptions.Timeout: print(f"下载图片超时:{image_url}") return None except requests.exceptions.HTTPError as e: print(f"图片URL无效(状态码:{response.status_code}):{image_url}") return None except Exception as e: print(f"下载图片失败:{str(e)}") return None def get_ocr_res(img): try: #img地址 print(f'开始识别图片:{img}') request_url = request_url_config img_base64 = download_image_to_base64(img) if not img_base64: print("图片下载/转Base64失败,终止OCR识别") return None # 获取access_token access_token = get_access_token() if not access_token: print("获取access_token失败,无法调用OCR接口") return None params = {"image": img_base64} request_url = request_url + "?access_token=" + access_token headers = {'content-type': 'application/x-www-form-urlencoded'} response = requests.post(request_url, data=params, headers=headers) if response: res = response.json() # 检查OCR返回是否有错误 if "error_code" in res: print(f"百度OCR接口错误:{res['error_msg']}(错误码:{res['error_code']})") return None # 解析识别结果 new_dic = dict() for ite in res['words_result'].keys(): new_dic[ite] = res['words_result'][ite]['words'] print('资质数据信息', new_dic) return new_dic else: print("OCR接口返回空响应") return None except requests.exceptions.RequestException as e: print(f"网络错误(图片下载/OCR请求失败):{str(e)}") return None except KeyError as e: print(f"OCR响应格式异常,缺失字段:{str(e)}") return None except Exception as e: print(f"OCR识别未知错误:{str(e)}") return None def get_access_token(): AppKey = AppKey_config AppSrcret = AppSecret_config token_url =token_url_config url = f"{token_url}?grant_type=client_credentials&client_id={AppKey}&client_secret={AppSrcret}" payload = "" headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } try: response = requests.request("POST", url, headers=headers, data=payload) response.raise_for_status() # 触发HTTP错误 return response.json()['access_token'] except Exception as e: print(f"获取access_token失败:{str(e)}") return None def extract_province_city(address): """ 从地址中提取省份和城市 :param address: 营业执照地址(如"福建省福州市马尾区") :return: (province, city) - 提取到的省份/城市,提取失败返回空字符串 """ if not address: # 地址为空,直接返回空 return "", "" # 正则1:匹配省份(兼容省/自治区/直辖市/特别行政区) province_pattern = re.compile(r'([^省]+省|.+自治区|北京市|上海市|天津市|重庆市|.+特别行政区)') province_match = province_pattern.search(address) province = province_match.group(1) if province_match else "" # 正则2:匹配城市(兼容市/自治州/地区/盟,且排除省份已匹配的部分) # 先去掉已匹配的省份,再匹配城市 address_remain = address.replace(province, "").strip() if province else address.strip() city_pattern = re.compile(r'([^市]+市|.+自治州|.+地区|.+盟|^[^\d区县镇]+)') city_match = city_pattern.search(address_remain) city = city_match.group(1).strip() if city_match else "" # 兼容直辖市(如"北京市朝阳区"→city=北京市) if province in ["北京市", "上海市", "天津市", "重庆市"]: city = province # 兼容地址不规范的情况(如"福建福州马尾区",无"省"/"市"字) if not province and not city: # 匹配前两个地名(如"福建福州"→province=福建,city=福州) simple_pattern = re.compile(r'^([^\d区县镇]+)') simple_match = simple_pattern.search(address) if simple_match: city = simple_match.group(1).strip() # 只有城市,省份留空 if city and province in city: city = city.replace(province, "").strip() return province.strip(), city.strip() #采集数据核心 def collect_data(store_page, keyword): """ 1) 先获取当前页商品个数(count) 2) 按循环次数采集;每循环15次滚动一次 slow_scroll_1200px 3) 当前页循环完 -> goto_next_page;有下一页继续;无下一页结束该关键词 """ collect_result = [] # seen = set() logger.info(f"📊 开始采集「{keyword}」的商品数据") store_page.wait_for_load_state("networkidle") #没有找到商品就跳过这个商品 page_no = 1 while True: logger.info(f"\n📄 「{keyword}」开始采集第 {page_no} 页") # 记录列表页URL(可用于你后续兜底) list_page_url = store_page.url logger.info(f"📌 已记录商品列表页URL:{list_page_url}") # ✅ 先获取当前页商品个数 store_page.wait_for_load_state("domcontentloaded") # 先等DOM加载 store_page.wait_for_load_state("networkidle") store_page.wait_for_timeout(500) # 额外等待渲染稳定 total_limit = store_page.locator(PRODUCT_ITEM_SELECTOR).count() logger.info(f"📌 「{keyword}」第{page_no}页 初始商品个数(count):{total_limit}") # 重置当前页的采集计数 collected_count = 0 # ========= 初始化无匹配计数器(记录标题不包含核心关键词的次数) ========= # no_match_count = 0 # 无匹配次数初始化为0 # MAX_NO_MATCH = 10 # 最大无匹配次数阈值 #补充没找到关键词的兜底 not_found_keywords = store_page.locator("div.filter-panel-container-empty-text") if not_found_keywords.count() > 0: logger.warning(f"⚠️ 关键词「{keyword}」无匹配商品,直接跳过整个关键词采集") return [] # 获取当前页面 # store_page = context.pages[0] # 从上下文中获取当前页面 # store_page.wait_for_load_state("networkidle") for idx in range(total_limit): detail_page = None try: item = store_page.locator(PRODUCT_ITEM_SELECTOR).nth(idx) collected_count += 1 # 实际采集计数(用于日志) # ========= 反爬随机延迟(保留你的原逻辑也行) ========= store_page.wait_for_load_state("networkidle") delay = random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) logger.info(f"📌 「{keyword}」第{page_no}页 第{collected_count}/{total_limit}个商品 - 等待{delay:.2f}秒后采集(反爬)") # 1. 初始化所有字段默认值 title = "无标题" price = "0.00" shop = "无店名" expiry_date = "无有效期" manufacture_date = "无生产日期" approval_number = "无批准文号" manufacturer = "未知公司" # discount_price = "0.00" spec = "未知规格" num = 1 # ✅ 默认 1 platform = '药帮忙' current_time = datetime.now().strftime("%Y-%m-%d") is_sold_out = 0 # ========= 售罄不跳过 ========= sold_locator = item.locator('div.product-status') if sold_locator.count() > 0: is_sold_out = 1 logger.warning(f" 「{keyword}」第{page_no}页 第{collected_count}个商品已售罄") # if collected_count % 5 == 0 and collected_count > 0: # logger.info("采满5个往下滑") # slow_scroll_400px(page) # page.wait_for_load_state("networkidle") # continue #提取商品ID # product_id_elem = item.locator('div.product-card[data-product-id]') # if product_id.count() > 0: # product_id = product_id_elem.get_attribute("data-product-id") # logger.info(f"✅ 提取到data-product-id:{product_id}") # 输出:5678955 # 提取商品标题(处理空值) product_locator = item.locator(PRODUCT_TITLE_SELECTOR) if product_locator.count() > 0: title = product_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页标题:{title}{'='*10}") else: logger.warning(f" 「{keyword}」第{collected_count}个商品 - 列表页标题元素未找到,使用默认值:{title}") #关键词不在标题中,跳过当前商品 # core_keyword = re.sub(r'^999[\s\(\)()、·]*', '', keyword) # if core_keyword not in title: # no_match_count += 1 # logger.warning(f" 「{keyword}」第{collected_count}个商品 - 标题「{title}」不包含核心关键词「{core_keyword}」(无匹配次数:{no_match_count}/{MAX_NO_MATCH}),跳过本次循环") # continue # if no_match_count >= MAX_NO_MATCH: # logger.error(f"❌ 关键词「{keyword}」无匹配商品次数已达{MAX_NO_MATCH}次,直接终止当前关键词采集,进入下一个关键词") # return [] # 提取价格(带缺失日志) # price_locator = item.locator(PRODUCT_PRICE_SELECTOR) price_int = item.locator('//span[@class="price-int"]').text_content().strip() # 2. 提取小数部分(注意可能为空,比如价格是整数13) price_decimal_elem = item.locator('//span[@class="price-decimal"]') if price_decimal_elem.count() > 0: price_decimal = price_decimal_elem.text_content().strip() else: price_decimal = '' # 3. 拼接完整价格 full_price = f"{price_int}{price_decimal}" # 转成浮点数(便于后续计算/入库) full_price_num = float(full_price) logger.info(f"✅ 提取到价格:{full_price_num}") if full_price_num is None: logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页采购价格元素未找到,使用默认值:{price}") # if full_price_num > 0: # price = price_locator.inner_text(timeout=3000).strip() # logger.info(f"{'='*10}{keyword}」第{collected_count}个商品 - 列表页采购价格:{price}{'='*10}") # else: # price = "0.00" # 初始化默认值,避免后续报错 # logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页采购价格元素未找到,使用默认值:{price}") # 5. 提取公司名称(带缺失日志) manufacturer_locator = item.locator(PRODUCT_COMPANY_SELECTOR) if manufacturer_locator.count() > 0: manufacturer = manufacturer_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页公司名:{manufacturer}{'='*10}") else: logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页公司名称元素未找到,使用默认值:{manufacturer}") #提取店铺名称 shop_locator = item.locator(PRODUCT_STORE_SELECTOR) if shop_locator.count() > 0: raw_shop = shop_locator.inner_text(timeout=3000).strip() # 2. 清洗店名(核心新增步骤) shop = clean_shop_name(raw_shop) logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页店名:{shop}{'='*10}") logger.info(f"原始店名:{raw_shop}") logger.info(f"清洗后店名:{shop}{'='*10}") else: logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页店铺名称元素未找到,使用默认值:{shop}") # 提取折扣价 discount_price_val_origin = "" discount_price = "" discount_price_locator = item.locator('span[data-v-4cb6cc1f].discount-int').first if discount_price_locator.count() > 0: discount_price = discount_price_locator.inner_text(timeout=3000).strip() discount_price_val_origin = discount_price match = re.search(r'\d+\.?\d*', str(discount_price_val_origin)) discount_price_val = float(match.group()) if match else 0.00 logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页折扣价:{discount_price_val}{'='*10}") else: #如果没有拿原价替换 # price = float(price.replace("¥", "").replace(",", "")) if price.replace("¥", "").replace(",", "").replace(".", "") else "0.00" discount_price_val = full_price_num logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 折扣价元素未找到,使用采购价兜底:{discount_price_val}") merged_price = f"{full_price_num}{discount_price_val_origin}" if discount_price_val_origin else full_price_num # 提取有效期(处理空值) expiry_date_locator = item.locator(f"{PRODUCT_VALIDITY_SELECTOR}") if expiry_date_locator.count() > 0: expiry_date = expiry_date_locator.inner_text(timeout=3000).strip().replace('-', '') #.replace('近效期','') logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页有效期:{expiry_date}{'='*10}") else: # 修复:替换未定义的i为collected_count logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 有效期元素未找到,使用默认值:{expiry_date}") #获取product_id # product_id = None # try: # product_id = item.get_attribute("data-product-id") # if product_id: # product_id = product_id.strip() # logger.info(f"✅ 「{keyword}」第{collected_count}个商品 - 提取到product_id:{product_id}") # ========= 模拟点击商品进入详情页 ========= logger.info( f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 模拟鼠标移动并点击" ) # 点击商品项容器,触发详情展示 # ========== 点击商品跳详情页 ========== # 反爬:模拟真人鼠标移动到商品上再点击(不是直接点击) item.hover() # 先悬停 random_delay(0.2, 0.5) # 悬停后延迟 item.dispatch_event("mousedown") random_delay(0.05, 0.15) # 鼠标按下后延迟 item.dispatch_event("mouseup") random_delay(0.05, 0.1) # 鼠标松开后延迟 try: with store_page.context.expect_page(timeout=60000) as p: item.click(delay=random.uniform(0.1, 0.3)) detail_page = p.value except PlaywrightTimeoutError: logger.warning( f" 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 未检测到新标签页,使用当前页采集详情" ) detail_page = None # 标记为无新标签页,避免关闭列表页 # 等待详情加载(优先用新标签页,无则用列表页) target_page = detail_page if detail_page else store_page target_page.wait_for_load_state("networkidle", timeout=20000) delay = random_delay(MIN_PAGE_DELAY, MAX_PAGE_DELAY) logger.info( f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 详情页加载完成,等待{delay:.2f}秒(反爬)" ) # 反爬:检测详情页反爬验证 # check_anti_crawl(page) # ========== 采集详情页的专属信息(有效期/生产日期/批准文号) ========== #获取商品详情页链接 product_link = target_page.url logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页链接:{product_link}{'='*10}") # ========= ✅ 去重逻辑,拿商品链接和折扣价和有效期和采集日期 ========= if check_dup_in_biz_db(product_link, full_price_num, current_time): logger.warning(f" 「{keyword}」第{page_no}页 第{collected_count}个商品(重复):{title},跳过") # ========== 关闭新标签页,切回列表页 ========== if detail_page and not detail_page.is_closed(): detail_page.close() # 关闭详情页标签 logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页") # 切回原列表页(第一个标签页) store_page.bring_to_front() # 激活列表页 store_page.mouse.move(random.randint(100, 300), random.randint(200, 400)) # 随机移动鼠标 random_delay(0.5, 1.0) # 增加切换后延迟 store_page.wait_for_load_state("networkidle") random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页") if collected_count % 6 == 0 and collected_count > 0: logger.info("采满6个往下滑") slow_scroll_400px(store_page) store_page.wait_for_load_state("networkidle") continue # key = f"{product_link.strip()}|{discount_price_val}" # if key in seen: # logger.warning( # f" 「{keyword}」第{page_no}页 第{collected_count}个商品(重复):{title},跳过" # ) # if collected_count % 5 == 0 and collected_count > 0: # logger.info("采满15个往下滑") # slow_scroll_400px(page) # page.wait_for_load_state("networkidle") # continue # seen.add(key) # 提取生产日期(修复完成) manufacture_date_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="生产日期"]]//div[contains(@class, "spec-info-item-value-text")]') if manufacture_date_locator.count() > 0: manufacture_date = manufacture_date_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页生产日期:{manufacture_date}{'='*10}") else: # 修复:替换未定义的i为collected_count logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 生产日期元素未找到,使用默认值:{manufacture_date}") # 提取批准文号 approval_number_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="批准文号"]]//div[contains(@class, "spec-info-item-value-text")]') if approval_number_locator.count() > 0: approval_number = approval_number_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页批准文号:{approval_number}{'='*10}") else: # 修复:替换未定义的i为collected_count logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 批准文号元素未找到,使用默认值:{approval_number}") #提取规格 spec_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="规格"]]//div[contains(@class, "spec-info-item-value-text")]') if spec_locator.count() > 0: spec = spec_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页规格:{spec}{'='*10}") else: # 修复:替换未定义的i为collected_count,补充规格数量不足的提示 logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 规格元素数量不足,使用默认值:{spec}") # input("...") #提取库存 storage = '' storage_locator = target_page.locator('[data-v-51f0e85d].detail-input-num-right-title') if storage_locator.count() > 0: storage = storage_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页库存:{storage}{'='*10}") else: # 修复:替换未定义的i为collected_count,补充规格数量不足的提示 logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 库存元素数量不足,使用默认值:{storage}") #提取销量 sell = '' sell_locator = target_page.locator('div.detail-info-content-item-value-price-top-right div[data-v-95163d4a]',has_text='已售') if sell_locator.count() > 0: sell = sell_locator.inner_text(timeout=3000).strip() logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页销量:{sell}{'='*10}") else: logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 没有销量元素,使用默认值:{sell}") #保存快照url上传到oss try: local_path, oss_url = screenshot_target_page_to_local_then_oss( target_page=target_page, full_page=True # 截取全屏 ) print(f"最终结果:") print(f" 本地文件路径:{local_path}") logger.info(f" OSS访问链接:{oss_url}") except Exception as e: logger.warning(f"整体流程执行失败:{str(e)}") # input("...") province = "" city = "" business_license_company = "" qualification_number = '' #如果店名为商品预约中心 # if shop == '药店品种预约中心': # #https://www.ybm100.com/new-front/product-info/detail?type__1241=222029ad07-tWcfAcrWtc_CSPpP_%2FtW_cfB_ETca0SugQSbgC7gAb5RAdZyTA5UdS%3DUAoogIsKBqyWgKP_tgAPItgePrBgRPrlgQP_ug0PTZgEPrugpPA5lq%3DSQPg%3Dgt2_xg%3D2FPgs0oBgYqwcg9%3DWPTuSgTHgtBsfgGEh%3D%2FXvko2R%3DGvhceloleBnCGBqcG%2F2V_uKVUBftg # #获取pidhttps://www.ybm100.com/new-front/product-info/detail?type__1241=222029ad07-G%2FxP7PxPJgfPUgu%2FIbv7Wg6gpIgwJg5q4PfAg%2FTWZ_Q6gtHaHG%2FgWCPKsClvGsLPVsgQyuBlVVPTqgtvgQgWvG6gOPTkg5%2F_jgAvTog6vT4g5v_6gSU7vC9cggZgvPAtgZJBPgysGg_OuH%2Fg9ToPgjkBgO%2FgaCQggY7KNlo7itg%2FBGP2GrJpPV6%2FQ6f_u6qvMjPvQVIgPg # url = 'https://www.ybm100.com/new-front/product-info/detail?type__1241=222029ad07-G%2FxP7PxPJgfPUgu%2FIbv7Wg6gpIgwJg5q4PfAg%2FTWZ_Q6gtHaHG%2FgWCPKsClvGsLPVsgQyuBlVVPTqgtvgQgWvG6gOPTkg5%2F_jgAvTog6vT4g5v_6gSU7vC9cggZgvPAtgZJBPgysGg_OuH%2Fg9ToPgjkBgO%2FgaCQggY7KNlo7itg%2FBGP2GrJpPV6%2FQ6f_u6qvMjPvQVIgPg' # data = { # 'id': f'{product_id}', # 'isMainProductVirtualSupplier': 0 # } # headers = { # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0", # 'Cookie': '_abfpc=48083f46aa22e0eaefbace39874e38acc7c631ea_2.0; cna=2b5bf2a0d04d0ec45367fda825d4fa6b; xyy=MjM2JjE4MDA4NjUwMzAw; JSESSIONID=922A896126C5961D09622E042CAAA01D; xyy_token=eyJhbGciOiJIUzUxMiJ9.eyJhY2NvdW50X2lkIjoyMzYsImRldmljZV9pZCI6IiIsIm9zIjoiV2luZG93cyAxMCIsImxvZ2luX3RpbWUiOjE3NjkxNjAzNDQ5MDYsImJyb3dzZXIiOiJDaHJvbWUgMTQiLCJtZXJjaGFudF9pZCI6MjM2LCJpcF9hZGRyIjoiMTEzLjk4LjYyLjE2NiIsInZlcnNpb24iOiIiLCJsb2dpbl91c2VyX2tleSI6IjM3NzQ2ZjM5LTE3MjQtNDBjYi1hNTk4LWRlYTM5MTU2NjllNSJ9.IN8gFX6p4KuClT2KysZLNVuyQuszfdNW5gz7m_u4yq60zqbvSOg1yo0f7TuKcbZVvd-t5mVsb4hoNBRNV6nsYQ; xyy_principal=236&Y2MwY2FiZGYzZjU4NzUzNGE5OWRkZTIwYmRiMmQ4NTk2ZDg5N2QxOQ&236; xyy_last_login_time=1769160344906; acw_tc=1a0c650c17694095621061999e5d6b6730068c59854298f31bdd661882a009; qt_session=KsnsuMqE_1769409754197; ssxmod_itna=1-eq0xgDnDyAeYqDKi=G0KKG7DRDIEpDpxgGDBP01G7DuExjKidtDUDQulGmFgG4G=oG7iheet3RLKNDlpLeDZDGKQDqx0Eb0iiD4Ns3ImkiT53QQGvqUdaeOENowZaTRbY9oVG6MxfXy/UDgEeDU4GnD068CY6bDYYLDBYD74G_DDeDi2rD84D_DGpdMnudxi33nDeDzqr=xG3txYpdweDgADDB_RiDKkP=hDDlGA7YREbPAcTq6PmzxGU8lCGxUeDMFxGXmikYUQy6MK4rZCSfp1EYH1aDtqD9DgbDb42zvrTbp6ebF_mbS_83r1Ki=3iifhNQ2rt0iC0_Yiofx4lxxfxx3Be5WHiTHDDW=fd1xxq05p71UdznuzuAernD=xIxRtbj=/74anQqf5Dxx4hYb0DnOGK0D3j=bGrxnD4D; ssxmod_itna2=1-eq0xgDnDyAeYqDKi=G0KKG7DRDIEpDpxgGDBP01G7DuExjKidtDUDQulGmFgG4G=oG7iheet3RLFoDiaRAqzbCD7pxTs4GNeYfb78=o8pWc0HY8dN0vO6z5i69OeF5Dg34naHHkD98UZ3tVAb=9/L3BSLIczMds0bxfCAIfG0eY3oTQym5z/oAhmi4qDLetNaD', # 'Referer': f'https://www.ybm100.com/new/base/skuDetail?id={product_id}&combination=1&type=1', # "Content-Type" : "application/json" # } # response = requests.post(url, json=data, headers=headers) # print(response.status_code) # try: # response_json = response.json() # print("✅ 成功解析JSON响应") # if 'data' in response_json and 'detail' in response_json['data'] and 'pid' in response_json['data']['detail']: # pid = response_json['data']['detail']['pid'] # print(f"✅ 提取到pid:{pid}") # elif 'pid' in response_json: # pid = response_json['pid'] # print(f"✅ 方式二提取到pid:{pid}") # else: # # 打印响应的前1000个字符,帮助你确认JSON结构 # print("⚠️ 未找到pid字段,响应数据预览:") # print(json.dumps(response_json, ensure_ascii=False, indent=2)[:1000]) # pid = None # except json.JSONDecodeError: # # 响应不是JSON格式的情况 # print("❌ 响应不是JSON格式,无法解析") # print("响应文本:", response.text[:1000]) # pid = None # except Exception as e: # # 其他异常 # print(f"❌ 提取pid时出错:{str(e)}") # pid = None # target_page.goto(f'https://www.ybm100.com/new/base/skuDetail?id={product_id}&combination=1&type=1') # shop_name_elem = target_page.locator('span[data-v-5485589c]') # shop_name = shop_name_elem.inner_text(timeout=3000).strip() # shop_exists, shop_info = shop_is_exists_database(shop_name) # if not shop_exists: # if shop_info: # province = shop_info['province'] # city = shop_info['city'] # business_license_company = shop_info['business_license_company'] # qualification_number = shop_info['qualification_number'] # #去往药店品种预约中心后面的链接 # target_page.goto(f"https://www.ybm100.com/new/base/skuDetail?id={pid}&combination=1&type=1") # if not shop_exists: shop_exists, shop_info = shop_is_exists_database(shop) shop_page = None #店铺名不是药品预约中心且店铺名不在数据库就要点击 if shop != "药店品种预约中心" and not shop_exists: logger.info("店铺名不是药店品种预约中心且数据库没有该公司的营业执照") # 获取营业执照图片 # 进入店铺 random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) entershop_btn = target_page.locator('div[data-v-5485589c].shop-info-container-left-info') # 增强:先等待进入店铺按钮可见 entershop_btn.wait_for(state="visible", timeout=10000) entershop_btn.scroll_into_view_if_needed() # 确保按钮在视口内 entershop_btn.hover() # 先悬停 random_delay(0.2, 0.5) # 悬停后延迟 with target_page.expect_popup(timeout=15000) as pop: entershop_btn.click() random_delay(0.05, 0.15) # 鼠标按下后延迟 shop_page = pop.value shop_page.wait_for_load_state("domcontentloaded") # 比 networkidle 更 #点击店铺资质 random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) shop_license_page = shop_page.locator('//div[contains(@class, "shop-info-container-right-btns-item") and contains(span, "资质/售后")]') shop_license_page.wait_for(state="attached", timeout=15000) # 等待元素加载完成 shop_license_page.scroll_into_view_if_needed() # 确保在视口内 shop_license_page.hover() # 先悬停 random_delay(0.2, 0.5) # 悬停后延迟 # shop_license_page.dispatch_event("mousedown") shop_license_page.click() random_delay(0.05, 0.15) # 鼠标按下后延迟 # shop_license_page.dispatch_event("mouseup") random_delay(0.05, 0.1) # 鼠标松开后延迟 shop_page.wait_for_load_state("networkidle") # slow_scroll_400px(shop_page, scroll_distance1=700) #获取药品经营许可证图片 shop_page.wait_for_load_state("load") ocr_res = None # shop_license_div = target_page.locator('//span[contains(text(), "营业执照")]') shop_license_img = shop_page.locator('//span[contains(text(), "企业营业执照") or contains(text(), "营业执照(正本)")]/ancestor::div[@class="shop-info-drawer-zz-tab1-list-item"]/img').first shop_license_img.wait_for(state="visible", timeout=60000) try: if shop_license_img.count() > 0: shop_license_src = shop_license_img.get_attribute('src') shop_license_src = shop_license_src.strip() if shop_license_src else None ocr_res = get_ocr_res(shop_license_src) # print(f'ocr_res:{ocr_res}') # input(".....") else: shop_license_src = None except Exception as e: # 捕获定位/提取失败的异常,避免程序崩溃 logger.warning(f"提取营业执照图片src失败:{e}") shop_license_src = None print("营业执照图片链接:", shop_license_src) # input("..") contact_address = '' qualification_number = ocr_res.get('社会信用代码', '') if ocr_res else '' business_license_company = ocr_res.get('单位名称', '') if ocr_res else '' business_license_address = ocr_res.get('地址', '') if ocr_res else '' # scrape_date = '' # 调用提取函数,获取省份和城市 province, city = extract_province_city(business_license_address) logger.info(f"原始地址:{business_license_address}") logger.info(f"提取的省份:{province} | 城市:{city}") insert_result = insert_shop_info_to_db( shop=shop, contact_address=contact_address, qualification_number=qualification_number, business_license_company=business_license_company, business_license_address=business_license_address, scrape_date=current_time, platform=platform, province=province, city=city, create_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S") , update_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) else: logger.info("数据库有该店名,在数据库拿取对应字段填充ybm_drug_middle表") if shop_info: province = shop_info['province'] city = shop_info['city'] business_license_company = shop_info['business_license_company'] qualification_number = shop_info['qualification_number'] try: if shop_page and not shop_page.is_closed(): random_delay(4,8) shop_page.close() logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭店铺页标签 shop_page") except Exception as e: logger.warning(f"⚠️ 关闭 shop_page 失败:{e}") # # purchase_price = float(price.replace("¥", "").replace(",", "")) if price.replace("¥", "").replace(",", "").replace(".", "").isdigit() else 0.00 random_delay(5,8) # ========== 关闭新标签页,切回列表页 ========== if detail_page and not detail_page.is_closed(): detail_page.close() # 关闭详情页标签 logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页") # 切回原列表页(第一个标签页) store_page.bring_to_front() # 激活列表页 store_page.mouse.move(random.randint(100, 300), random.randint(200, 400)) # 随机移动鼠标 random_delay(0.5, 1.0) # 增加切换后延迟 store_page.wait_for_load_state("networkidle") random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页") random_delay(2,4) # credit_code = "" availability = "" # input(".....") # 组装单条数据(仅新增生产日期/批准文号字段,原有字段顺序/逻辑不变) # 构造单条数据元组(适配MySQL字段) single_data = { # 核心商品信息 "product": title, # 商品名称 "my_good_price": merged_price, # 自定义价格(可与min_price相同或单独提取) "min_price": discount_price_val, # 最低价格 "manufacture_date": manufacture_date, # 生产日期 "expiry_date": expiry_date, # 有效期 "shop": shop, # 店铺名 "business_license_company": business_license_company, # 营业执照主体(公司名称) "province": province, # 省份 "city": city, # 城市 "manufacturer": manufacturer, # 生产厂家 "specification": spec, # 规格 "approval_number": approval_number, # 批准文号 "product_link": product_link, # 商品链接 "scrape_date": current_time, # 采集日期 "scrape_province": "", # 采集省份(可留空或根据IP获取) "availability": availability, # 库存状态 "credit_code": qualification_number, # 统一信用代码(如有可补充提取) "platform": platform, # 平台名称(固定或动态获取) "search_key": keyword, # 搜索关键词 "number": num, # 数量(盒数) "is_sold_out": is_sold_out, # 售罄标记(0/1) "sales": sell, #销量 "inventory": storage, #库存 "snapshot_url": oss_url, #快照链接 "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 更新时间 "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 创建时间 } # 调用逐条插入函数 insert_single_to_mysql(single_data) collect_result.append(single_data) logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」采集完成") # input("....") except Exception as e: # 异常处理:关闭详情页,强制切回列表页 logger.exception(f" 「{keyword}」第{collected_count}个商品采集核心异常:{str(e)}") try: if detail_page and not detail_page.is_closed(): detail_page.close() logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 异常时关闭详情页标签页") if store_page and not store_page.is_closed(): store_page.bring_to_front() # 切回列表页 store_page.wait_for_load_state("networkidle") random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY) except Exception as e2: logger.error(f" 「{keyword}」第{collected_count}个商品详情采集异常(处理时):{str(e2)},原异常:{str(e)}") continue # ✅ 每15次滚动一次(修复:用collected_count,且排除0的情况) if collected_count % 6 == 0 and collected_count > 0 and collected_count != total_limit: logger.info("采满5个往下滑") slow_scroll_400px(store_page,) store_page.wait_for_load_state("networkidle") # ====== 当前页采集完毕,尝试翻页 ====== delay = random_delay(1.5, 3.0) logger.info(f"⏳ 翻页前随机等待 {delay:.2f}s(反爬)") if goto_next_page(store_page): logger.info(f"「{keyword}」还有下一页") page_no += 1 continue else: logger.info(f" 「{keyword}」已无下一页,关键词采集结束") break # 关键词采集完成后长延迟 long_delay = random_delay(MIN_KEYWORD_DELAY, MAX_KEYWORD_DELAY) logger.info(f" 「{keyword}」采集完成,共{len(collect_result)}条数据,等待{long_delay:.2f}秒后继续下一个关键词(反爬)") return collect_result # ==================== 保存到CSV函数(适配新表头) ==================== # def save_to_csv(data_list): # """ # 保存数据到CSV(适配新表头) # :param data_list: list - 采集到的字典数据列表 # :return: bool - 保存是否成功 # """ # if not data_list: # logger.warning(" 无数据可保存到CSV") # return False # try: # # 判断文件是否存在,不存在则写入表头 # file_exists = os.path.exists(CSV_FILE_PATH) # # 打开CSV文件(追加模式,utf-8-sig避免Excel乱码) # with open(CSV_FILE_PATH, "a", newline="", encoding="utf-8-sig") as f: # # 用新表头作为字段名 # writer = csv.DictWriter(f, fieldnames=CSV_HEADERS) # # 首次写入表头 # if not file_exists: # writer.writeheader() # logger.info(f" 已创建CSV文件并写入新表头:{CSV_FILE_PATH}") # # 写入数据行 # writer.writerows(data_list) # logger.info(f" 成功将 {len(data_list)} 条数据写入CSV") # return True # except Exception as e: # logger.error(f" 保存CSV失败:{str(e)}") # return False # ==================== 主函数(登录+批量搜索) ==================== def main(): logger.info("\n" + "="*50) logger.info("🚀 药帮忙采集程序启动") logger.info(f"⏰ 启动时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info("="*50) # 待搜索的关键词列表(直接写在这里,改起来更直观) # 存储所有关键词的采集数据 # all_collect_data = [] with sync_playwright() as p: # browser = init_browser_with_proxy(p) # 启动浏览器(用单个配置变量) browser = p.chromium.launch( headless=False, # 不要用无头模式(反爬:无头模式易被识别) channel="chrome", # 使用真实Chrome内核 slow_mo=random.randint(100, 300), # 全局操作延迟(模拟真人慢速操作) args=[ "--disable-blink-features=AutomationControlled", # 禁用webdriver特征(核心!) "--enable-automation=false", # 新增:禁用自动化标识 "--disable-infobars", # 新增:禁用信息栏 "--remote-debugging-port=0", # 新增:随机调试端口 "--start-maximized", # 最大化窗口(模拟真人使用) "--disable-extensions", # 禁用扩展(避免特征) "--disable-plugins-discovery", # 禁用插件发现 "--no-sandbox", # 避免沙箱模式特征 "--disable-dev-shm-usage", # 避免内存限制导致的异常 f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36" # 随机Chrome版本的UA ] ) # 创建页面时伪装指纹 context = browser.new_context( locale="zh-CN", # 中文环境 timezone_id="Asia/Shanghai", # 上海时区 geolocation={"latitude": 31.230416, "longitude": 121.473701}, # 模拟上海地理位置(可选) permissions=["geolocation"], # 授予定位权限(模拟真人) user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", viewport={"width": 1800, "height": 1000}, # 关键:隐藏自动化特征 java_script_enabled=True, bypass_csp=True, # user_data_dir="./temp_user_data" # 模拟真实用户数据目录 ) page = context.new_page() # 关键:移除navigator.webdriver标识(反爬核心) page.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] }); // 新增:模拟插件 Object.defineProperty(navigator, 'mimeTypes', { get: () => [1, 2, 3] }); // 新增:模拟MIME类型 window.chrome = { runtime: {}, loadTimes: () => ({}) }; // 增强Chrome模拟 delete window.navigator.languages; window.navigator.languages = ['zh-CN', 'zh']; // 新增:模拟真实鼠标移动特征 (() => { const originalAddEventListener = EventTarget.prototype.addEventListener; EventTarget.prototype.addEventListener = function(type, listener) { if (type === 'mousemove') { return originalAddEventListener.call(this, type, (e) => { e._automation = undefined; listener(e); }); } return originalAddEventListener.call(this, type, listener); }; })(); """) try: # ========== 核心:Cookie复用逻辑 ========== # 1. 加载本地Cookie load_cookies(context) # 2. 验证登录状态 if not is_login(page): # 3. Cookie失效/不存在,执行登录 page.goto(TARGET_LOGIN_URL) page.wait_for_load_state("networkidle") logger.info("🔑 开始执行登录流程") # 执行登录操作 login_success = login_operation(page, USERNAME, PASSWORD) if not login_success: logger.error(" 登录失败,程序终止") return # 4. 登录成功后保存Cookie save_cookies(context) logger.info(" 登录并保存Cookie成功!") # 初始化变量:保存首次搜索的新页面对象 store_page = None #统计搜索次数 nums = 0 # 2. 批量搜索+采集+保存 for keyword_idx, keyword in enumerate(SEARCH_KEYWORDS, 1): logger.info(f"\n=====================================") logger.info(f"🔍 开始处理第{keyword_idx}/{len(SEARCH_KEYWORDS)}个关键词:{keyword}") logger.info(f"=====================================") # 执行搜索 popup_guard(page, "before_search") if nums == 0: popup_guard(store_page if store_page else page, "before_search") # page是你的初始页面对象,需提前定义 store_page, search_success = search_operation(page, keyword, is_first_search=True) nums += 1 else: if store_page is None: logger.error(f"{get_current_time()} ❌ 无可用的搜索页面,跳过「{keyword}」") continue popup_guard(store_page, "before_search") store_page, search_success = search_operation(store_page, keyword, is_first_search=False) # input("") popup_guard(store_page, "after_search") # store_page = detail_page if store_page is None: break if not search_success: logger.warning(f" 「{keyword}」搜索失败,跳过采集") continue # ✅ 再等页面稳定一下(networkidle 有时会等不到,建议加超时或换成 domcontentloaded) store_page.wait_for_load_state("domcontentloaded") store_page.wait_for_load_state('networkidle') # 采集数据 data_list = collect_data(store_page, keyword) # # 保存到CSV # if data_list: # save_to_csv(data_list) # else: # logger.warning(f" 「{keyword}」无数据,跳过保存") logger.info("\n🎉 所有关键词处理完成!CSV文件路径:" + os.path.abspath(CSV_FILE_PATH)) # input("\n按回车关闭程序...") except Exception as e: logger.error(f" 程序异常:{str(e)}") finally: browser.close() logger.info(" 浏览器已关闭,程序结束") # ==================== 程序入口 ==================== if __name__ == '__main__': main()