- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
- from logger_config import logger
- from datetime import datetime
- import random
- import csv
- import os
- import time
- import json
- import pymysql
- from pymysql.err import OperationalError, ProgrammingError, DataError
- from config import *
- import re
- import uuid
- import requests
- from main import *
- #采集数据核心
- def collect_data1(page, keyword):
- """
- 1) 先获取当前页商品个数(count)
- 2) 按循环次数采集;每循环15次滚动一次 slow_scroll_1200px
- 3) 当前页循环完 -> goto_next_page;有下一页继续;无下一页结束该关键词
- """
- collect_result = []
- seen = set()
- logger.info(f"📊 开始采集「{keyword}」的商品数据")
- page.wait_for_load_state("networkidle")
- page_no = 1
- while True:
- logger.info(f"\n📄 「{keyword}」开始采集第 {page_no} 页")
- # 记录列表页URL(可用于你后续兜底)
- list_page_url = page.url
- logger.info(f"📌 已记录商品列表页URL:{list_page_url}")
- # ✅ 先获取当前页商品个数
- page.wait_for_load_state("networkidle")
- total_limit = page.locator(PRODUCT_ITEM_SELECTOR).count()
- logger.info(f"📌 「{keyword}」第{page_no}页 初始商品个数(count):{total_limit}")
- # 重置当前页的采集计数
- collected_count = 0
- for idx in range(total_limit):
- detail_page = None
- try:
- item = page.locator(PRODUCT_ITEM_SELECTOR).nth(idx)
- collected_count += 1 # 实际采集计数(用于日志)
- # ========= 反爬随机延迟(保留你的原逻辑也行) =========
- page.wait_for_load_state("networkidle")
- delay = random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
- logger.info(f"📌 「{keyword}」第{page_no}页 第{collected_count}/{total_limit}个商品 - 等待{delay:.2f}秒后采集(反爬)")
- # ========= 售罄跳过 =========
- sold_locator = item.locator('div[data-v-480da687].gc-l1-cirle_tip')
- if sold_locator.count() > 0:
- is_sold_out = 1
- logger.info(f" 「{keyword}」第{page_no}页 第{collected_count}个商品已售罄")
- # if collected_count % 5 == 0 and collected_count > 0:
- # logger.info("采满5个往下滑")
- # slow_scroll_400px(page)
- # page.wait_for_load_state("networkidle")
- # continue
- # 1. 初始化所有字段默认值
- product = "无标题"
- price = "0.00"
- shop = "无店名"
- expiry_date = "无有效期"
- manufacture_date = "无生产日期"
- approval_number = "无批准文号"
- manufacturer = "未知公司"
- # discount_price = "0.00"
- spec = "未知规格"
- num = 1 # ✅ 默认 1
- platform = '药九九'
- current_time = datetime.now().strftime("%Y-%m-%d")
- is_sold_out = 0
- # 提取商品标题(处理空值)
- product_locator = item.locator(PRODUCT_TITLE_SELECTOR)
- if product_locator.count() > 0:
- title = product_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页标题:{title}{'='*10}")
- else:
- logger.warning(f" 「{keyword}」第{collected_count}个商品 - 列表页标题元素未找到,使用默认值:{title}")
- # 提取价格(带缺失日志)
- # 4. 提取价格(带缺失日志)
- price_locator = item.locator(PRODUCT_PRICE_SELECTOR).nth(0)
- if price_locator.count() > 0:
- price = price_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}{keyword}」第{collected_count}个商品 - 列表页采购价格:{price}{'='*10}")
- else:
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页采购价格元素未找到,使用默认值:{price}")
- # 5. 提取公司名称(带缺失日志)
- manufacturer_locator = item.locator(PRODUCT_COMPANY_SELECTOR)
- if manufacturer_locator.count() > 0:
- manufacturer = manufacturer_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页公司名:{manufacturer}{'='*10}")
- else:
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页公司名称元素未找到,使用默认值:{manufacturer}")
- #提取店铺名称
- shop_locator = item.locator(PRODUCT_STORE_SELECTOR)
- if shop_locator.count() > 0:
- shop = shop_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页店名:{shop}{'='*10}")
- else:
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页店铺名称元素未找到,使用默认值:{shop}")
- #提取折扣价
- discount_price = ""
- discount_price_locator = item.locator('span[data-v-480da687].gc-l2-discount_price').first
- if discount_price_locator.count() > 0:
- discount_price = discount_price_locator.inner_text(timeout=3000).strip()
- discount_price_val_origin = discount_price
- match = re.search(r'\d+\.?\d*', str(discount_price_val_origin))
- discount_price_val = float(match.group()) if match else 0.00
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页折扣价:{discount_price_val}{'='*10}")
- else:
- #如果没有拿原价替换
- price = float(price.replace("¥", "").replace(",", "")) if price.replace("¥", "").replace(",", "").replace(".", "") else "0.00"
- discount_price_val = price
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 折扣价元素未找到,使用采购价兜底:{discount_price_val}")
- merged_price = f"{price}{discount_price_val_origin}" if discount_price_val_origin else price
- # ========= ✅ 去重(最小且稳:可换成 href/data-id 更稳) =========
- # key = f"{title.strip()}|{store.strip()}|{company_name.strip()}|{price.strip()}"
- # if key in seen:
- # logger.warning(
- # f" 「{keyword}」第{page_no}页 第{collected_count}个商品(重复):{title},跳过"
- # )
- # if collected_count % 5 == 0 and collected_count > 0:
- # logger.info("采满15个往下滑")
- # slow_scroll_400px(page)
- # page.wait_for_load_state("networkidle")
- # continue
- # seen.add(key)
- # ========= 模拟点击商品进入详情页 =========
- logger.info(
- f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 模拟鼠标移动并点击"
- )
- # 点击商品项容器,触发详情展示
- # ========== 点击商品跳详情页 ==========
- # 反爬:模拟真人鼠标移动到商品上再点击(不是直接点击)
- logger.info(f"📌 「{keyword}」第{collected_count}个商品「{title}」- 模拟鼠标移动并点击")
- item.hover() # 先悬停
- random_delay(0.2, 0.5) # 悬停后延迟
- item.dispatch_event("mousedown")
- random_delay(0.05, 0.15) # 鼠标按下后延迟
- item.dispatch_event("mouseup")
- random_delay(0.05, 0.1) # 鼠标松开后延迟
- try:
- with page.context.expect_page(timeout=60000) as p:
- item.click(delay=random.uniform(0.1, 0.3))
- detail_page = p.value
- except PlaywrightTimeoutError:
- logger.warning(
- f" 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 未检测到新标签页,使用当前页采集详情"
- )
- detail_page = None # 标记为无新标签页,避免关闭列表页
- # 等待详情加载(优先用新标签页,无则用列表页)
- target_page = detail_page if detail_page else page
- target_page.wait_for_load_state("networkidle", timeout=20000)
- delay = random_delay(MIN_PAGE_DELAY, MAX_PAGE_DELAY)
- logger.info(
- f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 详情页加载完成,等待{delay:.2f}秒(反爬)"
- )
- # 反爬:检测详情页反爬验证
- # check_anti_crawl(page)
- # ========== 采集详情页的专属信息(有效期/生产日期/批准文号) ==========
- #获取商品详情页链接
- product_link = target_page.url
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页链接:{product_link}{'='*10}")
- # 提取有效期(处理空值)
- expiry_date_locator = target_page.locator("//span[contains(text(), '有效期')]/following-sibling::span[contains(@class, 'gdb-desc-value4')]")
- if expiry_date_locator.count() > 0:
- expiry_date = expiry_date_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页有效期:{expiry_date}{'='*10}")
- else:
- # 修复:替换未定义的i为collected_count
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 有效期元素未找到,使用默认值:{expiry_date}")
- # 提取生产日期(修复完成)
- manufacture_date_locator = target_page.locator("//span[@class='gdb-desc-label' and text()='生产日期']/following-sibling::span[1]")
- if manufacture_date_locator.count() > 0:
- manufacture_date = manufacture_date_locator.inner_text(timeout=3000).strip().replace('-', "")
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页生产日期:{manufacture_date}{'='*10}")
- else:
- # 修复:替换未定义的i为collected_count
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 生产日期元素未找到,使用默认值:{manufacture_date}")
- # 提取批准文号(替换为你实际的选择器)
- approval_number_locator = target_page.locator("//span[contains(text(), '国药准字')]").first
- if approval_number_locator.count() > 0:
- approval_number = approval_number_locator.inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页批准文号:{approval_number}{'='*10}")
- else:
- # 修复:替换未定义的i为collected_count
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 批准文号元素未找到,使用默认值:{approval_number}")
- #提取规格
- spec_locator = target_page.locator('span.gddd-params_text_line_1[title]')
- if spec_locator.count() > 0:
- spec = spec_locator.nth(2).inner_text(timeout=3000).strip()
- logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页规格:{spec}{'='*10}")
- else:
- # 修复:替换未定义的i为collected_count,补充规格数量不足的提示
- logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 规格元素数量不足,使用默认值:{spec}")
- #获取营业执照图片 li[data-v-4f79abe8].nth(2)
- #进入店铺
- random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
- entershop_btn = target_page.locator('[data-v-c5790f48].btn-text')
- entershop_btn.click()
- target_page.wait_for_load_state("networkidle")
- #点击店铺资质
- random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
- shop_license_page = target_page.locator('li[data-v-4f79abe8]').nth(2)
- shop_license_page.click()
- target_page.wait_for_load_state("networkidle")
- SCROLL_TARGET_DISTANCE = 1500
- try:
- # 生成400±50px的随机滚动距离
- scroll_distance = random.randint(
- SCROLL_TARGET_DISTANCE - SCROLL_OFFSET_RANGE,
- SCROLL_TARGET_DISTANCE + SCROLL_OFFSET_RANGE
- )
- remaining_distance = scroll_distance
- total_steps = int(scroll_distance / SCROLL_STEP)
- logger.info(
- f"📜 开始慢速滚动(目标距离:{scroll_distance}px,总步数:{total_steps},总时长约{total_steps*SCROLL_INTERVAL:.2f}秒)"
- )
- # 渐进式滚动(每步50px,间隔0.05秒)
- for _ in range(total_steps):
- step = min(SCROLL_STEP, remaining_distance)
- page.evaluate(f"window.scrollBy(0, {step});")
- remaining_distance -= step
- time.sleep(SCROLL_INTERVAL)
- # 处理剩余不足一步的距离
- if remaining_distance > 0:
- page.evaluate(f"window.scrollBy(0, {remaining_distance});")
- time.sleep(SCROLL_INTERVAL)
- # 滚动后等待懒加载完成
- page.wait_for_load_state("networkidle", timeout=8000)
- random_delay(2.0, 3.0) # 滚动后额外停顿,模拟真人
- logger.info(f" 慢速滚动完成,实际滚动距离:{scroll_distance - remaining_distance}px")
- except Exception as e:
- logger.warning(f" 慢速滚动失败:{e}")
- #获取店铺资质图片
- shop_license_div = target_page.locator('div[data-v-7f7214f6].shop-licensesImg').nth(2)
- shop_license_img = shop_license_div.locator('img')
- try:
- if shop_license_img.count() > 0:
- shop_license_src = shop_license_img.get_attribute('src')
- shop_license_src = shop_license_src.strip() if shop_license_src else None
- else:
- shop_license_src = None
- except Exception as e:
- # 捕获定位/提取失败的异常,避免程序崩溃
- print(f"提取营业执照图片src失败:{e}")
- shop_license_src = None
- print("营业执照图片链接:", shop_license_src)
- # purchase_price = float(price.replace("¥", "").replace(",", "")) if price.replace("¥", "").replace(",", "").replace(".", "").isdigit() else 0.00
- # ========== 关闭新标签页,切回列表页 ==========
- if detail_page and not detail_page.is_closed():
- detail_page.close() # 关闭详情页标签
- logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页")
- # 切回原列表页(第一个标签页)
- page.bring_to_front() # 激活列表页
- page.mouse.move(random.randint(100, 300), random.randint(200, 400)) # 随机移动鼠标
- random_delay(0.5, 1.0) # 增加切换后延迟
- page.wait_for_load_state("networkidle")
- random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
- logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页")
- province = ""
- city = ""
- business_license_company = ""
- credit_code = ""
- availability = ""
- # 组装单条数据(仅新增生产日期/批准文号字段,原有字段顺序/逻辑不变)
- # 构造单条数据元组(适配MySQL字段)
- single_data = {
- # 核心商品信息
- "product": title, # 商品名称
- "my_good_price": merged_price, # 自定义价格(可与min_price相同或单独提取)
- "min_price": discount_price_val, # 最低价格
- "manufacture_date": manufacture_date, # 生产日期
- "expiry_date": expiry_date, # 有效期
- "shop": shop, # 店铺名
- "business_license_company": business_license_company, # 营业执照主体(公司名称)
- "province": province, # 省份
- "city": city, # 城市
- "manufacturer": manufacturer, # 生产厂家
- "specification": spec, # 规格
- "approval_number": approval_number, # 批准文号
- "product_link": product_link, # 商品链接
- "scrape_date": current_time, # 采集日期
- "scrape_province": "", # 采集省份(可留空或根据IP获取)
- "availability": availability, # 库存状态
- "credit_code": credit_code, # 统一信用代码(如有可补充提取)
- "platform": platform, # 平台名称(固定或动态获取)
- "search_key": keyword, # 搜索关键词
- "number": num, # 数量(盒数)
- "is_sold_out": is_sold_out, # 售罄标记(0/1)
- "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 更新时间
- "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 创建时间
- }
- # 调用逐条插入函数
- insert_single_to_mysql(single_data)
- collect_result.append(single_data)
- logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」采集完成")
- except Exception as e:
- # 异常处理:关闭详情页,强制切回列表页
- logger.exception(f" 「{keyword}」第{collected_count}个商品采集核心异常:{str(e)}")
- try:
- if detail_page and not detail_page.is_closed():
- detail_page.close()
- logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 异常时关闭详情页标签页")
- if page and not page.is_closed():
- page.bring_to_front() # 切回列表页
- page.wait_for_load_state("networkidle")
- random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
- except Exception as e2:
- logger.error(f" 「{keyword}」第{collected_count}个商品详情采集异常(处理时):{str(e2)},原异常:{str(e)}")
- continue
- # ✅ 每15次滚动一次(修复:用collected_count,且排除0的情况)
- if collected_count % 5 == 0 and collected_count > 0 and collected_count != total_limit:
- logger.info("采满5个往下滑")
- slow_scroll_400px(page)
- page.wait_for_load_state("networkidle")
- # ====== 当前页采集完毕,尝试翻页 ======
- delay = random_delay(1.5, 3.0)
- logger.info(f"⏳ 翻页前随机等待 {delay:.2f}s(反爬)")
- if goto_next_page(page):
- page_no += 1
- continue
- else:
- logger.info(f" 「{keyword}」已无下一页,关键词采集结束")
- break
- # 关键词采集完成后长延迟
- long_delay = random_delay(MIN_KEYWORD_DELAY, MAX_KEYWORD_DELAY)
- logger.info(f" 「{keyword}」采集完成,共{len(collect_result)}条数据,等待{long_delay:.2f}秒后继续下一个关键词(反爬)")
- return collect_result
- # ==================== 主函数(登录+批量搜索) ====================
- def main():
- logger.info("\n" + "="*50)
- logger.info("🚀 药九九采集程序启动")
- logger.info(f"⏰ 启动时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- logger.info("="*50)
- # 待搜索的关键词列表(直接写在这里,改起来更直观)
- # 存储所有关键词的采集数据
- # all_collect_data = []
- with sync_playwright() as p:
- # browser = init_browser_with_proxy(p)
- # 启动浏览器(用单个配置变量)
- browser = p.chromium.launch(
- headless=False, # 不要用无头模式(反爬:无头模式易被识别)
- channel="chrome", # 使用真实Chrome内核
- slow_mo=random.randint(100, 300), # 全局操作延迟(模拟真人慢速操作)
- args=[
- "--disable-blink-features=AutomationControlled", # 禁用webdriver特征(核心!)
- "--enable-automation=false", # 新增:禁用自动化标识
- "--disable-infobars", # 新增:禁用信息栏
- "--remote-debugging-port=0", # 新增:随机调试端口
- "--start-maximized", # 最大化窗口(模拟真人使用)
- "--disable-extensions", # 禁用扩展(避免特征)
- "--disable-plugins-discovery", # 禁用插件发现
- "--no-sandbox", # 避免沙箱模式特征
- "--disable-dev-shm-usage", # 避免内存限制导致的异常
- f"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(110, 120)}.0.0.0 Safari/537.36" # 随机Chrome版本的UA
- ]
- )
- # 创建页面时伪装指纹
- context = browser.new_context(
- locale="zh-CN", # 中文环境
- timezone_id="Asia/Shanghai", # 上海时区
- geolocation={"latitude": 31.230416, "longitude": 121.473701}, # 模拟上海地理位置(可选)
- permissions=["geolocation"], # 授予定位权限(模拟真人)
- user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- viewport={"width": 1920, "height": 1080},
- # 关键:隐藏自动化特征
- java_script_enabled=True,
- bypass_csp=True,
- # user_data_dir="./temp_user_data" # 模拟真实用户数据目录
- )
- page = context.new_page()
- # 关键:移除navigator.webdriver标识(反爬核心)
- page.add_init_script("""
- Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
- Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] }); // 新增:模拟插件
- Object.defineProperty(navigator, 'mimeTypes', { get: () => [1, 2, 3] }); // 新增:模拟MIME类型
- window.chrome = { runtime: {}, loadTimes: () => ({}) }; // 增强Chrome模拟
- delete window.navigator.languages;
- window.navigator.languages = ['zh-CN', 'zh'];
- // 新增:模拟真实鼠标移动特征
- (() => {
- const originalAddEventListener = EventTarget.prototype.addEventListener;
- EventTarget.prototype.addEventListener = function(type, listener) {
- if (type === 'mousemove') {
- return originalAddEventListener.call(this, type, (e) => {
- e._automation = undefined;
- listener(e);
- });
- }
- return originalAddEventListener.call(this, type, listener);
- };
- })();
- """)
- try:
- # ========== 核心:Cookie复用逻辑 ==========
- # 1. 加载本地Cookie
- load_cookies(context)
- # 2. 验证登录状态
- if not is_login(page):
- # 3. Cookie失效/不存在,执行登录
- page.goto(TARGET_LOGIN_URL)
- page.wait_for_load_state("networkidle")
- logger.info("🔑 开始执行登录流程")
- # 执行登录操作
- login_success = login_operation(page, USERNAME, PASSWORD)
- if not login_success:
- logger.error(" 登录失败,程序终止")
- return
- # 4. 登录成功后保存Cookie
- save_cookies(context)
- logger.info(" 登录并保存Cookie成功!")
- # 2. 批量搜索+采集+保存
- for keyword_idx, keyword in enumerate(SEARCH_KEYWORDS, 1):
- logger.info(f"\n=====================================")
- logger.info(f"🔍 开始处理第{keyword_idx}/{len(SEARCH_KEYWORDS)}个关键词:{keyword}")
- logger.info(f"=====================================")
- # 执行搜索
- popup_guard(page, "before_search")
- search_success = search_operation(page, keyword)
- # input("")
- popup_guard(page, "after_search")
- if not search_success:
- logger.warning(f" 「{keyword}」搜索失败,跳过采集")
- continue
- # ✅ 再等页面稳定一下(networkidle 有时会等不到,建议加超时或换成 domcontentloaded)
- page.wait_for_load_state("domcontentloaded")
- page.wait_for_load_state('networkidle')
- # 采集数据
- data_list = collect_data1(page, keyword)
- # # 保存到CSV
- # if data_list:
- # save_to_csv(data_list)
- # else:
- # logger.warning(f" 「{keyword}」无数据,跳过保存")
- logger.info("\n🎉 所有关键词处理完成!CSV文件路径:" + os.path.abspath(CSV_FILE_PATH))
- input("\n按回车关闭程序...")
- except Exception as e:
- logger.error(f" 程序异常:{str(e)}")
- finally:
- browser.close()
- logger.info(" 浏览器已关闭,程序结束")
- if __name__ == '__main__':
- main()
|