| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987 |
- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
- from logger_config import logger
- from datetime import datetime
- import random
- import csv
- import os
- import time
- import json
- import pymysql
- from pymysql.err import OperationalError, ProgrammingError, DataError
- from config import *
- import re
- import uuid
- import requests
- import base64
- from io import BytesIO
- from PIL import Image
- import traceback
- # ===================== 工具函数:获取当前时间字符串 =====================
def get_current_time():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string for log lines."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
# Proxy IP pool settings.
# URL requested by get_random_proxy(); empty in this file.
PROXY_POOL_URL = ""
# URL fetched through a candidate proxy to check that the proxy works.
PROXY_VALIDATION_URL = ""
# Timeout for the proxy validation request, in seconds.
PROXY_TIMEOUT = 10
def get_random_proxy():
    """Request one proxy address from the proxy pool and return it if it validates.

    :return: the stripped response body when the pool answers HTTP 200 and
             validate_proxy() accepts it; None in every other case
             (non-200 answer, invalid proxy, or any exception, which is logged).
    """
    try:
        pool_reply = requests.get(PROXY_POOL_URL, timeout=10)
        if pool_reply.status_code != 200:
            return None
        proxy = pool_reply.text.strip()
        if not validate_proxy(proxy):
            logger.warning(f"代理无效: {proxy}")
            return None
        logger.info(f"获取到有效代理: {proxy}")
        return proxy
    except Exception as e:
        logger.error(f"获取代理失败: {str(e)}")
    return None
def validate_proxy(proxy):
    """Check whether PROXY_VALIDATION_URL can be fetched through ``proxy``.

    :param proxy: proxy address without a scheme (presumably ``host:port``,
                  which is how init_browser_with_proxy() parses it)
    :return: True when the request through the proxy answers HTTP 200,
             False on any other status or on any request failure
    """
    # The scheme of a proxy URL describes how to connect to the proxy itself,
    # not to the target site. The browser is launched with an http:// proxy
    # server, so validate over the same plain-HTTP proxy connection for both
    # http and https targets instead of demanding TLS to the proxy.
    proxy_url = f"http://{proxy}"
    proxies = {"http": proxy_url, "https": proxy_url}
    try:
        response = requests.get(
            PROXY_VALIDATION_URL,
            proxies=proxies,
            timeout=PROXY_TIMEOUT,
        )
    except Exception:
        # Any failure means "unusable proxy". Catch Exception rather than a
        # bare except so KeyboardInterrupt / SystemExit still propagate.
        return False
    return response.status_code == 200
def init_browser_with_proxy(playwright):
    """Launch a headed Chrome browser, routed through a pool proxy when one is available.

    :param playwright: the object yielded by ``sync_playwright()``
    :return: the browser returned by ``playwright.chromium.launch``
    """
    proxy = get_random_proxy()
    proxy_config = None
    if proxy:
        # rpartition never raises, unlike unpacking proxy.split(":"), which
        # fails on any address that does not contain exactly one colon.
        proxy_server, separator, proxy_port = proxy.rpartition(":")
        if separator and proxy_server and ":" not in proxy_server and proxy_port.isdigit():
            # Add "username" / "password" keys here if the proxy needs authentication.
            proxy_config = {"server": f"http://{proxy_server}:{proxy_port}"}
            logger.info(f"使用代理: {proxy_server}:{proxy_port}")
    if proxy_config is None:
        # No proxy obtained, or its address is not in host:port form.
        logger.warning("未获取到有效代理,将使用本地IP")
    # Launch the browser with the anti-detection switches.
    return playwright.chromium.launch(
        headless=False,  # headed mode
        channel="chrome",  # use the installed Chrome
        slow_mo=random.randint(100, 300),  # random per-operation slowdown
        proxy=proxy_config,  # None means no proxy
        args=[
            "--disable-blink-features=AutomationControlled",  # main anti-detection switch
            "--enable-automation=false",
            "--disable-infobars",
            "--remote-debugging-port=0",
            "--start-maximized",
            "--disable-extensions",
            "--disable-plugins-discovery",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            # User agent with a random Chrome major version
            f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36"
        ]
    )
- # ==================== 2. 反爬工具函数 ====================
def random_delay(min_seconds, max_seconds):
    """Sleep for a random duration between the two bounds and return the seconds slept.

    Used to avoid fixed intervals between automated actions.
    """
    pause = random.uniform(min_seconds, max_seconds)
    time.sleep(pause)
    return pause
def simulate_human_typing(page, locator, text):
    """Type ``text`` into ``locator`` one character at a time with random pauses.

    The field is clicked and cleared first. If anything fails, the error is
    logged and the text is written with a single ``locator.fill`` instead.
    ``page`` is accepted but not used here.
    """
    try:
        locator.click()
        locator.clear()
        for ch in text:
            key_delay = random.uniform(MIN_INPUT_DELAY, MAX_INPUT_DELAY)
            locator.type(ch, delay=key_delay)
            # Small extra pause between characters.
            random_delay(0.05, 0.1)
        logger.info(f" 模拟真人输入完成:{text}")
    except Exception as e:
        logger.error(f"模拟打字失败:{e}")
        # Fallback: fill the whole value at once.
        locator.fill(text)
def save_cookies(context, cookie_path=COOKIE_FILE_PATH):
    """Write the browser context's cookies to a local JSON file.

    :return: True when the file was written, False when any step failed
             (the failure is logged).
    """
    try:
        jar = context.cookies()
        with open(cookie_path, "w", encoding="utf-8") as out_file:
            json.dump(jar, out_file, ensure_ascii=False, indent=2)
        logger.info(f"Cookie已保存到:{cookie_path}")
    except Exception as e:
        logger.error(f" 保存Cookie失败:{e}")
        return False
    return True
def load_cookies(context, cookie_path=COOKIE_FILE_PATH):
    """Load cookies from a local JSON file into the browser context.

    :return: True when the cookies were added; False when the file does not
             exist or reading/adding failed (both cases are logged).
    """
    cookie_file_present = os.path.exists(cookie_path)
    if not cookie_file_present:
        logger.warning(f" Cookie文件不存在:{cookie_path}")
        return False
    try:
        with open(cookie_path, "r", encoding="utf-8") as in_file:
            stored = json.load(in_file)
        context.add_cookies(stored)
        logger.info(f"✅ 已从{cookie_path}加载Cookie")
    except Exception as e:
        logger.error(f" 加载Cookie失败:{e}")
        return False
    return True
def is_login(page):
    """Report whether the current session is still logged in.

    Opens LOGIN_VALIDATE_URL and treats a final URL containing "login"
    (case-insensitive) as "not logged in".

    :return: True when the page did not end up on a login URL; False when it
             did, or when navigation raised (the error is logged).
    """
    try:
        # Visit a page that requires a logged-in session.
        page.goto(LOGIN_VALIDATE_URL, timeout=300000)
        page.wait_for_load_state("networkidle")
        landed_on_login = "login" in page.url.lower()
        if not landed_on_login:
            # A check for a logged-in-only element could be added here as a
            # second signal.
            logger.info(" Cookie有效,已保持登录状态")
            return True
        logger.warning(" Cookie失效,需要重新登录")
        return False
    except Exception as e:
        logger.error(f" 验证登录状态失败:{e}")
        return False
- # ==================== 滚动函数重构(核心修改) ====================
def slow_scroll_400px(page, scroll_distance1=400):
    """Scroll the page down slowly in small steps.

    The target distance is drawn at random from
    ``scroll_distance1 ± SCROLL_OFFSET_RANGE`` and covered in steps of
    SCROLL_STEP pixels with SCROLL_INTERVAL seconds between steps.

    :param page: page object to scroll
    :param scroll_distance1: centre of the random target distance, in pixels
    :return: True when scrolling and the following network-idle wait finished;
             False when any step raised (including a network-idle timeout
             after the scrolling itself was done)
    """
    try:
        scroll_distance = random.randint(
            scroll_distance1 - SCROLL_OFFSET_RANGE,
            scroll_distance1 + SCROLL_OFFSET_RANGE
        )
        total_steps = int(scroll_distance / SCROLL_STEP)
        logger.info(
            f"📜 开始慢速滚动(目标距离:{scroll_distance}px,总步数:{total_steps},总时长约{total_steps*SCROLL_INTERVAL:.2f}秒)"
        )
        # Track what has actually been scrolled so the final log is accurate.
        scrolled = 0
        for _ in range(total_steps):
            step = min(SCROLL_STEP, scroll_distance - scrolled)
            page.evaluate(f"window.scrollBy(0, {step});")
            scrolled += step
            time.sleep(SCROLL_INTERVAL)
        # Cover the remainder that is smaller than one full step.
        leftover = scroll_distance - scrolled
        if leftover > 0:
            page.evaluate(f"window.scrollBy(0, {leftover});")
            # The original never counted this last scroll, so the completion
            # log under-reported the distance.
            scrolled += leftover
            time.sleep(SCROLL_INTERVAL)
        # Give lazily loaded content time to arrive after scrolling.
        page.wait_for_load_state("networkidle", timeout=8000)
        random_delay(2.0, 3.0)  # extra pause after scrolling
        logger.info(f" 慢速滚动完成,实际滚动距离:{scrolled}px")
        return True
    except Exception as e:
        logger.warning(f" 慢速滚动失败:{e}")
        return False
- # def check_anti_crawl(page):
- # """检测反爬弹窗/验证码(核心:提前识别反爬)"""
- # anti_crawl_selectors = [
- # "//div[contains(text(), '验证')]",
- # "//div[contains(text(), '人机验证')]",
- # "//div[contains(text(), '访问过于频繁')]",
- # "//button[contains(text(), '验证')]"
- # ]
- # for selector in anti_crawl_selectors:
- # if page.locator(selector).count() > 0:
- # logger.error("❌ 检测到反爬验证弹窗!请手动完成验证后按回车继续...")
- # input() # 暂停等待手动验证
- # return True
- # return False
# CSV export settings.
# Output file name, stamped with the time this module is loaded.
CSV_FILE_PATH = f"ybm_collect_data_{datetime.now():%Y%m%d_%H%M%S}.csv"
# Header row: product title, purchase price, discount price, specification,
# box count, shop name, company name, expiry date, production date,
# approval number, collection time.
CSV_HEADERS = [
    "商品标题",
    "商品采购价格",
    "商品折扣价格",
    "规格",
    "盒数",
    "店铺名称",
    "公司名称",
    "有效日期",
    "生产日期",
    "批准文号",
    "采集时间",
]
- # ==================== 登录函数 ====================
def login_operation(page, username, password):
    """Fill in the login form, tick the agreement checkbox and click the login button.

    :return: True once the login button has been clicked and the post-click
             wait has elapsed; False when an element timed out or any other
             error occurred (both are logged). Success of the login itself is
             not checked here.
    """
    credential_steps = (
        (USERNAME_SELECTOR, username, " 已输入登录账号"),
        (PASSWORD_SELECTOR, password, " 已输入登录密码"),
    )
    try:
        # Account first, then password: wait for the field, pause, fill, log.
        for field_selector, field_value, done_message in credential_steps:
            page.wait_for_selector(field_selector, timeout=ELEMENT_TIMEOUT, state="visible")
            page.wait_for_timeout(timeout=3000)
            page.fill(field_selector, field_value)
            logger.info(done_message)
        random_delay(1, 2)
        # Tick the agreement checkbox.
        page.locator('span.el-checkbox__inner').click()
        # Click the login button.
        page.wait_for_selector(LOGIN_BTN_SELECTOR, timeout=ELEMENT_TIMEOUT)
        page.wait_for_timeout(timeout=3000)
        page.click(LOGIN_BTN_SELECTOR)
        logger.info(" 已点击登录按钮")
        page.wait_for_timeout(LOGIN_AFTER_CLICK)
    except PlaywrightTimeoutError as e:
        logger.error(f" 登录失败:元素定位超时 - {str(e)}")
        return False
    except Exception as e:
        logger.error(f" 登录异常:{str(e)}")
        return False
    return True
def kill_masks(page):
    """Remove leftover mask/overlay layers and make the page scrollable and clickable again.

    Runs one script in the page that:
      1. removes elements matching a list of known overlay selectors,
      2. hides near-full-screen fixed/absolute elements with z-index >= 1000,
      3. resets overflow/position/width/padding-right on html/body,
      4. removes the scroll-lock class from body.

    :param page: page object exposing ``evaluate``
    :return: None (the summary object returned by the script is discarded)
    """
    # The scroll-lock class must NOT be in the removal list below: the script
    # removes every element matching a listed selector, so listing it deletes
    # whichever element carries the class. Step 4 treats it as a class on
    # <body>; removing <body> makes the later document.body writes throw.
    # The class is taken off in step 4 instead.
    page.evaluate(r"""
    () => {
        const removed = [];
        const hidden = [];
        // 1) 先处理已知常见遮罩
        const knownSelectors = [
            '.v-modal',
            '.el-overlay',
            '.el-overlay-dialog',
            '.el-dialog__wrapper',
            '.el-message-box__wrapper',
            '.el-loading-mask'
        ];
        for (const sel of knownSelectors) {
            document.querySelectorAll(sel).forEach(el => {
                // v-modal / overlay 直接 remove 最省事
                removed.push(sel);
                el.remove();
            });
        }
        // 2) 再做一次“泛化兜底”:全屏 fixed/absolute + 高 z-index 的覆盖层
        // 注意:不要误删页面正常的固定导航,所以加上“近似全屏”的判断
        const all = Array.from(document.querySelectorAll('body *'));
        for (const el of all) {
            const s = window.getComputedStyle(el);
            if (!s) continue;
            const z = parseInt(s.zIndex || '0', 10);
            const pos = s.position;
            const pe = s.pointerEvents;
            if ((pos === 'fixed' || pos === 'absolute') && z >= 1000 && pe !== 'none') {
                const r = el.getBoundingClientRect();
                const nearFullScreen =
                    r.width >= window.innerWidth * 0.8 &&
                    r.height >= window.innerHeight * 0.8 &&
                    r.left <= window.innerWidth * 0.1 &&
                    r.top <= window.innerHeight * 0.1;
                // 常见遮罩是半透明背景色,或者透明但拦截点击
                const bg = s.backgroundColor || '';
                const looksLikeMask =
                    nearFullScreen && (bg.includes('rgba') || bg.includes('rgb') || s.opacity !== '1');
                if (nearFullScreen) {
                    // 不管透明不透明,只要近似全屏且高 z-index,就先让它不拦截点击
                    el.style.pointerEvents = 'none';
                    el.style.display = 'none';
                    hidden.push(el.tagName + '.' + (el.className || ''));
                }
            }
        }
        // 3) 恢复 body / html 的滚动与交互(很多弹窗会锁滚动)
        document.documentElement.style.overflow = 'auto';
        document.body.style.overflow = 'auto';
        document.body.style.position = 'static';
        document.body.style.width = 'auto';
        document.body.style.paddingRight = '0px';
        // 4) 去掉 Element-UI 常见的锁定 class
        document.body.classList.remove('el-popup-parent--hidden');
        return { removed, hiddenCount: hidden.length, hidden };
    }
    """)
def force_close_popup(page):
    """Dismiss onboarding guides/overlays, then remove leftover overlay layers.

    Best effort: up to five rounds of clicking a "next / done / got it / close"
    button or a close icon, followed by one script that removes fixed/absolute
    elements with z-index >= 1000 among common overlay selectors. Failures are
    logged at debug level and never raised.

    :param page: page object to clean up
    :return: None
    """
    try:
        # 1) Click through multi-step guides; five rounds at most.
        for _ in range(5):
            btn = page.locator(
                "//button[normalize-space()='下一步' or normalize-space()='完成' or normalize-space()='我知道了' or normalize-space()='关闭']"
            ).first
            if btn.count() > 0 and btn.is_visible():
                btn.click(timeout=1500)
                page.wait_for_timeout(300)
                continue
            # Some guides only offer a close icon in the corner.
            # NOTE(review): this XPath also matches any <svg>/<i> element, so
            # it can click unrelated icons — confirm against the target page.
            close_icon = page.locator(
                "xpath=//*[contains(@class,'close') or contains(@class,'el-icon-close') or name()='svg' or name()='i'][1]"
            ).first
            if close_icon.count() > 0 and close_icon.is_visible():
                close_icon.click(timeout=1000)
                page.wait_for_timeout(300)
                continue
            break
        # 2) Fallback: remove common overlay layers.
        page.evaluate("""
        const selectors = [
            '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper',
            '[class*="mask"]', '[class*="overlay"]', '[style*="z-index"]'
        ];
        for (const sel of selectors) {
            document.querySelectorAll(sel).forEach(el => {
                const s = window.getComputedStyle(el);
                // 只移除“覆盖层”倾向的元素:fixed/absolute 且 z-index 很高
                if ((s.position === 'fixed' || s.position === 'absolute') && parseInt(s.zIndex || '0', 10) >= 1000) {
                    el.remove();
                }
            });
        }
        """)
    except Exception as e:
        # Still best effort, but leave a trace instead of failing silently.
        logger.debug(f"关闭引导弹窗失败(已忽略):{e}")
- # 调用方式和方案1一致:在搜索后、采集前执行
- # force_close_popup(page)
def pick_search_input(page):
    """Return a visible, enabled search input, trying the first two matches in order.

    Each of the first two elements matching SEARCH_INPUT_SELECTOR gets a short
    visibility wait. If neither qualifies, the first ``:visible`` match is
    awaited with ELEMENT_TIMEOUT and returned (a timeout there propagates).
    """
    matches = page.locator(SEARCH_INPUT_SELECTOR)
    total = matches.count()
    # Only the first two matches are probed.
    for index in range(min(total, 2)):
        box = matches.nth(index)
        try:
            # Short timeout: this is only a quick probe.
            box.wait_for(state="visible", timeout=1500)
        except PlaywrightTimeoutError:
            continue
        if box.is_enabled():
            return box
    # Fallback: any visible match, which avoids hidden template copies.
    fallback = page.locator(f"{SEARCH_INPUT_SELECTOR}:visible").first
    fallback.wait_for(state="visible", timeout=ELEMENT_TIMEOUT)
    return fallback
def type_slow(locator, text: str, min_delay=0.06, max_delay=0.18):
    """Type ``text`` into ``locator`` one character at a time.

    For every character a delay is drawn uniformly from
    ``[min_delay, max_delay]`` seconds and passed to ``locator.type`` as whole
    milliseconds.
    """
    for character in text:
        seconds = random.uniform(min_delay, max_delay)
        pause_ms = int(seconds * 1000)
        locator.type(character, delay=pause_ms)
- # ==================== 搜索操作函数 ====================
def search_operation(page, keyword):
    """Type a keyword into the search box, submit it and return the results tab.

    :param page: page that holds the search box
    :param keyword: text to search for
    :return: ``(detail_page, True)`` when the click opened a new tab that
             reached network idle; ``(None, False)`` on any failure
             (the reason is logged)
    """
    try:
        # 1) Locate the search box and wait for it.
        search_locator = page.locator(SEARCH_INPUT_SELECTOR)
        search_locator.wait_for(timeout=ELEMENT_TIMEOUT)
        # 2) Clear it twice over: fill("") and then select-all + Backspace.
        search_locator.click()  # focus
        search_locator.fill("")
        page.keyboard.down("Control")
        page.keyboard.press("a")
        page.keyboard.up("Control")
        page.keyboard.press("Backspace")
        # 3) Type the keyword character by character.
        type_slow(search_locator, keyword, min_delay=0.06, max_delay=0.18)
        logger.info(f"📝 已输入搜索关键词:{keyword}")
        # 4) Wait for the search button, then pause before clicking.
        btn = page.locator(f"{SEARCH_BTN_SELECTOR}")
        btn.wait_for(state="visible", timeout=SEARCH_BTN_TIMEOUT)
        page.wait_for_timeout(3000)
        # 5) Click and capture the tab that the click opens.
        try:
            # Start listening for the new page before clicking.
            with page.context.expect_page(timeout=60000) as new_page_info:
                btn.click()
            detail_page = new_page_info.value
            detail_page.wait_for_load_state("networkidle", timeout=20000)
        except PlaywrightTimeoutError:
            logger.warning(f"{get_current_time()} 未检测到新标签页")
            return None, False
        except Exception as e:
            logger.warning(f"{get_current_time()} 等待新标签页异常:{e}")
            return None, False
        # 6) The first-time guide button is optional. It is handled outside
        # the try above so that its absence is not reported as "no new tab"
        # and does not discard a page that opened fine.
        try:
            test_btn = detail_page.locator("div[data-v-c65c36bc].first-time-highlight-message-btn button")
            btn_count = test_btn.count()
            logger.info(f"✅ 匹配到的元素数量:{btn_count}")
            test_btn.wait_for(state="attached", timeout=5000)
            test_btn.click()
        except PlaywrightTimeoutError:
            logger.info(f"{get_current_time()} 未出现首次引导按钮,跳过点击")
        except Exception as e:
            logger.warning(f"{get_current_time()} 点击首次引导按钮失败(已忽略):{e}")
        force_close_popup(detail_page)
        kill_masks(detail_page)
        logger.info("✅ 已触发搜索")
        return detail_page, True
    except PlaywrightTimeoutError as e:
        logger.error(f" 搜索失败:元素定位超时 - {str(e)}")
        return None, False
    except Exception as e:
        logger.error(f" 搜索异常:{str(e)}")
        return None, False
- #翻下一页
def goto_next_page(page) -> bool:
    """Try to move the result list to its next page.

    Several "next page" selectors are tried in order and the first one that
    matches is clicked. When a first product title could be read beforehand,
    the function then waits until that title changes.

    :param page: list page object
    :return: True when the click and the follow-up wait succeeded; False when
             no next-page control was found or any step failed (logged)
    """
    # Candidate selectors; the first one that matches is used.
    candidates = [
        ".el-pagination button.btn-next:not(.is-disabled)",
        ".el-pagination__next:not(.is-disabled)",
        "button:has-text('下一页'):not([disabled])",
        "a:has-text('下一页')",
    ]
    next_btn = None
    for sel in candidates:
        loc = page.locator(sel).first
        if loc.count() > 0:
            next_btn = loc
            break
    if next_btn is None:
        return False
    # The first product title is the marker for "the list has changed".
    first_title = page.locator(PRODUCT_TITLE_SELECTOR).first
    before = ""
    try:
        if first_title.count() > 0:
            before = first_title.inner_text(timeout=2000).strip()
    except Exception:
        # No readable title: fall back to the weaker wait below.
        before = ""
    try:
        page.evaluate("window.scrollTo(0, 0);")
        next_btn.click(timeout=5000)
        page.wait_for_load_state("networkidle")
        if before:
            # Playwright hands `arg` to the page function as ONE argument, so
            # the pair has to be destructured from a single array parameter.
            # With two separate parameters the selector would be the whole
            # array and oldText would be undefined.
            page.wait_for_function(
                """([sel, oldText]) => {
                    const el = document.querySelector(sel);
                    return el && el.innerText && el.innerText.trim() !== oldText;
                }""",
                arg=[PRODUCT_TITLE_SELECTOR, before],
                timeout=5000
            )
        else:
            first_title.wait_for(timeout=1000)
        return True
    except Exception as e:
        logger.warning(f" 翻页失败:{e}")
        return False
def popup_guard(page, tag=""):
    """Dismiss popups/guides, clear overlay layers and re-enable scrolling.

    Best effort: up to six rounds of clicking a "next / done / got it / close"
    button or a close icon, then one script that removes known overlay
    elements, hides near-full-screen high z-index layers and resets overflow
    on html/body. Failures are logged at debug level and never raised.

    :param page: page object to clean up
    :param tag: label for the call site; it appears in the failure log only
    :return: None
    """
    try:
        # Give a popup a moment to appear.
        page.wait_for_timeout(300)
        # 1) Click through multi-step guides.
        for _ in range(6):
            btn = page.locator(
                "xpath=//button[normalize-space()='下一步' or normalize-space()='完成' or normalize-space()='我知道了' or normalize-space()='关闭']"
            ).first
            if btn.count() > 0 and btn.is_visible():
                btn.click(timeout=1500)
                page.wait_for_timeout(250)
                continue
            # 2) Common close icons.
            close_btn = page.locator(
                "css=.el-dialog__headerbtn, .el-message-box__headerbtn, .close, .icon-close, .el-icon-close"
            ).first
            if close_btn.count() > 0 and close_btn.is_visible():
                close_btn.click(timeout=1200)
                page.wait_for_timeout(250)
                continue
            break
        # 3) Clear overlays and restore scrolling/interaction.
        page.evaluate(r"""
        () => {
            // 第一步:精准清理已知的遮罩/弹窗类名(Element UI框架常用)
            const selectors = [
                '.v-modal', '.el-overlay', '.el-overlay-dialog', '.el-dialog__wrapper',
                '.el-message-box__wrapper', '.el-loading-mask'
            ];
            selectors.forEach(sel => document.querySelectorAll(sel).forEach(e => e.remove()));
            // 泛化兜底:近似全屏 + 高 z-index 的层直接屏蔽
            const all = Array.from(document.querySelectorAll('body *'));
            for (const el of all) {
                const s = getComputedStyle(el); // 获取元素的实际样式(含CSS生效的样式)
                const z = parseInt(s.zIndex || '0', 10); // 取元素的层级(z-index),默认0
                // 条件1:元素是固定/绝对定位(弹窗/遮罩常见定位方式)+ 层级≥1000(高优先级遮挡)+ 能拦截鼠标事件
                if ((s.position === 'fixed' || s.position === 'absolute') && z >= 1000 && s.pointerEvents !== 'none') {
                    const r = el.getBoundingClientRect(); // 获取元素的尺寸和位置
                    // 条件2:元素宽度/高度≥屏幕80%(近似全屏遮罩)
                    const nearFull = r.width >= innerWidth * 0.8 && r.height >= innerHeight * 0.8;
                    if (nearFull) {
                        el.style.pointerEvents = 'none'; // 让元素不拦截鼠标点击
                        el.style.display = 'none'; // 隐藏元素
                    }
                }
            }
            // 第三步:恢复页面滚动功能(弹窗常把页面设为不可滚动)
            document.documentElement.style.overflow = 'auto'; // html标签恢复滚动
            document.body.style.overflow = 'auto'; // body标签恢复滚动
            document.body.classList.remove('el-popup-parent--hidden'); // 移除Element UI的滚动禁用类
        }
        """)
        logger.info("杀除弹窗成功")
    except Exception as e:
        # Still best effort, but record which call site failed and why.
        logger.debug(f"[{tag}] 弹窗清理失败(已忽略):{e}")
def open_detail_page(list_page, item, keyword, idx, *, timeout=15000):
    """Click a product and report where its detail view opened.

    Two outcomes are handled:
      1. a new tab opens  -> ``detail_page`` is the new page, ``opened_new_tab`` is True
      2. no new tab appears within ``timeout`` -> ``detail_page`` is ``list_page``,
         ``opened_new_tab`` is False (treated as same-tab navigation or an in-page layer)

    :param list_page: the list page being scraped
    :param item: clickable product element
    :param keyword: search keyword, used in log messages only
    :param idx: product index, used in log messages only
    :param timeout: how long to wait for a new tab, in milliseconds
    :return: ``(detail_page, opened_new_tab, list_url)`` where ``list_url`` is
             the list page URL captured before the click
    """
    ctx = list_page.context
    list_url = list_page.url
    detail_page = None
    opened_new_tab = False
    try:
        # Expect a new tab to be opened by the click.
        with ctx.expect_page(timeout=timeout) as p:
            # Playwright's click delay is in milliseconds: hold the button
            # for 100-300 ms rather than a fraction of a millisecond.
            item.click(delay=random.uniform(100, 300))
        detail_page = p.value
        opened_new_tab = True
        logger.info(f" 「{keyword}」第{idx}个商品 - 新开标签页进入详情")
    except PlaywrightTimeoutError:
        # No new tab: most likely same-tab navigation or an in-page layer.
        detail_page = list_page
        opened_new_tab = False
        logger.info(f" 「{keyword}」第{idx}个商品 - 未新开标签页,按同页进入详情处理")
    return detail_page, opened_new_tab, list_url
def return_to_list(list_page, detail_page, opened_new_tab, list_url, keyword, idx):
    """Go back from a product's detail view to the list page.

    - New-tab mode: close the detail tab, then bring the list page to the front.
    - Same-tab mode: when the URL changed, go back up to three times until the
      list URL is reached; when it did not change, press Escape to close a
      possible in-page layer.

    Every failure is logged as a warning; nothing is raised or returned.
    """
    # Nothing to switch back to if the list page is gone.
    if list_page is None or list_page.is_closed():
        logger.warning(f" 「{keyword}」第{idx}个商品 - 列表页已关闭,无法切回")
        return

    if not opened_new_tab:
        # Same tab: detail_page is list_page.
        try:
            url_changed = list_page.url != list_url
            if url_changed:
                # A real navigation happened: step back, three tries at most.
                for _attempt in range(3):
                    list_page.go_back(timeout=15000)
                    list_page.wait_for_load_state("domcontentloaded", timeout=15000)
                    random_delay(0.2, 0.5)
                    if list_page.url == list_url:
                        break
                logger.info(f" 「{keyword}」第{idx}个商品 - 已返回列表页(同tab跳转模式)")
            else:
                # URL unchanged: possibly an in-page detail layer, try Escape.
                list_page.keyboard.press("Escape")
                random_delay(0.2, 0.5)
                logger.info(f" 「{keyword}」第{idx}个商品 - 已尝试关闭弹层并留在列表页(同tab弹层模式)")
            list_page.bring_to_front()
            list_page.wait_for_load_state("networkidle")
        except Exception as e:
            logger.warning(f" 「{keyword}」第{idx}个商品 - 同tab返回列表页失败:{e}")
        return

    # New tab: close only the detail tab, never the list page itself.
    try:
        if detail_page and (detail_page is not list_page) and (not detail_page.is_closed()):
            detail_page.close()
            logger.info(f"📌 「{keyword}」第{idx}个商品 - 已关闭详情页标签页")
    except Exception as e:
        logger.warning(f" 「{keyword}」第{idx}个商品 - 关闭详情页失败:{e}")
    # Switch back to the list page.
    try:
        list_page.bring_to_front()
        pointer_x = random.randint(100, 300)
        pointer_y = random.randint(200, 400)
        list_page.mouse.move(pointer_x, pointer_y)
        random_delay(0.3, 0.8)
        list_page.wait_for_load_state("networkidle")
        logger.info(f" 「{keyword}」第{idx}个商品 - 已切回列表页(新tab模式)")
    except Exception as e:
        logger.warning(f" 「{keyword}」第{idx}个商品 - 切回列表页失败:{e}")
- #判断店名是否已经在数据库
def shop_is_exists_database(shop):
    """Check whether a shop name already has a row in ``ybm_shop_info_middle``.

    :param shop: shop name to look up
    :return: ``(is_exists, row)`` where ``row`` is the first matching row as a
             dict (province, city, business_license_company,
             qualification_number) or None; ``(False, None)`` when the query
             fails (the error is logged)
    """
    # Bind both names before the try so the finally block can test them even
    # when pymysql.connect() itself raises. Left unbound, that case raised
    # UnboundLocalError in finally instead of returning (False, None).
    conn = None
    cursor = None
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor(pymysql.cursors.DictCursor)  # rows come back as dicts
        query_sql = """
        SELECT province, city, business_license_company, qualification_number FROM ybm_shop_info_middle
        WHERE shop = %s
        """
        cursor.execute(query_sql, (shop,))
        result = cursor.fetchone()
        # Debug output; repr() exposes stray spaces and hidden characters.
        print(f"【调试】传入的店铺名:{repr(shop)}")
        print(f"【调试】查询参数:{shop}")
        print(f"【调试】查询结果:{result} → 函数返回:{bool(result)}")
        is_exists = bool(result)
        if is_exists:
            logger.info(f"【店铺存在校验】店铺已存在 | 店铺名:{repr(shop)} | 结果:存在(True)不要执行采集店铺")
        else:
            logger.info(f"【店铺存在校验】店铺不存在 | 店铺名:{repr(shop)} | 结果:不存在(False)")
        return is_exists, result
    except Exception as e:
        logger.error(f"查询店铺失败:{e}")
        return False, None  # explicit failure value rather than a bare None
    finally:
        # Always release the cursor and the connection.
        if cursor:
            cursor.close()
        if conn:
            conn.close()
def insert_shop_info_to_db(shop,contact_address, qualification_number, business_license_company, business_license_address, scrape_date, platform, province, city, create_time, update_time):
    """Insert one shop row into ``ybm_shop_info_middle``, updating it on a duplicate key.

    The parameters map one-to-one, in order, onto the columns named in the
    INSERT statement. On a duplicate key every column except ``shop`` and
    ``create_time`` is overwritten with the new value.

    :return: True when the statement was executed and committed; False when
             any error occurred (the transaction is rolled back and the error
             printed)
    """
    db_conn = None
    db_cursor = None
    # Parameterised statement; the placeholder order must match `row` below.
    upsert_sql = """
    INSERT INTO ybm_shop_info_middle (
    shop,
    contact_address,
    qualification_number,
    business_license_company,
    business_license_address,
    scrape_date,
    platform,
    province,
    city,
    create_time,
    update_time
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    contact_address = VALUES(contact_address), # 重复时更新联系地址
    qualification_number = VALUES(qualification_number), # 更新社会信用代码
    business_license_company = VALUES(business_license_company), # 更新公司名
    business_license_address = VALUES(business_license_address), # 更新地址
    scrape_date = VALUES(scrape_date),
    platform = VALUES(platform),
    province = VALUES(province),
    city = VALUES(city),
    update_time = VALUES(update_time) # 重复时更新update_time
    """
    row = (
        shop,
        contact_address,
        qualification_number,
        business_license_company,
        business_license_address,
        scrape_date,
        platform,
        province,
        city,
        create_time,
        update_time,
    )
    try:
        db_conn = pymysql.connect(**MYSQL_CONFIG)
        db_cursor = db_conn.cursor()
        db_cursor.execute(upsert_sql, row)
        db_conn.commit()
        print(f"✅ 数据插入成功!店铺:{shop} | 公司:{business_license_company}")
        return True
    except pymysql.MySQLError as e:
        # Database-side failures: connection, SQL syntax, column mismatch, ...
        print(f"MySQL插入失败:{e}")
        print(f"详细异常信息:{traceback.format_exc()}")
        if db_conn:
            db_conn.rollback()
        return False
    except Exception as e:
        # Anything else.
        print(f"插入数据时发生未知错误:{e}")
        print(f"详细异常信息:{traceback.format_exc()}")
        if db_conn:
            db_conn.rollback()
        return False
    finally:
        # Release the cursor and the connection on every path.
        if db_cursor:
            db_cursor.close()
        if db_conn:
            db_conn.close()
def insert_single_to_mysql(single_data):
    """Insert one product record into ``ybm_drug_middle``.

    :param single_data: mapping indexed by the column names listed in the
                        INSERT statement (every key is read with ``[]``)
    :return: True when the row was inserted and committed; False on any error
             (the transaction is rolled back and the error logged)
    """
    # Column order; it must match the placeholders in insert_sql.
    column_order = (
        "product", "my_good_price", "min_price", "manufacture_date", "expiry_date",
        "shop", "business_license_company", "province", "city", "manufacturer",
        "specification", "approval_number", "product_link", "scrape_date",
        "scrape_province", "availability", "credit_code", "platform", "search_key",
        "number", "is_sold_out", "sales", "inventory", "snapshot_url",
        "update_time", "create_time",
    )
    db_conn = None
    db_cursor = None
    try:
        db_conn = pymysql.connect(**MYSQL_CONFIG)
        db_cursor = db_conn.cursor()
        insert_sql = """
        INSERT INTO ybm_drug_middle (
        product, my_good_price, min_price, manufacture_date, expiry_date,
        shop, business_license_company, province, city, manufacturer,
        specification, approval_number, product_link, scrape_date,
        scrape_province, availability, credit_code, platform, search_key,
        number, is_sold_out, sales, inventory, snapshot_url, update_time, create_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        values = tuple(single_data[column] for column in column_order)
        db_cursor.execute(insert_sql, values)
        db_conn.commit()
        logger.info(f" 单条数据插入成功:...")
        return True
    except OperationalError as e:
        logger.error(f" MySQL连接失败:{str(e)}")
        if db_conn:
            db_conn.rollback()
        return False
    except ProgrammingError as e:
        logger.error(f" SQL语法错误:{str(e)}")
        if db_conn:
            db_conn.rollback()
        return False
    except Exception as e:
        logger.error(f" 单条数据插入失败:{str(e)}")
        if db_conn:
            db_conn.rollback()
        return False
    finally:
        # Release the cursor and the connection on every path.
        if db_cursor:
            db_cursor.close()
        if db_conn:
            db_conn.close()
def check_dup_in_biz_db(product_link, discount_price_val, scrape_date):
    """Report whether ``ybm_drug_middle`` already holds this link/price/date combination.

    ``product_link`` and ``scrape_date`` are stripped before use.

    :return: True when at least one matching row exists; False when none
             exists or when the query fails (the error is logged)
    """
    link = product_link.strip()
    day = scrape_date.strip()
    log_context = (
        f"【去重校验】商品链接:{link} | 价格:{discount_price_val} "
        f"采集日期:{day}"
    )
    db_conn = None
    db_cursor = None
    try:
        db_conn = pymysql.connect(**MYSQL_CONFIG)
        db_cursor = db_conn.cursor()
        sql = """
        SELECT * FROM ybm_drug_middle
        WHERE product_link = %s AND min_price = %s AND scrape_date=%s
        """
        db_cursor.execute(sql, (link, discount_price_val, day))
        # fetchone() gives a row when there is a match and None otherwise.
        first_row = db_cursor.fetchone()
        is_dup = first_row is not None
        if not is_dup:
            logger.info(f"{log_context} - 表中无重复记录,正常采集")
        else:
            logger.warning(f"{log_context} - 表中已存在重复记录,跳过本次采集")
        return is_dup
    except Exception as e:
        logger.error(f"查询业务表去重失败:{str(e)}")
        return False
    finally:
        if db_cursor:
            db_cursor.close()
        if db_conn:
            db_conn.close()
- # 压缩图片函数
def compress_image(image_data, max_size=4*1024*1024):
    """Re-encode an image as JPEG, lowering quality once if it is too large.

    Args:
        image_data: Raw bytes of the source image.
        max_size: Size threshold in bytes (default 4 MB). When the first
            encoding exceeds it, a second, lower-quality encoding is used.

    Returns:
        The JPEG bytes. If anything fails (unreadable image, unsupported
        mode, ...) the original ``image_data`` is returned unchanged.
    """
    try:
        img = Image.open(BytesIO(image_data))
        # JPEG has no alpha channel: flatten RGBA / palette images onto a
        # white RGB canvas so transparent areas do not turn black.
        if img.mode in ('RGBA', 'P'):
            bg_img = Image.new('RGB', img.size, (255, 255, 255))
            bg_img.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
            img = bg_img
        # Scale down proportionally so the width is at most 1000px.
        if img.width > 1000:
            ratio = 1000 / img.width
            new_size = (int(img.width*ratio), int(img.height*ratio))
            img = img.resize(new_size, Image.Resampling.LANCZOS)
        # Encode at quality 80, then 60 if still above max_size. Every attempt
        # uses a fresh buffer: saving twice into one BytesIO appends the second
        # JPEG after the first, producing a larger, malformed result.
        compressed_data = b""
        for quality in (80, 60):
            output = BytesIO()
            img.save(output, format='JPEG', quality=quality)
            compressed_data = output.getvalue()
            if len(compressed_data) <= max_size:
                break
        return compressed_data
    except Exception as e:
        logger.debug(f"图片压缩失败:{e}")
        return image_data  # fall back to the untouched input
def download_image_to_base64(image_url, save_dir="./download_images"):
    """Download an image, compress it, keep a local copy and return Base64.

    Args:
        image_url: URL of the image to fetch.
        save_dir: Directory that receives the local copy; created if missing.

    Returns:
        The compressed image encoded as a Base64 ``str``, or None when the
        directory cannot be created, the download fails, or the local copy
        cannot be written.
    """
    import hashlib  # local import: only needed for the fallback file name

    try:
        if not os.path.exists(save_dir):
            # exist_ok guards against the directory appearing in between.
            os.makedirs(save_dir, exist_ok=True)
            print(f"创建本地保存目录:{save_dir}")
    except Exception as e:
        print(f"创建保存目录失败:{str(e)}")
        return None
    try:
        # Browser-like User-Agent so the image host does not reject the request.
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        response = requests.get(image_url, headers=headers, timeout=15)
        response.raise_for_status()
        compressed_data = compress_image(response.content)
        image_base64 = base64.b64encode(compressed_data).decode("utf-8")
        # File name = last URL segment with query punctuation removed,
        # e.g. https://xxx.com/123.jpg -> 123.jpg
        file_name = image_url.split("/")[-1]
        file_name = file_name.replace("?", "").replace("&", "").replace("=", "")
        if not file_name:
            # A URL ending in "/" leaves no name; without this, open() would
            # target the directory itself and the download would be discarded.
            file_name = hashlib.md5(image_url.encode("utf-8")).hexdigest() + ".jpg"
        save_path = os.path.join(save_dir, file_name)
        with open(save_path, "wb") as f:
            f.write(compressed_data)
        print(f"图片已保存到本地:{save_path}")
        return image_base64
    except requests.exceptions.Timeout:
        print(f"下载图片超时:{image_url}")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"图片URL无效(状态码:{e.response.status_code}):{image_url}")
        return None
    except Exception as e:
        print(f"下载图片失败:{str(e)}")
        return None
def get_ocr_res(img):
    """Run the configured OCR endpoint on a remote image.

    The image is downloaded and Base64-encoded, an access token is fetched,
    and both are posted to ``request_url_config``.

    Args:
        img: URL of the image to recognise.

    Returns:
        A dict mapping every key of the response's ``words_result`` to that
        entry's ``words`` text, or None on any failure (download, token,
        HTTP error, API error code, unexpected response shape).
    """
    try:
        print(f'开始识别图片:{img}')
        img_base64 = download_image_to_base64(img)
        if not img_base64:
            print("图片下载/转Base64失败,终止OCR识别")
            return None
        access_token = get_access_token()
        if not access_token:
            print("获取access_token失败,无法调用OCR接口")
            return None
        response = requests.post(
            request_url_config,
            # Let requests build and encode the query string.
            params={"access_token": access_token},
            data={"image": img_base64},
            headers={'content-type': 'application/x-www-form-urlencoded'},
            # Without a timeout a stalled connection blocks the whole run.
            timeout=30,
        )
        # A requests Response is falsy when the HTTP status is 4xx/5xx.
        if not response:
            print("OCR接口返回空响应")
            return None
        res = response.json()
        # The API reports failures inside the JSON body.
        if "error_code" in res:
            print(f"百度OCR接口错误:{res['error_msg']}(错误码:{res['error_code']})")
            return None
        new_dic = {
            field: entry['words'] for field, entry in res['words_result'].items()
        }
        print('资质数据信息', new_dic)
        return new_dic
    except requests.exceptions.RequestException as e:
        print(f"网络错误(图片下载/OCR请求失败):{str(e)}")
        return None
    except KeyError as e:
        print(f"OCR响应格式异常,缺失字段:{str(e)}")
        return None
    except Exception as e:
        print(f"OCR识别未知错误:{str(e)}")
        return None
def get_access_token():
    """Fetch an access token from ``token_url_config``.

    Sends a client-credentials request built from ``AppKey_config`` and
    ``AppSecret_config``.

    Returns:
        The ``access_token`` value of the JSON response, or None when the
        request fails or the response carries no token (the reason is printed).
    """
    # Passed via ``params`` so requests URL-encodes the credentials.
    query = {
        "grant_type": "client_credentials",
        "client_id": AppKey_config,
        "client_secret": AppSecret_config,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    try:
        response = requests.post(
            token_url_config,
            params=query,
            headers=headers,
            data="",
            timeout=15,  # never wait forever on the token endpoint
        )
        response.raise_for_status()  # turn HTTP errors into exceptions
        return response.json()['access_token']
    except Exception as e:
        print(f"获取access_token失败:{str(e)}")
        return None
def extract_province_city(address):
    """Extract the province and city from a business-licence address.

    Args:
        address: Address text such as "福建省福州市马尾区".

    Returns:
        ``(province, city)``. Either part is "" when it cannot be found.
        For the four municipalities (北京市/上海市/天津市/重庆市) the city
        equals the province, e.g. "北京市朝阳区" -> ("北京市", "北京市").
    """
    if not address:
        return "", ""

    # Province: 省 / 自治区 / municipality / 特别行政区.
    province_pattern = re.compile(r'([^省]+省|.+自治区|北京市|上海市|天津市|重庆市|.+特别行政区)')
    province_match = province_pattern.search(address)
    province = province_match.group(1) if province_match else ""

    # Municipalities are their own city. Returning here keeps the clean-up
    # below from stripping the province out of an identical city, which
    # previously left the city empty.
    if province in ("北京市", "上海市", "天津市", "重庆市"):
        return province, province

    # City: searched in what is left once the province text is removed.
    # The last alternative covers addresses without 省/市 markers by taking
    # the leading run up to the first digit or 区/县/镇.
    address_remain = address.replace(province, "").strip() if province else address.strip()
    city_pattern = re.compile(r'([^市]+市|.+自治州|.+地区|.+盟|^[^\d区县镇]+)')
    city_match = city_pattern.search(address_remain)
    city = city_match.group(1).strip() if city_match else ""

    # Defensive clean-up: drop a province name that still appears in the city.
    if province and city and province in city:
        city = city.replace(province, "").strip()
    return province.strip(), city.strip()
- #采集数据核心
def collect_data(page, keyword):
    """Collect every product listed for ``keyword`` and insert each into MySQL.

    For each result page:
      1. Count the product cards on the page.
      2. For every card: read the list-page fields, open the detail page,
         skip it when ``check_dup_in_biz_db`` reports a duplicate, otherwise
         read the detail fields, take a snapshot, resolve the shop's licence
         data (OCR for shops not yet stored), and insert one row. The list is
         scrolled down after every 5th card.
      3. Call ``goto_next_page``; stop when it reports no further page.

    Args:
        page: Playwright page showing the search-result list.
        keyword: Search keyword being collected (stored as ``search_key``).

    Returns:
        The list of row dicts passed to ``insert_single_to_mysql``. An empty
        list is returned immediately when the page shows the "新品登记" marker.
    """
    collect_result = []
    logger.info(f"📊 开始采集「{keyword}」的商品数据")
    page.wait_for_load_state("networkidle")
    page_no = 1
    while True:
        logger.info(f"\n📄 「{keyword}」开始采集第 {page_no} 页")
        # Remember the list URL (logged only).
        list_page_url = page.url
        logger.info(f"📌 已记录商品列表页URL:{list_page_url}")
        # Number of product cards on the current page.
        page.wait_for_load_state("networkidle")
        total_limit = page.locator(PRODUCT_ITEM_SELECTOR).count()
        logger.info(f"📌 「{keyword}」第{page_no}页 初始商品个数(count):{total_limit}")
        collected_count = 0
        # The page shows this marker when the keyword matched nothing.
        not_found_keywords = page.locator("span:has-text('新品登记')")
        if not_found_keywords.count() > 0:
            logger.warning(f"⚠️ 关键词「{keyword}」无匹配商品,直接跳过整个关键词采集")
            return []
        for idx in range(total_limit):
            detail_page = None
            try:
                item = page.locator(PRODUCT_ITEM_SELECTOR).nth(idx)
                collected_count += 1  # 1-based position, used in logs and scrolling
                # Random pause before touching the next card.
                page.wait_for_load_state("networkidle")
                delay = random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                logger.info(f"📌 「{keyword}」第{page_no}页 第{collected_count}/{total_limit}个商品 - 等待{delay:.2f}秒后采集(反爬)")

                # ---- Defaults for every field ----
                title = "无标题"
                shop = "无店名"
                expiry_date = "无有效期"
                manufacture_date = "无生产日期"
                approval_number = "无批准文号"
                manufacturer = "未知公司"
                spec = "未知规格"
                num = 1
                platform = '药帮忙'
                current_time = datetime.now().strftime("%Y-%m-%d")
                is_sold_out = 0

                # ---- List page: title ----
                product_locator = item.locator(PRODUCT_TITLE_SELECTOR)
                if product_locator.count() > 0:
                    title = product_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页标题:{title}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品 - 列表页标题元素未找到,使用默认值:{title}")

                # ---- List page: price = integer part + optional decimal part ----
                price_int = item.locator('//span[@class="price-int"]').text_content().strip()
                price_decimal_elem = item.locator('//span[@class="price-decimal"]')
                if price_decimal_elem.count() > 0:
                    price_decimal = price_decimal_elem.text_content().strip()
                else:
                    price_decimal = ''
                full_price = f"{price_int}{price_decimal}"
                # float() raises on unparsable text; the outer handler then
                # skips this card.
                full_price_num = float(full_price)
                logger.info(f"✅ 提取到价格:{full_price_num}")

                # ---- List page: manufacturer ----
                manufacturer_locator = item.locator(PRODUCT_COMPANY_SELECTOR)
                if manufacturer_locator.count() > 0:
                    manufacturer = manufacturer_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页公司名:{manufacturer}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页公司名称元素未找到,使用默认值:{manufacturer}")

                # ---- List page: shop name ----
                shop_locator = item.locator(PRODUCT_STORE_SELECTOR)
                if shop_locator.count() > 0:
                    shop = shop_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页店名:{shop}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页店铺名称元素未找到,使用默认值:{shop}")

                # ---- List page: discount price (falls back to the list price) ----
                discount_price_val_origin = ""
                discount_price_locator = item.locator('span[data-v-4cb6cc1f].discount-int').first
                if discount_price_locator.count() > 0:
                    discount_price_val_origin = discount_price_locator.inner_text(timeout=3000).strip()
                    match = re.search(r'\d+\.?\d*', str(discount_price_val_origin))
                    discount_price_val = float(match.group()) if match else 0.00
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页折扣价:{discount_price_val}{'='*10}")
                else:
                    discount_price_val = full_price_num
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 折扣价元素未找到,使用采购价兜底:{discount_price_val}")
                merged_price = f"{full_price_num}{discount_price_val_origin}" if discount_price_val_origin else full_price_num

                # ---- List page: expiry date ----
                expiry_date_locator = item.locator(f"{PRODUCT_VALIDITY_SELECTOR}")
                if expiry_date_locator.count() > 0:
                    expiry_date = expiry_date_locator.inner_text(timeout=3000).strip().replace('-', '')
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页有效期:{expiry_date}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 有效期元素未找到,使用默认值:{expiry_date}")

                # ---- Open the detail page with human-like mouse activity ----
                logger.info(
                    f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 模拟鼠标移动并点击"
                )
                item.hover()
                random_delay(0.2, 0.5)
                item.dispatch_event("mousedown")
                random_delay(0.05, 0.15)
                item.dispatch_event("mouseup")
                random_delay(0.05, 0.1)
                try:
                    with page.context.expect_page(timeout=60000) as p:
                        # NOTE(review): Playwright's click ``delay`` is in
                        # milliseconds, so 0.1-0.3 is effectively no delay —
                        # confirm whether 100-300 was intended.
                        item.click(delay=random.uniform(0.1, 0.3))
                    detail_page = p.value
                except PlaywrightTimeoutError:
                    logger.warning(
                        f" 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 未检测到新标签页,使用当前页采集详情"
                    )
                    detail_page = None  # no new tab: never close the list page

                # Prefer the new tab; otherwise read details from the list page.
                target_page = detail_page if detail_page else page
                target_page.wait_for_load_state("networkidle", timeout=20000)
                delay = random_delay(MIN_PAGE_DELAY, MAX_PAGE_DELAY)
                logger.info(
                    f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 详情页加载完成,等待{delay:.2f}秒(反爬)"
                )

                product_link = target_page.url
                logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页链接:{product_link}{'='*10}")

                # ---- Skip products already stored for today ----
                # NOTE(review): the lookup compares ``min_price`` with the list
                # price, while rows are inserted with
                # min_price = discount_price_val; the two differ whenever a
                # discount price exists — confirm which value is intended.
                if check_dup_in_biz_db(product_link, full_price_num, current_time):
                    logger.warning(f" 「{keyword}」第{page_no}页 第{collected_count}个商品(重复):{title},跳过")
                    if detail_page and not detail_page.is_closed():
                        detail_page.close()
                        logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页")
                    # Back to the list tab.
                    page.bring_to_front()
                    page.mouse.move(random.randint(100, 300), random.randint(200, 400))
                    random_delay(0.5, 1.0)
                    page.wait_for_load_state("networkidle")
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页")
                    if collected_count % 5 == 0 and collected_count > 0:
                        logger.info("采满5个往下滑")
                        slow_scroll_400px(page)
                        page.wait_for_load_state("networkidle")
                    continue

                # ---- Detail page: manufacture date ----
                manufacture_date_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="生产日期"]]//div[contains(@class, "spec-info-item-value-text")]')
                if manufacture_date_locator.count() > 0:
                    manufacture_date = manufacture_date_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页生产日期:{manufacture_date}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 生产日期元素未找到,使用默认值:{manufacture_date}")

                # ---- Detail page: approval number ----
                approval_number_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="批准文号"]]//div[contains(@class, "spec-info-item-value-text")]')
                if approval_number_locator.count() > 0:
                    approval_number = approval_number_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页批准文号:{approval_number}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 批准文号元素未找到,使用默认值:{approval_number}")

                # ---- Detail page: specification ----
                spec_locator = target_page.locator('//div[contains(@class, "spec-info-item") and .//div[contains(@class, "spec-info-item-label") and normalize-space(.)="规格"]]//div[contains(@class, "spec-info-item-value-text")]')
                if spec_locator.count() > 0:
                    spec = spec_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页规格:{spec}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 规格元素数量不足,使用默认值:{spec}")

                # ---- Detail page: inventory ----
                storage = ''
                storage_locator = target_page.locator('[data-v-51f0e85d].detail-input-num-right-title')
                if storage_locator.count() > 0:
                    storage = storage_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页库存:{storage}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 库存元素数量不足,使用默认值:{storage}")

                # ---- Detail page: sales ----
                sell = ''
                sell_locator = target_page.locator('div.detail-info-content-item-value-price-top-right div[data-v-95163d4a]')
                if sell_locator.count() > 0:
                    sell = sell_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页销量:{sell}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 没有销量元素,使用默认值:{sell}")

                # ---- Snapshot of the detail page, uploaded to OSS ----
                # Reset per product: a failed snapshot must neither raise
                # NameError later nor reuse the previous product's URL.
                oss_url = ""
                try:
                    local_path, oss_url = screenshot_target_page_to_local_then_oss(
                        target_page=target_page,
                        full_page=True  # capture the full page
                    )
                    print(f"最终结果:")
                    print(f" 本地文件路径:{local_path}")
                    logger.info(f" OSS访问链接:{oss_url}")
                except Exception as snapshot_err:
                    logger.warning(f"整体流程执行失败:{str(snapshot_err)}")

                # ---- Shop licence data ----
                province = ""
                city = ""
                business_license_company = ""
                qualification_number = ''
                shop_exists, shop_info = shop_is_exists_database(shop)
                shop_page = None
                # Only visit the shop page for a real shop that is not stored yet.
                if shop != "药店品种预约中心" and not shop_exists:
                    logger.info("店铺名不是药店品种预约中心且数据库没有该公司的营业执照")
                    # Enter the shop (opens a popup tab).
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    entershop_btn = target_page.locator('div[data-v-5485589c].shop-info-container-left-info')
                    entershop_btn.wait_for(state="visible", timeout=10000)
                    entershop_btn.scroll_into_view_if_needed()
                    entershop_btn.hover()
                    random_delay(0.2, 0.5)
                    with target_page.expect_popup(timeout=15000) as pop:
                        entershop_btn.click()
                        random_delay(0.05, 0.15)
                    shop_page = pop.value
                    shop_page.wait_for_load_state("domcontentloaded")
                    # Open the qualification / after-sales panel.
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    shop_license_page = shop_page.locator('//div[contains(@class, "shop-info-container-right-btns-item") and contains(span, "资质/售后")]')
                    shop_license_page.wait_for(state="attached", timeout=15000)
                    shop_license_page.scroll_into_view_if_needed()
                    shop_license_page.hover()
                    random_delay(0.2, 0.5)
                    shop_license_page.click()
                    random_delay(0.05, 0.15)
                    random_delay(0.05, 0.1)
                    shop_page.wait_for_load_state("networkidle")
                    shop_page.wait_for_load_state("load")
                    # Locate the business-licence image and OCR it.
                    ocr_res = None
                    shop_license_img = shop_page.locator('//span[contains(text(), "企业营业执照")]/ancestor::div[@class="shop-info-drawer-zz-tab1-list-item"]/img').first
                    shop_license_img.wait_for(state="visible", timeout=60000)
                    try:
                        if shop_license_img.count() > 0:
                            shop_license_src = shop_license_img.get_attribute('src')
                            shop_license_src = shop_license_src.strip() if shop_license_src else None
                            ocr_res = get_ocr_res(shop_license_src)
                        else:
                            shop_license_src = None
                    except Exception as e:
                        # A failed extraction must not abort the product.
                        logger.warning(f"提取营业执照图片src失败:{e}")
                        shop_license_src = None
                    print("营业执照图片链接:", shop_license_src)
                    contact_address = ''
                    qualification_number = ocr_res.get('社会信用代码', '') if ocr_res else ''
                    business_license_company = ocr_res.get('单位名称', '') if ocr_res else ''
                    business_license_address = ocr_res.get('地址', '') if ocr_res else ''
                    province, city = extract_province_city(business_license_address)
                    logger.info(f"原始地址:{business_license_address}")
                    logger.info(f"提取的省份:{province} | 城市:{city}")
                    insert_result = insert_shop_info_to_db(
                        shop=shop,
                        contact_address=contact_address,
                        qualification_number=qualification_number,
                        business_license_company=business_license_company,
                        business_license_address=business_license_address,
                        scrape_date=current_time,
                        platform=platform,
                        province=province,
                        city=city,
                        create_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        update_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    )
                else:
                    logger.info("数据库有该店名,在数据库拿取对应字段填充ybm_drug_middle表")
                    if shop_info:
                        province = shop_info['province']
                        city = shop_info['city']
                        business_license_company = shop_info['business_license_company']
                        qualification_number = shop_info['qualification_number']

                # ---- Close the shop tab, if one was opened ----
                try:
                    if shop_page and not shop_page.is_closed():
                        random_delay(4, 8)
                        shop_page.close()
                        logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭店铺页标签 shop_page")
                except Exception as e:
                    logger.warning(f"⚠️ 关闭 shop_page 失败:{e}")
                random_delay(5, 8)

                # ---- Close the detail tab and return to the list ----
                if detail_page and not detail_page.is_closed():
                    detail_page.close()
                    logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页")
                page.bring_to_front()
                page.mouse.move(random.randint(100, 300), random.randint(200, 400))
                random_delay(0.5, 1.0)
                page.wait_for_load_state("networkidle")
                random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页")
                random_delay(2, 4)

                availability = ""
                # ---- Assemble the row (keys match the MySQL columns) ----
                single_data = {
                    "product": title,
                    "my_good_price": merged_price,
                    "min_price": discount_price_val,
                    "manufacture_date": manufacture_date,
                    "expiry_date": expiry_date,
                    "shop": shop,
                    "business_license_company": business_license_company,
                    "province": province,
                    "city": city,
                    "manufacturer": manufacturer,
                    "specification": spec,
                    "approval_number": approval_number,
                    "product_link": product_link,
                    "scrape_date": current_time,
                    "scrape_province": "",
                    "availability": availability,
                    "credit_code": qualification_number,
                    "platform": platform,
                    "search_key": keyword,
                    "number": num,
                    "is_sold_out": is_sold_out,
                    "sales": sell,
                    "inventory": storage,
                    "snapshot_url": oss_url,
                    "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
                insert_single_to_mysql(single_data)
                collect_result.append(single_data)
                logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」采集完成")
            except Exception as e:
                # Recover: close the detail tab and force the list tab to front.
                logger.exception(f" 「{keyword}」第{collected_count}个商品采集核心异常:{str(e)}")
                try:
                    if detail_page and not detail_page.is_closed():
                        detail_page.close()
                        logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 异常时关闭详情页标签页")
                    if page and not page.is_closed():
                        page.bring_to_front()
                        page.wait_for_load_state("networkidle")
                        random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                except Exception as e2:
                    logger.error(f" 「{keyword}」第{collected_count}个商品详情采集异常(处理时):{str(e2)},原异常:{str(e)}")
                continue
            # Scroll after every 5th card, except after the last one.
            if collected_count % 5 == 0 and collected_count > 0 and collected_count != total_limit:
                logger.info("采满5个往下滑")
                slow_scroll_400px(page,)
                page.wait_for_load_state("networkidle")

        # ---- Page finished: try to move on ----
        delay = random_delay(1.5, 3.0)
        logger.info(f"⏳ 翻页前随机等待 {delay:.2f}s(反爬)")
        if goto_next_page(page):
            page_no += 1
            continue
        else:
            logger.info(f" 「{keyword}」已无下一页,关键词采集结束")
            break

    # Long pause before the caller moves to the next keyword.
    long_delay = random_delay(MIN_KEYWORD_DELAY, MAX_KEYWORD_DELAY)
    logger.info(f" 「{keyword}」采集完成,共{len(collect_result)}条数据,等待{long_delay:.2f}秒后继续下一个关键词(反爬)")
    return collect_result
- # ==================== 保存到CSV函数(适配新表头) ====================
- # def save_to_csv(data_list):
- # """
- # 保存数据到CSV(适配新表头)
- # :param data_list: list - 采集到的字典数据列表
- # :return: bool - 保存是否成功
- # """
- # if not data_list:
- # logger.warning(" 无数据可保存到CSV")
- # return False
- # try:
- # # 判断文件是否存在,不存在则写入表头
- # file_exists = os.path.exists(CSV_FILE_PATH)
- # # 打开CSV文件(追加模式,utf-8-sig避免Excel乱码)
- # with open(CSV_FILE_PATH, "a", newline="", encoding="utf-8-sig") as f:
- # # 用新表头作为字段名
- # writer = csv.DictWriter(f, fieldnames=CSV_HEADERS)
- # # 首次写入表头
- # if not file_exists:
- # writer.writeheader()
- # logger.info(f" 已创建CSV文件并写入新表头:{CSV_FILE_PATH}")
- # # 写入数据行
- # writer.writerows(data_list)
- # logger.info(f" 成功将 {len(data_list)} 条数据写入CSV")
- # return True
- # except Exception as e:
- # logger.error(f" 保存CSV失败:{str(e)}")
- # return False
- # ==================== 主函数(登录+批量搜索) ====================
def main():
    """Launch Chrome, make sure the session is logged in, and collect every keyword.

    Cookies are loaded first; when ``is_login`` reports no session, the login
    flow runs and the fresh cookies are saved. Each entry of
    ``SEARCH_KEYWORDS`` is then searched and passed to ``collect_data``.
    The browser is always closed on exit.
    """
    logger.info("\n" + "="*50)
    logger.info("🚀 药帮忙采集程序启动")
    logger.info(f"⏰ 启动时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("="*50)
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=False,  # headed mode
            channel="chrome",  # real Chrome channel
            slow_mo=random.randint(100, 300),  # global per-operation slowdown (ms)
            args=[
                "--disable-blink-features=AutomationControlled",
                "--enable-automation=false",
                "--disable-infobars",
                "--remote-debugging-port=0",
                "--start-maximized",
                "--disable-extensions",
                "--disable-plugins-discovery",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36"
            ]
        )
        # Browser context with a fixed locale / timezone / location profile.
        context = browser.new_context(
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
            geolocation={"latitude": 31.230416, "longitude": 121.473701},
            permissions=["geolocation"],
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={"width": 1800, "height": 1000},
            java_script_enabled=True,
            bypass_csp=True,
        )
        page = context.new_page()
        # Init script that overrides navigator.webdriver and related properties.
        page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] }); // 新增:模拟插件
            Object.defineProperty(navigator, 'mimeTypes', { get: () => [1, 2, 3] }); // 新增:模拟MIME类型
            window.chrome = { runtime: {}, loadTimes: () => ({}) }; // 增强Chrome模拟
            delete window.navigator.languages;
            window.navigator.languages = ['zh-CN', 'zh'];
            // 新增:模拟真实鼠标移动特征
            (() => {
                const originalAddEventListener = EventTarget.prototype.addEventListener;
                EventTarget.prototype.addEventListener = function(type, listener) {
                    if (type === 'mousemove') {
                        return originalAddEventListener.call(this, type, (e) => {
                            e._automation = undefined;
                            listener(e);
                        });
                    }
                    return originalAddEventListener.call(this, type, listener);
                };
            })();
        """)
        try:
            # ---- Session: reuse cookies, log in only when needed ----
            load_cookies(context)
            if not is_login(page):
                page.goto(TARGET_LOGIN_URL)
                page.wait_for_load_state("networkidle")
                logger.info("🔑 开始执行登录流程")
                login_success = login_operation(page, USERNAME, PASSWORD)
                if not login_success:
                    logger.error(" 登录失败,程序终止")
                    return
                save_cookies(context)
                logger.info(" 登录并保存Cookie成功!")

            # ---- Search and collect each keyword ----
            for keyword_idx, keyword in enumerate(SEARCH_KEYWORDS, 1):
                logger.info(f"\n=====================================")
                logger.info(f"🔍 开始处理第{keyword_idx}/{len(SEARCH_KEYWORDS)}个关键词:{keyword}")
                logger.info(f"=====================================")
                popup_guard(page, "before_search")
                detail_page, search_success = search_operation(page, keyword)
                # Check for a missing page before handing it to popup_guard.
                if detail_page is None:
                    break
                popup_guard(detail_page, "after_search")
                if not search_success:
                    logger.warning(f" 「{keyword}」搜索失败,跳过采集")
                    continue
                # Let the result page settle before collecting.
                detail_page.wait_for_load_state("domcontentloaded")
                detail_page.wait_for_load_state('networkidle')
                collect_data(detail_page, keyword)
            logger.info("\n🎉 所有关键词处理完成!CSV文件路径:" + os.path.abspath(CSV_FILE_PATH))
        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f" 程序异常:{str(e)}")
        finally:
            browser.close()
            logger.info(" 浏览器已关闭,程序结束")
# ==================== Program entry point ====================
# Run the scraper only when this file is executed as a script.
if __name__ == '__main__':
    main()
|