def collect_data(page, keyword):
    """Collect product records for one search keyword across all result pages.

    Flow:
      1) Count the product items rendered on the current list page.
      2) Iterate the items with human-like mouse movement and randomized
         delays (anti-bot), open each item's detail page, and extract
         title / price / shop / expiry / manufacture date / approval
         number / spec; for shops not yet in the DB, also OCR the shop
         licence image and store the shop info.
      3) When the page is exhausted, call goto_next_page; the keyword is
         finished when there is no next page.

    Args:
        page: Playwright Page currently showing the search-result list.
        keyword: search keyword being collected (logged and stored with
            every record).

    Returns:
        list[dict]: one dict per collected product (each is also inserted
        into MySQL row-by-row as it is collected); an empty list when the
        site reports no matches for the keyword.
    """
    collect_result = []
    logger.info(f"📊 开始采集「{keyword}」的商品数据")
    page.wait_for_load_state("networkidle")
    page_no = 1
    while True:
        logger.info(f"\n📄 「{keyword}」开始采集第 {page_no} 页")
        # Remember the list-page URL (useful for later fallback/debugging).
        list_page_url = page.url
        logger.info(f"📌 已记录商品列表页URL:{list_page_url}")
        # Count the items currently on this page.
        page.wait_for_load_state("networkidle")
        total_limit = page.locator(PRODUCT_ITEM_SELECTOR).count()
        logger.info(f"📌 「{keyword}」第{page_no}页 初始商品个数(count):{total_limit}")
        # Per-page progress counter (drives logs and scroll pacing).
        collected_count = 0
        # A "新品登记" (register-new-product) banner means the site found no
        # matching products — skip the whole keyword.
        not_found_keywords = page.locator("span:has-text('新品登记')")
        if not_found_keywords.count() > 0:
            logger.warning(f"⚠️ 关键词「{keyword}」无匹配商品,直接跳过整个关键词采集")
            return []

        for idx in range(total_limit):
            detail_page = None
            try:
                item = page.locator(PRODUCT_ITEM_SELECTOR).nth(idx)
                collected_count += 1  # actual collection counter (for logs)
                # Anti-bot: randomized delay before touching each item.
                page.wait_for_load_state("networkidle")
                delay = random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                logger.info(f"📌 「{keyword}」第{page_no}页 第{collected_count}/{total_limit}个商品 - 等待{delay:.2f}秒后采集(反爬)")

                # ---- per-item field defaults --------------------------------
                title = "无标题"
                price = "0.00"
                shop = "无店名"
                expiry_date = "无有效期"
                manufacture_date = "无生产日期"
                approval_number = "无批准文号"
                manufacturer = "未知公司"
                spec = "未知规格"
                num = 1  # default quantity: 1
                platform = '药九九'
                current_time = datetime.now().strftime("%Y-%m-%d")
                is_sold_out = 0
                # FIX: these four were only assigned inside the "new shop"
                # branch below; when that branch is skipped (known shop or
                # "药品预约中心") building single_data raised NameError.
                province = ""
                city = ""
                business_license_company = ""
                qualification_number = ''
                availability = ""

                # ---- sold-out items are still collected, just flagged -------
                sold_locator = item.locator('div[data-v-480da687].gc-l1-cirle_tip')
                if sold_locator.count() > 0:
                    is_sold_out = 1
                    logger.warning(f" 「{keyword}」第{page_no}页 第{collected_count}个商品已售罄")

                # ---- title (tolerate missing element) -----------------------
                product_locator = item.locator(PRODUCT_TITLE_SELECTOR)
                if product_locator.count() > 0:
                    title = product_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页标题:{title}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品 - 列表页标题元素未找到,使用默认值:{title}")

                # ---- list price (with missing-element logging) --------------
                price_locator = item.locator(PRODUCT_PRICE_SELECTOR).nth(0)
                if price_locator.count() > 0:
                    price = price_locator.inner_text(timeout=3000).strip()
                    # FIX: log message was missing its opening 「 bracket.
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页采购价格:{price}{'='*10}")
                else:
                    price = "0.00"  # default, avoids errors downstream
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页采购价格元素未找到,使用默认值:{price}")

                # ---- manufacturer / company name ----------------------------
                manufacturer_locator = item.locator(PRODUCT_COMPANY_SELECTOR)
                if manufacturer_locator.count() > 0:
                    manufacturer = manufacturer_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页公司名:{manufacturer}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页公司名称元素未找到,使用默认值:{manufacturer}")

                # ---- shop name ----------------------------------------------
                shop_locator = item.locator(PRODUCT_STORE_SELECTOR)
                if shop_locator.count() > 0:
                    shop = shop_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 列表页店名:{shop}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 列表页店铺名称元素未找到,使用默认值:{shop}")

                # ---- discount price (fall back to list price) ---------------
                discount_price_val_origin = ""
                discount_price = ""
                discount_price_locator = item.locator('span[data-v-480da687].gc-l2-discount_price').first
                if discount_price_locator.count() > 0:
                    discount_price = discount_price_locator.inner_text(timeout=3000).strip()
                    discount_price_val_origin = discount_price
                    match = re.search(r'\d+\.?\d*', str(discount_price_val_origin))
                    discount_price_val = float(match.group()) if match else 0.00
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页折扣价:{discount_price_val}{'='*10}")
                else:
                    # No discount shown — substitute the list price.
                    # FIX: the old truthiness-only guard let non-numeric text
                    # reach float() and abort the whole item; parse defensively.
                    try:
                        price = float(price.replace("¥", "").replace(",", ""))
                    except ValueError:
                        price = 0.00
                    discount_price_val = float(price)
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 折扣价元素未找到,使用采购价兜底:{discount_price_val}")
                merged_price = f"{price}{discount_price_val_origin}" if discount_price_val_origin else price

                # ---- click into the detail page (human-like mouse) ----------
                logger.info(
                    f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 模拟鼠标移动并点击"
                )
                logger.info(f"📌 「{keyword}」第{collected_count}个商品「{title}」- 模拟鼠标移动并点击")
                item.hover()              # hover first, like a real user
                random_delay(0.2, 0.5)
                item.dispatch_event("mousedown")
                random_delay(0.05, 0.15)
                item.dispatch_event("mouseup")
                random_delay(0.05, 0.1)
                try:
                    # Expect a new tab; fall back to the current page if none opens.
                    with page.context.expect_page(timeout=60000) as p:
                        item.click(delay=random.uniform(0.1, 0.3))
                    detail_page = p.value
                except PlaywrightTimeoutError:
                    logger.warning(
                        f" 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 未检测到新标签页,使用当前页采集详情"
                    )
                    detail_page = None  # no new tab — don't close the list page later

                # NOTE(review): when detail_page is None, target_page IS the
                # list page, so the shop-navigation below would navigate away
                # from the list — confirm this fallback is intended.
                target_page = detail_page if detail_page else page
                target_page.wait_for_load_state("networkidle", timeout=20000)
                delay = random_delay(MIN_PAGE_DELAY, MAX_PAGE_DELAY)
                logger.info(
                    f"📌 「{keyword}」第{page_no}页 第{collected_count}个商品「{title}」- 详情页加载完成,等待{delay:.2f}秒(反爬)"
                )

                # ---- detail-page URL + dedup (link + price) -----------------
                product_link = target_page.url
                logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页链接:{product_link}{'='*10}")
                if check_dup_in_biz_db(product_link, discount_price_val):
                    logger.warning(f" 「{keyword}」第{page_no}页 第{collected_count}个商品(重复):{title},跳过")
                    # Close the detail tab and return to the list page.
                    if detail_page and not detail_page.is_closed():
                        detail_page.close()
                        logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页")
                    page.bring_to_front()
                    page.mouse.move(random.randint(100, 300), random.randint(200, 400))
                    random_delay(0.5, 1.0)
                    page.wait_for_load_state("networkidle")
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页")
                    if collected_count % 5 == 0 and collected_count > 0:
                        logger.info("采满5个往下滑")
                        slow_scroll_400px(page)
                        page.wait_for_load_state("networkidle")
                    continue

                # ---- detail-only fields: expiry / mfg date / approval -------
                expiry_date_locator = target_page.locator("//span[contains(text(), '有效期')]/following-sibling::span[contains(@class, 'gdb-desc-value4')]")
                if expiry_date_locator.count() > 0:
                    expiry_date = expiry_date_locator.inner_text(timeout=3000).strip().replace('-', '')
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页有效期:{expiry_date}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 有效期元素未找到,使用默认值:{expiry_date}")

                manufacture_date_locator = target_page.locator("//span[@class='gdb-desc-label' and text()='生产日期']/following-sibling::span[1]")
                if manufacture_date_locator.count() > 0:
                    manufacture_date = manufacture_date_locator.inner_text(timeout=3000).strip().replace('-', "")
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页生产日期:{manufacture_date}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 生产日期元素未找到,使用默认值:{manufacture_date}")

                approval_number_locator = target_page.locator("//span[contains(text(), '国药准字')]").first
                if approval_number_locator.count() > 0:
                    approval_number = approval_number_locator.inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页批准文号:{approval_number}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 批准文号元素未找到,使用默认值:{approval_number}")

                # ---- specification (third entry of the params list) ---------
                spec_locator = target_page.locator('span.gddd-params_text_line_1[title]')
                if spec_locator.count() > 0:
                    spec = spec_locator.nth(2).inner_text(timeout=3000).strip()
                    logger.info(f"{'='*10}「{keyword}」第{collected_count}个商品 - 详情页规格:{spec}{'='*10}")
                else:
                    logger.warning(f" 「{keyword}」第{collected_count}个商品「{title}」- 规格元素数量不足,使用默认值:{spec}")

                # ---- new shop: enter shop page, OCR licence, store info -----
                if shop != "药品预约中心" and not shop_is_exists_database(shop):
                    # Enter the shop page.
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    entershop_btn = target_page.locator('[data-v-c5790f48].btn-text')
                    entershop_btn.wait_for(state="visible", timeout=10000)
                    entershop_btn.scroll_into_view_if_needed()  # ensure in viewport
                    entershop_btn.hover()
                    random_delay(0.2, 0.5)
                    entershop_btn.click()
                    random_delay(0.05, 0.15)
                    random_delay(0.05, 0.1)
                    # DOM-loaded is enough for an in-page section switch.
                    target_page.wait_for_load_state("domcontentloaded")

                    # Open the "店铺资质" (shop qualifications) tab.
                    random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                    shop_license_page = target_page.locator('li:has-text("店铺资质")')
                    shop_license_page.wait_for(state="visible", timeout=10000)
                    shop_license_page.hover()
                    random_delay(0.2, 0.5)
                    shop_license_page.click()
                    random_delay(0.05, 0.15)
                    random_delay(0.05, 0.1)
                    target_page.wait_for_load_state("networkidle")
                    slow_scroll_400px(target_page, scroll_distance1=700)

                    # Grab the licence image and run it through OCR.
                    target_page.wait_for_load_state("load")
                    ocr_res = None
                    shop_license_div = target_page.locator('div[data-v-7f7214f6].shop-licensesImg').nth(0)
                    shop_license_div.wait_for(state="attached", timeout=60000)
                    shop_license_img = shop_license_div.locator('img')
                    try:
                        if shop_license_img.count() > 0:
                            shop_license_src = shop_license_img.get_attribute('src')
                            shop_license_src = shop_license_src.strip() if shop_license_src else None
                            ocr_res = get_ocr_res(shop_license_src)
                        else:
                            shop_license_src = None
                    except Exception as e:
                        # Keep going if locating/extracting the image fails.
                        logger.warning(f"提取营业执照图片src失败:{e}")
                        shop_license_src = None
                    print("营业执照图片链接:", shop_license_src)

                    contact_address = ''
                    qualification_number = ocr_res.get('社会信用代码', '') if ocr_res else ''
                    business_license_company = ocr_res.get('单位名称', '') if ocr_res else ''
                    business_license_address = ocr_res.get('地址', '') if ocr_res else ''
                    # Derive province and city from the OCR'd address.
                    province, city = extract_province_city(business_license_address)
                    logger.info(f"原始地址:{business_license_address}")
                    logger.info(f"提取的省份:{province} | 城市:{city}")
                    insert_result = insert_shop_info_to_db(
                        shop=shop,
                        contact_address=contact_address,
                        qualification_number=qualification_number,
                        business_license_company=business_license_company,
                        business_license_address=business_license_address,
                        scrape_date=current_time,
                        platform=platform,
                        province=province,
                        city=city,
                        create_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        update_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    )

                # ---- close the detail tab and switch back to the list -------
                if detail_page and not detail_page.is_closed():
                    detail_page.close()
                    logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 已关闭详情页标签页")
                page.bring_to_front()
                page.mouse.move(random.randint(100, 300), random.randint(200, 400))
                random_delay(0.5, 1.0)
                page.wait_for_load_state("networkidle")
                random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」- 已切回列表页")

                # ---- assemble one record (schema matches the MySQL table) ---
                single_data = {
                    "product": title,                                   # product name
                    "my_good_price": merged_price,                      # list price (+ raw discount text)
                    "min_price": discount_price_val,                    # lowest price
                    "manufacture_date": manufacture_date,               # production date
                    "expiry_date": expiry_date,                         # expiry date
                    "shop": shop,                                       # shop name
                    "business_license_company": business_license_company,  # licence holder company
                    "province": province,
                    "city": city,
                    "manufacturer": manufacturer,
                    "specification": spec,
                    "approval_number": approval_number,
                    "product_link": product_link,
                    "scrape_date": current_time,
                    "scrape_province": "",                              # left blank (could be IP-derived)
                    "availability": availability,                       # stock status
                    "credit_code": qualification_number,                # unified social credit code
                    "platform": platform,
                    "search_key": keyword,
                    "number": num,                                      # quantity (boxes)
                    "is_sold_out": is_sold_out,                         # sold-out flag (0/1)
                    "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
                # Insert row-by-row so a later crash doesn't lose earlier items.
                insert_single_to_mysql(single_data)
                collect_result.append(single_data)
                logger.info(f" 「{keyword}」第{collected_count}个商品「{title}」采集完成")

            except Exception as e:
                # On any error: close the detail tab and force-return to the list.
                logger.exception(f" 「{keyword}」第{collected_count}个商品采集核心异常:{str(e)}")
                try:
                    if detail_page and not detail_page.is_closed():
                        detail_page.close()
                        logger.info(f"📌 「{keyword}」第{collected_count}个商品 - 异常时关闭详情页标签页")
                    if page and not page.is_closed():
                        page.bring_to_front()
                        page.wait_for_load_state("networkidle")
                        random_delay(MIN_CLICK_DELAY, MAX_CLICK_DELAY)
                except Exception as e2:
                    logger.error(f" 「{keyword}」第{collected_count}个商品详情采集异常(处理时):{str(e2)},原异常:{str(e)}")
                continue

            # Scroll every 5 collected items (skip on the last item of the page).
            if collected_count % 5 == 0 and collected_count > 0 and collected_count != total_limit:
                logger.info("采满5个往下滑")
                slow_scroll_400px(page)
                page.wait_for_load_state("networkidle")

        # ---- page finished: try to paginate ----------------------------
        delay = random_delay(1.5, 3.0)
        logger.info(f"⏳ 翻页前随机等待 {delay:.2f}s(反爬)")
        if goto_next_page(page):
            page_no += 1
            continue
        else:
            logger.info(f" 「{keyword}」已无下一页,关键词采集结束")
            break

    # Long pause between keywords (anti-bot).
    long_delay = random_delay(MIN_KEYWORD_DELAY, MAX_KEYWORD_DELAY)
    logger.info(f" 「{keyword}」采集完成,共{len(collect_result)}条数据,等待{long_delay:.2f}秒后继续下一个关键词(反爬)")
    return collect_result


def check_dup_in_biz_db(product_link, discount_price_val):
    """Return True when (product_link, min_price) already exists in the
    business table yjj_drug_middle; False otherwise (or on query failure,
    so a DB hiccup never blocks collection)."""
    conn = None
    cursor = None
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor()
        sql = """
            SELECT * FROM yjj_drug_middle
            WHERE product_link = %s AND min_price = %s
        """
        cursor.execute(sql, (product_link.strip(), discount_price_val))
        # fetchone() returns a row tuple when a duplicate exists, else None.
        is_dup = cursor.fetchone() is not None
        if is_dup:
            logger.debug(f"【去重校验】商品链接:{product_link} | 价格:{discount_price_val} - 表中已存在重复,跳过本次采集")
        else:
            logger.debug(f"【去重校验】商品链接:{product_link} | 价格:{discount_price_val} - 表中无重复,正常采集")
        return is_dup
    except Exception as e:
        logger.error(f"查询业务表去重失败:{str(e)}")
        return False
    finally:
        # Always release cursor/connection to avoid leaks.
        if cursor:
            cursor.close()
        if conn:
            conn.close()


def shop_is_exists_database(shop):
    """Return True when the shop name already exists in yjj_shop_info_middle.

    Returns False on any DB error (never None), so callers can rely on a
    boolean.
    """
    # FIX: pre-initialize so the finally block cannot NameError when
    # pymysql.connect() itself fails (consistent with check_dup_in_biz_db).
    conn = None
    cursor = None
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor()
        query_sql = """
            SELECT * FROM yjj_shop_info_middle
            WHERE shop = %s
        """
        cursor.execute(query_sql, (shop,))
        result = cursor.fetchone()
        # repr() reveals stray whitespace/hidden characters in the shop name.
        print(f"【调试】传入的店铺名:{repr(shop)}")
        print(f"【调试】查询参数:{shop}")
        print(f"【调试】查询结果:{result} → 函数返回:{bool(result)}")
        is_exists = bool(result)
        if is_exists:
            logger.info(f"【店铺存在校验】店铺已存在 | 店铺名:{repr(shop)} | 结果:存在(True),跳过本次循环")
        else:
            logger.info(f"【店铺存在校验】店铺不存在 | 店铺名:{repr(shop)} | 结果:不存在(False)")
        return is_exists
    except Exception as e:
        print(f"MySQL 错误: {str(e)}")
        return False  # explicit False on error, never None
    finally:
        # Close cursor and connection to avoid leaks.
        if cursor:
            cursor.close()
        if conn:
            conn.close()