"""Configuration and shared helpers for the Yaobangmang (YBM) collector.

This module holds:

* a module-level MySQL pool plus small task/lookup queries,
* anti-bot delay settings, login settings and page selectors,
* OCR / OSS settings,
* screenshot helpers (capture -> crop/compress -> upload to OSS).
"""
from datetime import datetime
import os

from dotenv import load_dotenv
import oss2
from PIL import Image

from commons.Logger import logger
from conn_mysql import MySQLPoolOnline

# Optional step: load a .env file.
# load_dotenv() reads ".env" from the current directory by default; pass an
# explicit path if the file lives elsewhere.
# load_dotenv()

# MySQL: uses conn_mysql.MySQLPoolOnline as-is (per the original note, this
# defaults to the online "drug_retrieve" database and can be overridden with
# the environment variables MYSQL_DRUG_HOST / MYSQL_DRUG_PORT /
# MYSQL_DRUG_USER / MYSQL_DRUG_PASSWORD / MYSQL_DRUG_DB).
mysql_pool = MySQLPoolOnline()


# Fuzzy URL match
def fuzzy_match_product_url_in_db_mysql(product_url):
    """Return the first ``ybm_drug_middle`` row whose ``product_link`` contains *product_url*.

    Args:
        product_url: Substring to look for inside ``product_link``.

    Returns:
        The first matching row as returned by ``mysql_pool.select_data``, or
        ``None`` when *product_url* is empty, nothing matches, or the query
        raises.
    """
    # Reject empty input up front so we never run an unbounded ``LIKE '%%'``.
    if not product_url:
        logger.warning("⚠️ 待匹配的 product_url 为空,跳过数据库查询")
        return None

    # NOTE(review): ``%`` and ``_`` inside product_url are NOT escaped, so
    # they act as LIKE wildcards (``_`` is common in URLs). Escape them here
    # if strictly literal matching is required.
    try:
        sql = "SELECT * FROM ybm_drug_middle WHERE product_link LIKE %s"
        rows = mysql_pool.select_data(sql, (f"%{product_url}%",))
        return rows[0] if rows else None
    except Exception as e:
        logger.error(f"MySQL 模糊匹配失败:{str(e)}")
        return None


# ==================== Fetch tasks from the database ====================
def get_search_keywords_from_db(platform=9, include_collect_config=False):
    """Fetch one pending task for *platform* from ``retrieve_collect_task_allocate``.

    The query selects rows with ``status = 1``.
    NOTE(review): the previous docstring said ``status=3``; the SQL is taken
    as the source of truth here — confirm which status means "pending".

    Args:
        platform: Platform id to filter on (default 9).
        include_collect_config: Accepted for backward compatibility; it is
            not used by this function.

    Returns:
        A single task row (dict-like, per the ``data.get`` call below), or an
        empty dict when there is no task or the query fails.
    """
    sql = (
        " SELECT * FROM retrieve_collect_task_allocate"
        " WHERE status = 1 AND platform = %s LIMIT 1 "
    )
    try:
        rows = mysql_pool.select_data(sql, (platform,))
        data = rows[0] if rows else {}
        logger.debug(
            "读取待执行任务完成 platform=%s task_id=%s",
            platform,
            data.get("id") if data else None,
        )
    except Exception as e:
        # Best effort: callers treat "no task" and "lookup failed" the same.
        logger.error(f"读取待执行任务失败,platform={platform},错误:{str(e)}")
        data = {}
    return data


def has_running_task(platform: int = 9) -> bool:
    """Return True if *platform* has a ``status = 2`` task updated today.

    "Today" is the local-time window [00:00 today, 00:00 today + 24h),
    compared against ``update_time`` as integer timestamps.
    NOTE(review): this assumes ``update_time`` stores Unix epoch seconds —
    confirm against the table schema.

    Returns:
        ``True`` when at least one matching row exists; ``False`` when none
        exists or the query fails.
    """
    try:
        midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        day_start_ts = int(midnight.timestamp())
        next_day_ts = day_start_ts + 24 * 60 * 60
        sql = (
            "SELECT 1 AS one FROM retrieve_collect_task_allocate "
            "WHERE status = 2 AND platform = %s "
            "AND update_time >= %s AND update_time < %s LIMIT 1"
        )
        rows = mysql_pool.select_data(sql, (platform, day_start_ts, next_day_ts))
        return bool(rows)
    except Exception as e:
        logger.error(f"检查执行中任务失败,platform={platform},错误:{str(e)}")
        return False


# ==================== 2. Anti-bot settings ====================
# Random delay ranges (to mimic a human operator).
MIN_CLICK_DELAY = 1.5  # minimum delay before a click (seconds)
MAX_CLICK_DELAY = 3.5  # maximum delay before a click (seconds)
MIN_INPUT_DELAY = 0.1  # minimum delay between keystrokes (seconds)
MAX_INPUT_DELAY = 0.3  # maximum delay between keystrokes (seconds)
MIN_PAGE_DELAY = 2.0  # minimum page wait (seconds)
MAX_PAGE_DELAY = 4.0  # maximum page wait (seconds)

# Random delay between two keywords (seconds).
MIN_KEYWORD_DELAY = 8.0
MAX_KEYWORD_DELAY = 15.0

# Scrolling (target 400 px plus a random offset).
SCROLL_TARGET_DISTANCE = 400  # target scroll distance
SCROLL_OFFSET_RANGE = 50  # random offset range
SCROLL_STEP = 50  # distance per scroll step
SCROLL_INTERVAL = 0.05  # pause between steps (seconds)

# ==================== 3. Cookie & login settings ====================
COOKIE_FILE_PATH = "ybm_cookies.json"  # where cookies are persisted

# Page used to validate that the saved cookies are still valid.
LOGIN_VALIDATE_URL = "https://www.ybm100.com/new/"

# Login account / password.
# NOTE(review): credentials are committed in source. They can now be
# overridden via the environment (the in-source values remain the defaults so
# behaviour is unchanged); prefer rotating them and dropping the defaults.
USERNAME = os.getenv("YBM_USERNAME", "18008650300")
PASSWORD = os.getenv("YBM_PASSWORD", "12345678")
# Alternate account (disabled):
# USERNAME = "yjj112031"
# PASSWORD = "123456"

# Login URL.
TARGET_LOGIN_URL = "https://www.ybm100.com/new/login"
# Previous target (disabled):
# "https://www.yyjzt.com/login?redirect=%2FgoodDetail%3FladderNum%26itemStoreId%3D124250306%26sourceProdetail%3D%252Fsearch%26is_store%3D0"

# ==================== 4. Element selectors ====================
# Basic selectors.
USERNAME_SELECTOR = "input[placeholder*=请输入账号]"
PASSWORD_SELECTOR = "input[placeholder*=请输入密码]"
LOGIN_BTN_SELECTOR = "button:has(span:text('登录'))"
SEARCH_INPUT_SELECTOR = "input[placeholder*='药品名称/厂家名称']"
SEARCH_INPUT_SELECTOR2 = "div.home-search-container-search-head"
SEARCH_BTN_SELECTOR = "div.home-search-container-search-head-btn[data-scmd=\"text-搜索\"]"

# Selectors for the collected fields; adjust them to the live page.
# If the page structure changes, update these selectors first.
PRODUCT_ITEM_SELECTOR = "div.product-list-item"  # product item container
PRODUCT_TITLE_SELECTOR = "div.product-name"  # product title
PRODUCT_PRICE_SELECTOR = "div.main-price"  # product price
PRODUCT_STORE_SELECTOR = 'div.prduct-shop-name div.shop-name'  # store name
PRODUCT_COMPANY_SELECTOR = "div.product-manufacturer"  # manufacturer name
PRODUCT_VALIDITY_SELECTOR = "div.product-period"  # validity period
# Alternative store-name selector: div.shop-info-container-left-info-name span

# ==================== 5. Wait / timeout settings ====================
# NOTE(review): the original header labelled these as seconds, but the
# magnitudes (and the ``timeout=10000`` passed to ``page.screenshot`` below)
# look like milliseconds — confirm at the call sites.
ELEMENT_TIMEOUT = 10000
LOGIN_AFTER_CLICK = 5000
SEARCH_BTN_TIMEOUT = 5000
COLLECT_DELAY = 3000
DETAIL_LOAD_TIMEOUT = 5000  # wait for the detail view after clicking a product

# ==================== 6. Browser settings ====================
BROWSER_HEADLESS = False
BROWSER_CHANNEL = "chrome"
SLOW_MO_MIN = 50
SLOW_MO_MAX = 100

# ==================== 7. CSV settings ====================
# CSV header row (runtime data; kept verbatim).
CSV_HEADERS = [
    "商品标题", "商品采购价格", "商品折扣价格", "规格", "盒数", "店铺名称",
    "公司名称", "有效日期", "生产日期", "批准文号", "采集时间",
]

# Path for business-licence screenshots (if needed):
# cropped_screenshot_path =

# Baidu OCR settings.
# NOTE(review): API key/secret are committed in source; overridable via the
# environment, with the in-source values kept as defaults.
request_url_config = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
AppKey_config = os.getenv("YBM_BAIDU_OCR_APP_KEY", "tRK2RhyItCSh6BzyT4CNVXQa")
AppSecret_config = os.getenv(
    "YBM_BAIDU_OCR_APP_SECRET", "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
)
token_url_config = 'https://aip.baidubce.com/oauth/2.0/token'

# ---------------------- OSS settings ----------------------
# NOTE(review): access key id/secret are committed in source; overridable via
# the environment, with the in-source values kept as defaults.
OSS_ACCESS_KEY_ID = os.getenv("YBM_OSS_ACCESS_KEY_ID", 'LTAI5tDwjfteBvivYN41r8sJ')
OSS_ACCESS_KEY_SECRET = os.getenv(
    "YBM_OSS_ACCESS_KEY_SECRET", 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'
)
OSS_ENDPOINT = "oss-cn-shenzhen.aliyuncs.com"
OSS_BUCKET_NAME = "zhijiayun-jiansuo"
OSS_PREFIX = "scrape_data/"

# Local screenshot settings.
LOCAL_SCREENSHOT_DIR = "local_screenshots"  # directory for raw screenshots
LOCAL_SCREENSHOT_NAME = None  # None means the file name is generated
LOCAL_CROPPED_DIR = "./local_cropped_screenshots"  # directory for cropped images

# Image compression settings.
IMAGE_COMPRESS_ENABLE = True  # whether compression is enabled
IMAGE_COMPRESS_QUALITY = 30  # JPEG quality (1-95)
IMAGE_COMPRESS_PNG_LEVEL = 9  # PNG compression level (0-9)

# Modes Pillow's JPEG writer accepts directly; anything else (e.g. RGBA from
# a PNG screenshot) must be converted before saving as JPEG.
_JPEG_SAFE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


# ---------------------- Utility functions ----------------------
def init_local_screenshot_dir():
    """Create ``LOCAL_SCREENSHOT_DIR`` if it does not exist yet."""
    if not os.path.exists(LOCAL_SCREENSHOT_DIR):
        # exist_ok guards against another process creating it in between.
        os.makedirs(LOCAL_SCREENSHOT_DIR, exist_ok=True)
        logger.info(f"本地截图目录已创建: {LOCAL_SCREENSHOT_DIR}")
    else:
        logger.debug(f"本地截图目录已存在: {LOCAL_SCREENSHOT_DIR}")


def init_oss_bucket():
    """Build an ``oss2.Bucket`` for ``OSS_BUCKET_NAME`` and verify access.

    Returns:
        The initialised ``oss2.Bucket``.

    Raises:
        Exception: Whatever ``oss2`` raises during construction or
            ``get_bucket_info()``; it is logged and re-raised unchanged.
    """
    try:
        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
        bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
        # Round-trip to the service so bad credentials fail here, not later.
        bucket.get_bucket_info()
        logger.info("OSS Bucket 初始化成功")
        return bucket
    except Exception as e:
        logger.error(f"OSS Bucket 初始化失败: {str(e)}")
        raise


def upload_local_screenshot_to_oss(bucket, local_file_path, oss_file_path=None):
    """Upload a local file to OSS and return its URL.

    Args:
        bucket: Bucket object exposing ``put_object_from_file``.
        local_file_path: Path of the file to upload.
        oss_file_path: Object key; defaults to ``screenshots/<basename>``.

    Returns:
        ``https://<bucket>.<endpoint>/<key>`` for the uploaded object.

    Raises:
        FileNotFoundError: If *local_file_path* does not exist.
        Exception: Upload errors are logged and re-raised unchanged.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"本地截图文件不存在: {local_file_path}")

    if not oss_file_path:
        local_file_name = os.path.basename(local_file_path)
        oss_file_path = f"screenshots/{local_file_name}"

    try:
        bucket.put_object_from_file(oss_file_path, local_file_path)
        oss_file_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_file_path}"
        logger.info(f"截图上传 OSS 成功: {oss_file_url}")
        return oss_file_url
    except Exception as e:
        logger.error(f"截图上传 OSS 失败: {str(e)}")
        raise


# ---------------------- Image cropping and compression ----------------------
def _to_jpeg_mode(image):
    """Return *image* in a mode the JPEG writer accepts (converting to RGB if needed)."""
    if image.mode in _JPEG_SAFE_MODES:
        return image
    return image.convert("RGB")


def _save_cropped_image(cropped_img, cropped_file_path):
    """Write *cropped_img* to *cropped_file_path*, honouring the compression settings."""
    file_ext = os.path.splitext(cropped_file_path)[1].lower()
    is_jpeg = file_ext in ('.jpg', '.jpeg')
    is_png = file_ext == '.png'
    try:
        if IMAGE_COMPRESS_ENABLE:
            if is_jpeg:
                _to_jpeg_mode(cropped_img).save(
                    cropped_file_path,
                    format='JPEG',
                    quality=IMAGE_COMPRESS_QUALITY,
                    optimize=True,
                    progressive=True,
                )
            elif is_png:
                cropped_img.save(
                    cropped_file_path,
                    format='PNG',
                    compress_level=IMAGE_COMPRESS_PNG_LEVEL,
                )
            else:
                # Other extensions: let Pillow infer the format from the name.
                cropped_img.save(cropped_file_path)
            logger.info(f"裁剪图片已保存(压缩开启): {cropped_file_path}")
        else:
            if is_png:
                # Keep the bytes consistent with the ".png" file name.
                cropped_img.save(cropped_file_path, format='PNG')
            else:
                _to_jpeg_mode(cropped_img).save(cropped_file_path, format='JPEG')
            logger.info(f"裁剪图片已保存(压缩关闭): {cropped_file_path}")
    except Exception as e:
        # Last resort: plain JPEG, converted first so RGBA input cannot
        # make the fallback itself fail.
        _to_jpeg_mode(cropped_img).save(cropped_file_path, format='JPEG')
        logger.warning(f"图片压缩失败,已按普通 JPEG 保存: {str(e)}")


def _remove_original_screenshot(local_file_path, cropped_file_path):
    """Delete the raw screenshot once the cropped file exists (best effort)."""
    try:
        if not os.path.exists(cropped_file_path):
            logger.warning(f"裁剪文件不存在,跳过删除原图: {cropped_file_path}")
        elif os.path.samefile(local_file_path, cropped_file_path):
            # Cropped in place: deleting the "original" would delete the result.
            return
        else:
            os.remove(local_file_path)
            logger.debug(f"已删除原始截图: {local_file_path}")
    except OSError as e:
        logger.warning(f"删除原始截图失败: {str(e)}")


def crop_local_screenshot(local_file_path, cropped_file_path=None, crop_region=None):
    """Crop a local screenshot, save it, then delete the original.

    Args:
        local_file_path: Path of the raw screenshot.
        cropped_file_path: Output path; defaults to
            ``LOCAL_CROPPED_DIR/<name>_cropped<ext>``.
        crop_region: ``(left, upper, right, lower)`` in pixels; defaults to
            the full width and the top 30% of the height.

    Returns:
        The path of the cropped file.

    Raises:
        FileNotFoundError: If *local_file_path* does not exist.
        ValueError: If *crop_region* extends outside the image.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"原始截图文件不存在: {local_file_path}")

    os.makedirs(LOCAL_CROPPED_DIR, exist_ok=True)

    if not cropped_file_path:
        file_name = os.path.basename(local_file_path)
        file_name_no_ext, file_ext = os.path.splitext(file_name)
        cropped_file_name = f"{file_name_no_ext}_cropped{file_ext}"
        cropped_file_path = os.path.join(LOCAL_CROPPED_DIR, cropped_file_name)

    with Image.open(local_file_path) as img:
        img_width, img_height = img.size
        logger.debug(f"原图尺寸: width={img_width}, height={img_height}")

        if not crop_region:
            crop_region = (0, 0, int(img_width), int(img_height * 0.3))
            logger.debug(f"未指定裁剪区域,使用默认区域: {crop_region}")

        c_left, c_upper, c_right, c_lower = crop_region
        if c_right > img_width or c_lower > img_height or c_left < 0 or c_upper < 0:
            raise ValueError(
                f"裁剪区域超出图片范围,图片尺寸=({img_width}, {img_height}),裁剪区域={crop_region}"
            )

        # Save while the source image is still open.
        cropped_img = img.crop(crop_region)
        _save_cropped_image(cropped_img, cropped_file_path)

    # Source handle is closed here, so removal also works on Windows.
    _remove_original_screenshot(local_file_path, cropped_file_path)
    return cropped_file_path


def screenshot_target_page_to_local_then_oss(target_page, local_file_path=None,
                                             oss_file_path=None, full_page=True,
                                             crop_region=None):
    """Screenshot a page to disk, crop it, upload the crop to OSS.

    Args:
        target_page: Page object exposing ``screenshot(path=..., full_page=...,
            omit_background=..., timeout=...)`` (presumably a Playwright page).
        local_file_path: Where to store the raw screenshot; defaults to
            ``LOCAL_SCREENSHOT_DIR/<YYYYmmdd_HHMMSS>_target_page.jpg``.
        oss_file_path: Object key passed to ``upload_local_screenshot_to_oss``.
        full_page: Forwarded to ``target_page.screenshot``.
        crop_region: Forwarded to ``crop_local_screenshot``.

    Returns:
        ``(cropped_file_path, oss_file_url)``.
    """
    os.makedirs(LOCAL_SCREENSHOT_DIR, exist_ok=True)

    if not local_file_path:
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        local_file_name = f"{current_time}_target_page.jpg"
        local_file_path = os.path.join(LOCAL_SCREENSHOT_DIR, local_file_name)

    logger.info(f"开始页面截图: {local_file_path}")
    target_page.screenshot(
        path=local_file_path,
        full_page=full_page,
        omit_background=False,
        timeout=10000,
    )
    logger.debug("页面截图完成")

    cropped_file_path = crop_local_screenshot(
        local_file_path=local_file_path,
        crop_region=crop_region,
    )

    bucket = init_oss_bucket()
    oss_file_url = upload_local_screenshot_to_oss(bucket, cropped_file_path, oss_file_path)
    return cropped_file_path, oss_file_url


if __name__ == '__main__':
    # Manual smoke test: show the next pending task (if any).
    print(get_search_keywords_from_db())