"""Playwright-based product scraper for www.ybm100.com.

Flow implemented in this module: reuse saved cookies, verify the login state,
run one search per keyword, then walk the paginated result list and look each
product URL up in MySQL.

NOTE(review): ``PRODUCT_TITLE_SELECTOR``, ``get_search_keywords_from_db`` and
``fuzzy_match_product_url_in_db_mysql`` are not defined in this file; they are
presumably provided by ``from config import *`` -- confirm.
"""
import json
import os
import random
import re

import pymysql
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

from logger_config import logger
from config import *

COOKIE_FILE_PATH = "ybm_cookies.json"  # where the cookie jar is stored
LOGIN_VALIDATE_URL = "https://www.ybm100.com/new/"
TARGET_LOGIN_URL = "https://www.ybm100.com/new/login"

SEARCH_INPUT_SELECTOR = "input[placeholder*='药品名称/厂家名称']"
SEARCH_BTN_SELECTOR = 'div.home-search-container-search-head-btn[data-scmd="text-搜索"]'
PRODUCT_ITEM_SELECTOR = "div.product-list-item"

# "Next / Done / Got it / Close" buttons used by multi-step onboarding guides.
_GUIDE_BUTTON_XPATH = (
    "xpath=//button[normalize-space()='下一步' or normalize-space()='完成' "
    "or normalize-space()='我知道了' or normalize-space()='关闭']"
)

# Button of the "first time" highlight hint on the search-result page.
# NOTE(review): ``data-v-c65c36bc`` looks like a build-specific scoped-style
# hash and may change between site releases -- confirm.
_FIRST_TIME_HINT_BTN_SELECTOR = (
    "div[data-v-c65c36bc].first-time-highlight-message-btn button"
)

# Fixed pause before turning a page, in milliseconds.
_PAGE_TURN_DELAY_MS = 5000

_POPUP_GUARD_JS = r"""
() => {
    // Step 1: remove the well-known Element UI mask / dialog wrappers.
    const selectors = [
        '.v-modal', '.el-overlay', '.el-overlay-dialog',
        '.el-dialog__wrapper', '.el-message-box__wrapper', '.el-loading-mask'
    ];
    selectors.forEach(sel =>
        document.querySelectorAll(sel).forEach(e => e.remove()));

    // Step 2 (generic fallback): hide near-full-screen layers that are
    // positioned fixed/absolute, have z-index >= 1000 and intercept clicks.
    const all = Array.from(document.querySelectorAll('body *'));
    for (const el of all) {
        const s = getComputedStyle(el);
        const z = parseInt(s.zIndex || '0', 10);
        if ((s.position === 'fixed' || s.position === 'absolute')
                && z >= 1000 && s.pointerEvents !== 'none') {
            const r = el.getBoundingClientRect();
            const nearFull = r.width >= innerWidth * 0.8
                && r.height >= innerHeight * 0.8;
            if (nearFull) {
                el.style.pointerEvents = 'none';
                el.style.display = 'none';
            }
        }
    }

    // Step 3: restore scrolling (dialogs usually lock it).
    document.documentElement.style.overflow = 'auto';
    document.body.style.overflow = 'auto';
    document.body.classList.remove('el-popup-parent--hidden');
}
"""

_FORCE_CLOSE_JS = r"""
() => {
    const selectors = [
        '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper',
        '[class*="mask"]', '[class*="overlay"]', '[style*="z-index"]'
    ];
    for (const sel of selectors) {
        document.querySelectorAll(sel).forEach(el => {
            const s = window.getComputedStyle(el);
            // Only remove overlay-like elements: fixed/absolute + high z-index.
            if ((s.position === 'fixed' || s.position === 'absolute')
                    && parseInt(s.zIndex || '0', 10) >= 1000) {
                el.remove();
            }
        });
    }
}
"""

_KILL_MASKS_JS = r"""
() => {
    const removed = [];
    const hidden = [];

    // 1) Remove the well-known mask elements.
    //    '.el-popup-parent--hidden' is intentionally NOT listed here: that
    //    class is set on <body>, so removing matches would delete the body.
    //    The class itself is stripped in step 4.
    const knownSelectors = [
        '.v-modal', '.el-overlay', '.el-overlay-dialog',
        '.el-dialog__wrapper', '.el-message-box__wrapper', '.el-loading-mask'
    ];
    for (const sel of knownSelectors) {
        document.querySelectorAll(sel).forEach(el => {
            removed.push(sel);
            el.remove();
        });
    }

    // 2) Generic fallback: near-full-screen fixed/absolute layers with a high
    //    z-index. The "near full screen" test keeps fixed nav bars untouched.
    const all = Array.from(document.querySelectorAll('body *'));
    for (const el of all) {
        const s = window.getComputedStyle(el);
        if (!s) continue;
        const z = parseInt(s.zIndex || '0', 10);
        const pos = s.position;
        const pe = s.pointerEvents;
        if ((pos === 'fixed' || pos === 'absolute') && z >= 1000 && pe !== 'none') {
            const r = el.getBoundingClientRect();
            const nearFullScreen =
                r.width >= window.innerWidth * 0.8 &&
                r.height >= window.innerHeight * 0.8 &&
                r.left <= window.innerWidth * 0.1 &&
                r.top <= window.innerHeight * 0.1;
            if (nearFullScreen) {
                // Transparent or not: stop it from intercepting clicks.
                el.style.pointerEvents = 'none';
                el.style.display = 'none';
                hidden.push(el.tagName + '.' + (el.className || ''));
            }
        }
    }

    // 3) Restore scrolling / interaction on html and body.
    document.documentElement.style.overflow = 'auto';
    document.body.style.overflow = 'auto';
    document.body.style.position = 'static';
    document.body.style.width = 'auto';
    document.body.style.paddingRight = '0px';

    // 4) Drop the Element UI scroll-lock class.
    document.body.classList.remove('el-popup-parent--hidden');

    return { removed, hiddenCount: hidden.length, hidden };
}
"""

# Waits until the first product title differs from the previous page's title.
# Playwright passes exactly ONE argument to the page function, so the selector
# and the old text travel together as a single array that is destructured here.
_TITLE_CHANGED_JS = """([sel, oldText]) => {
    const el = document.querySelector(sel);
    return Boolean(el && el.innerText && el.innerText.trim() !== oldText);
}"""

_STEALTH_INIT_JS = """
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] });
Object.defineProperty(navigator, 'mimeTypes', { get: () => [1, 2, 3] });
window.chrome = { runtime: {}, loadTimes: () => ({}) };
// navigator.languages is a getter-only accessor, so plain assignment is
// silently ignored; it has to be redefined.
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });

(() => {
    const originalAddEventListener = EventTarget.prototype.addEventListener;
    EventTarget.prototype.addEventListener = function(type, listener, ...rest) {
        // Always forward the options/useCapture argument, and only wrap
        // plain function listeners (listener objects and null pass through).
        // Known limitation: a wrapped mousemove listener cannot be removed
        // with removeEventListener(type, listener).
        if (type === 'mousemove' && typeof listener === 'function') {
            const wrapped = function(e) {
                e._automation = undefined;
                return listener.call(this, e);
            };
            return originalAddEventListener.call(this, type, wrapped, ...rest);
        }
        return originalAddEventListener.call(this, type, listener, ...rest);
    };
})();
"""


def load_cookies(context, cookie_path=COOKIE_FILE_PATH):
    """Load cookies from a local JSON file into the browser context.

    Args:
        context: Playwright browser context that receives the cookies.
        cookie_path: Path of the JSON cookie file.

    Returns:
        True when the cookies were loaded, False when the file is missing or
        loading failed (the failure is logged, never raised).
    """
    if not os.path.exists(cookie_path):
        logger.warning(f" Cookie文件不存在:{cookie_path}")
        return False
    try:
        with open(cookie_path, "r", encoding="utf-8") as f:
            cookies = json.load(f)
        context.add_cookies(cookies)
    except Exception as e:
        logger.error(f" 加载Cookie失败:{e}")
        return False
    return True


def is_login(page):
    """Check whether the current session is logged in.

    Opens ``LOGIN_VALIDATE_URL`` and treats a final URL containing "login" as
    "not logged in".

    Returns:
        True when the page did not end up on a login URL, False otherwise or
        when navigation failed.
    """
    try:
        page.goto(LOGIN_VALIDATE_URL, timeout=5000)
        page.wait_for_load_state("networkidle")
        if "login" in page.url.lower():
            logger.warning(" Cookie失效,需要重新登录")
            return False
        return True
    except Exception as e:
        logger.error(f" 验证登录状态失败:{e}")
        return False


def popup_guard(page, tag=""):
    """Best-effort dismissal of popups, guides and masks on ``page``.

    Clicks guide / close buttons up to six times, then removes or hides mask
    layers and restores scrolling via injected JavaScript. Never raises.

    Args:
        page: Playwright page to clean up.
        tag: Label identifying the call site; only used in the failure log.
    """
    try:
        # Give a popup a moment to appear.
        page.wait_for_timeout(300)

        for _ in range(6):
            btn = page.locator(_GUIDE_BUTTON_XPATH).first
            if btn.count() > 0 and btn.is_visible():
                btn.click(timeout=1500)
                page.wait_for_timeout(250)
                continue

            close_btn = page.locator(
                "css=.el-dialog__headerbtn, .el-message-box__headerbtn, .close, .icon-close, .el-icon-close"
            ).first
            if close_btn.count() > 0 and close_btn.is_visible():
                close_btn.click(timeout=1200)
                page.wait_for_timeout(250)
                continue

            break  # nothing left to click

        page.evaluate(_POPUP_GUARD_JS)
    except Exception as e:
        # Deliberately best-effort, but no longer silent.
        logger.warning(f"popup_guard[{tag}] failed: {e}")


def pick_search_input(page):
    """Return a usable search input locator.

    Probes the first two matches of ``SEARCH_INPUT_SELECTOR`` with a short
    timeout and returns the first one that is visible and enabled; otherwise
    falls back to the first ``:visible`` match.

    Raises:
        PlaywrightTimeoutError: if the fallback input does not become visible
            within 5 seconds.
    """
    inputs = page.locator(SEARCH_INPUT_SELECTOR)
    for i in range(min(inputs.count(), 2)):
        candidate = inputs.nth(i)
        try:
            candidate.wait_for(state="visible", timeout=1500)
        except PlaywrightTimeoutError:
            continue
        if candidate.is_enabled():
            return candidate

    # Fallback: any visible match (avoids hidden template inputs).
    candidate = page.locator(f"{SEARCH_INPUT_SELECTOR}:visible").first
    candidate.wait_for(state="visible", timeout=5000)
    return candidate


def type_slow(locator, text: str, min_delay=0.06, max_delay=0.18):
    """Type ``text`` one character at a time with a random per-key delay.

    Args:
        locator: Playwright locator to type into.
        text: Text to enter.
        min_delay: Lower bound of the per-character delay, in seconds.
        max_delay: Upper bound of the per-character delay, in seconds.
    """
    for ch in text:
        locator.type(ch, delay=int(random.uniform(min_delay, max_delay) * 1000))


def force_close_popup(page):
    """Close onboarding guides and remove overlay layers (best effort).

    Clicks guide buttons / close icons up to five times, then removes
    fixed/absolute elements with z-index >= 1000 that match common mask
    selectors. Never raises.
    """
    try:
        for _ in range(5):
            btn = page.locator(_GUIDE_BUTTON_XPATH).first
            if btn.count() > 0 and btn.is_visible():
                btn.click(timeout=1500)
                page.wait_for_timeout(300)
                continue

            # NOTE(review): this XPath also matches ANY <svg> or <i> element,
            # so the first icon on the page may get clicked -- confirm that
            # this is intended before relying on this function.
            close_icon = page.locator(
                "xpath=//*[contains(@class,'close') or contains(@class,'el-icon-close') or name()='svg' or name()='i'][1]"
            ).first
            if close_icon.count() > 0 and close_icon.is_visible():
                close_icon.click(timeout=1000)
                page.wait_for_timeout(300)
                continue

            break

        page.evaluate(_FORCE_CLOSE_JS)
    except Exception as e:
        logger.warning(f"force_close_popup failed: {e}")


def kill_masks(page):
    """Remove leftover mask layers and restore scrolling on ``page``.

    Returns:
        The summary produced by the injected script:
        ``{"removed": [...], "hiddenCount": int, "hidden": [...]}``.
    """
    return page.evaluate(_KILL_MASKS_JS)


def _dismiss_first_time_hint(detail_page):
    """Click the first-time hint button on the result page if it shows up."""
    hint_btn = detail_page.locator(_FIRST_TIME_HINT_BTN_SELECTOR)
    btn_count = hint_btn.count()
    logger.info(f"✅ 匹配到的元素数量:{btn_count}")
    try:
        hint_btn.first.wait_for(state="attached", timeout=5000)
        hint_btn.first.click(timeout=5000)
    except PlaywrightTimeoutError:
        # The hint is optional; its absence must not fail the search.
        logger.info("first-time hint button not present, skipping")


def _close_quietly(target_page):
    """Close a page, ignoring errors (used on failure paths only)."""
    if target_page is None:
        return
    try:
        target_page.close()
    except Exception as e:
        logger.warning(f"failed to close page: {e}")


# ==================== Search ====================
def search_operation(page, keyword):
    """Type ``keyword`` into the search box and open the result tab.

    Args:
        page: Playwright page showing the site's search box.
        keyword: Search term to enter.

    Returns:
        ``(detail_page, True)`` on success, where ``detail_page`` is the newly
        opened result tab; ``(None, False)`` on any failure. The return value
        is always a 2-tuple so callers can unpack it unconditionally.
    """
    detail_page = None
    try:
        search_locator = pick_search_input(page)

        # Clear the box twice over: fill("") plus select-all + Backspace.
        search_locator.click()
        search_locator.fill("")
        page.keyboard.down("Control")
        page.keyboard.press("a")
        page.keyboard.up("Control")
        page.keyboard.press("Backspace")

        type_slow(search_locator, keyword, min_delay=0.25, max_delay=0.50)
        logger.info(f"📝 已输入搜索关键词:{keyword}")

        btn = page.locator(f"{SEARCH_BTN_SELECTOR}:visible").first
        btn.wait_for(state="visible", timeout=5000)
        page.wait_for_timeout(600)

        # Start listening for the new tab BEFORE clicking.
        try:
            with page.context.expect_page(timeout=60000) as new_page_info:
                btn.click()
            detail_page = new_page_info.value
        except PlaywrightTimeoutError:
            logger.warning("未检测到新标签页")
            return None, False

        detail_page.wait_for_load_state("networkidle", timeout=20000)
        logger.info("✅ 已触发搜索")

        _dismiss_first_time_hint(detail_page)
        return detail_page, True
    except PlaywrightTimeoutError as e:
        logger.error(f" 搜索失败:元素定位超时 - {str(e)}")
    except Exception as e:
        logger.error(f" 搜索异常:{str(e)}")

    # Failure path: do not leak a half-loaded result tab.
    _close_quietly(detail_page)
    return None, False


def goto_next_page(page) -> bool:
    """Try to advance the result list to the next page.

    Tries several "next" button selectors (Element UI pagination and
    text-based fallbacks), clicks the first one present and, when a first
    product title was readable beforehand, waits for that title to change.

    Returns:
        True when the page turn succeeded, False when no next button was found
        or the turn failed.
    """
    candidates = [
        ".el-pagination button.btn-next:not(.is-disabled)",
        ".el-pagination__next:not(.is-disabled)",
        "button:has-text('下一页'):not([disabled])",
        "a:has-text('下一页')",
    ]
    next_btn = None
    for sel in candidates:
        loc = page.locator(sel).first
        if loc.count() > 0:
            next_btn = loc
            break
    if not next_btn:
        return False

    # The first product title is the change marker (more reliable than
    # waiting for networkidle alone).
    first_title = page.locator(PRODUCT_TITLE_SELECTOR).first
    before = ""
    try:
        if first_title.count() > 0:
            before = first_title.inner_text(timeout=2000).strip()
    except Exception:
        pass  # no readable title; fall back to waiting for it to appear

    try:
        page.evaluate("window.scrollTo(0, 0);")
        next_btn.click(timeout=5000)
        page.wait_for_load_state("networkidle")
        if before:
            page.wait_for_function(
                _TITLE_CHANGED_JS,
                arg=[PRODUCT_TITLE_SELECTOR, before],
                timeout=5000,
            )
        else:
            first_title.wait_for(timeout=1000)
        return True
    except Exception as e:
        logger.warning(f" 翻页失败:{e}")
        return False


def collect_data(page, keyword):
    """Walk every result page for ``keyword`` and look products up in MySQL.

    For each product item the ``data-product-id`` attribute of a direct child
    is read, a detail URL is built from it and passed to
    ``fuzzy_match_product_url_in_db_mysql``.

    Args:
        page: Playwright page showing the search results.
        keyword: Keyword being collected; used in log messages.

    Returns:
        A list with one dict per processed product:
        ``{"product_id", "product_url", "db_match"}``.
    """
    collect_result = []
    collected_count = 0  # running total across pages
    logger.info(f"📊 开始采集「{keyword}」的商品数据")
    page.wait_for_load_state("networkidle")

    page_no = 1
    while True:
        logger.info(f"\n📄 「{keyword}」开始采集第 {page_no} 页")
        page.wait_for_load_state("networkidle")
        items = page.locator(PRODUCT_ITEM_SELECTOR)
        total_limit = items.count()
        logger.info(f"📌 「{keyword}」第{page_no}页 初始商品个数(count):{total_limit}")

        for idx in range(total_limit):
            collected_count += 1
            try:
                item = items.nth(idx)
                page.wait_for_load_state("networkidle")
                # Per-page position, so the "x/y" pair is consistent.
                logger.info(f"📌 「{keyword}」第{page_no}页 第{idx + 1}/{total_limit}个商品")

                # Check count() first: get_attribute on a missing element
                # would otherwise block for the default timeout.
                product_id = None
                child_item = item.locator("> [data-product-id]")
                if child_item.count() > 0:
                    product_id = child_item.first.get_attribute("data-product-id")
                if product_id:
                    product_id = product_id.strip()
                if not product_id:
                    # Without an id the URL would end in "None"; skip.
                    logger.warning(f"没提取到{product_id}")
                    continue
                logger.info(f"✅ 「{keyword}」第{collected_count}个商品 - 提取到product_id:{product_id}")

                product_url = f"https://www.ybm100.com/new/base/skuDetail?id={product_id}"
                print(product_url)
                db_match_result = fuzzy_match_product_url_in_db_mysql(product_url)
                if db_match_result:
                    logger.info(f"✅ 「{keyword}」第{collected_count}个商品 - MySQL 匹配到URL,直接返回结果:{db_match_result}")
                    print(db_match_result)
                else:
                    # TODO: the click-through extraction is not implemented yet.
                    logger.info(f"ℹ️ 「{keyword}」第{collected_count}个商品 - MySQL 未匹配到URL,执行点击提取")

                collect_result.append(
                    {
                        "product_id": product_id,
                        "product_url": product_url,
                        "db_match": db_match_result,
                    }
                )
            except Exception as e:
                logger.warning(f"「{keyword}」第{collected_count}个商品处理异常,已跳过:{e}")
                continue

        # wait_for_timeout returns None, so the delay is kept in a variable.
        page.wait_for_timeout(_PAGE_TURN_DELAY_MS)
        delay = _PAGE_TURN_DELAY_MS / 1000
        logger.info(f"⏳ 翻页前随机等待 {delay:.2f}s(反爬)")

        if not goto_next_page(page):
            logger.info(f" 「{keyword}」已无下一页,关键词采集结束")
            break
        page_no += 1

    return collect_result


def main():
    """Launch Chrome, restore the session and collect data for every keyword."""
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=False,  # headless mode is easier to fingerprint
            channel="chrome",  # use the installed Chrome
            slow_mo=random.randint(100, 300),  # global per-action slowdown
            args=[
                "--disable-blink-features=AutomationControlled",
                "--enable-automation=false",
                "--disable-infobars",
                "--remote-debugging-port=0",
                "--start-maximized",
                "--disable-extensions",
                "--disable-plugins-discovery",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                # NOTE(review): the context below sets its own user_agent,
                # which presumably takes precedence over this one -- confirm.
                f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36",
            ],
        )

        context = browser.new_context(
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
            geolocation={"latitude": 31.230416, "longitude": 121.473701},
            permissions=["geolocation"],
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={"width": 1600, "height": 1400},
            java_script_enabled=True,
            bypass_csp=True,
        )
        page = context.new_page()
        # Mask common automation fingerprints before any page script runs.
        page.add_init_script(_STEALTH_INIT_JS)

        try:
            # Reuse stored cookies, then verify that they still work.
            load_cookies(context)
            if not is_login(page):
                page.goto(TARGET_LOGIN_URL)
                page.wait_for_load_state("networkidle")
                # TODO: the automated login / cookie-save step is not
                # implemented here, so the searches below run against
                # whatever state the login page is left in.

            keywords = get_search_keywords_from_db() or []
            for kw in keywords:
                popup_guard(page, "before_search")
                detail_page, search_success = search_operation(page, kw)
                if not search_success:
                    print(f"❌ 搜索失败:{kw}")
                    continue
                try:
                    popup_guard(page, "after_search")
                    detail_page.wait_for_load_state('networkidle')
                    data_list = collect_data(detail_page, kw)
                    logger.info(f"✅ 「{kw}」采集完成,共 {len(data_list)} 条")
                finally:
                    # Each search opens a new tab; close it so that tabs do
                    # not pile up across keywords.
                    _close_quietly(detail_page)
        except Exception as e:
            print(f" 程序异常:{str(e)}")
        finally:
            browser.close()
            print(" 浏览器已关闭,程序结束")


# ==================== Entry point ====================
if __name__ == '__main__':
    main()