# config.py - 药帮忙采集配置 (YBM100 scraper configuration and DB helpers)
from datetime import datetime
import pymysql
from dotenv import load_dotenv
import os
import oss2
from PIL import Image
from logger_config import logger

# Load the .env file if needed. load_dotenv() reads .env from the current
# directory by default; pass a path explicitly when it lives elsewhere.
# load_dotenv()

# Production MySQL settings.
# NOTE(review): plaintext credentials are committed here (and login/OSS keys
# below) — prefer the env-driven variant kept as a comment underneath and
# move all secrets into .env.
MYSQL_CONFIG = {
    "host": "120.24.49.2",            # MySQL host
    "port": 3306,                     # port
    "user": "drug_retrieve",          # user name
    "password": "ksCt3xm6chzdkafj",   # password
    "database": "drug_retrieve",      # database name
    "charset": "utf8mb4",             # charset
}

# Recommended alternative: read connection settings from .env.
# MYSQL_CONFIG = {
#     "host": os.getenv("MYSQL_HOST"),
#     "user": os.getenv("MYSQL_USER"),
#     "password": os.getenv("MYSQL_PASSWORD"),
#     "database": os.getenv("MYSQL_DATABASE"),
#     "port": int(os.getenv("MYSQL_PORT", 3306)),  # default port 3306
#     "charset": "utf8mb4",
# }


def _ensure_mysql_config():
    """Raise ValueError when MYSQL_CONFIG is missing a required field.

    Shared by the task-reading helpers below (they previously duplicated
    this loop inline).
    """
    for cfg in ('host', 'user', 'password', 'database'):
        if cfg not in MYSQL_CONFIG:
            raise ValueError(f"MYSQL_CONFIG 缺少必要字段: {cfg}")


def fuzzy_match_product_url_in_db_mysql(product_url):
    """Fuzzy-match *product_url* against ybm_drug_middle.product_link.

    Returns the first matching row as a dict keyed by column name, or None
    when product_url is empty, nothing matches, or the query fails.
    """
    if not product_url:
        # Original message was mojibake (UTF-8 bytes decoded as GBK); restored.
        logger.warning("⚠️ 待匹配的 product_url 为空,跳过数据库查询")
        return None
    # If needed, escape % / _ so they are not treated as LIKE wildcards:
    # escaped_product_url = product_url.replace("%", r"\%").replace("_", r"\_")
    conn = None
    cursor = None
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor()
        # LIKE %...% fuzzy match on the stored product link.
        sql = "SELECT * FROM ybm_drug_middle WHERE product_link LIKE %s"
        cursor.execute(sql, (f"%{product_url}%",))
        result = cursor.fetchone()  # tuple, e.g. (id, product_url, price, ...)
        if result:
            # Convert the tuple row into a dict for field-name access.
            column_names = [desc[0] for desc in cursor.description]
            return dict(zip(column_names, result))
        return None  # no matching record
    except Exception as e:
        logger.error(f"MySQL 模糊匹配失败:{str(e)}")
        return None
    finally:
        # Bug fix: the cursor was previously never closed; close both
        # cursor and connection regardless of the outcome.
        if cursor:
            try:
                cursor.close()
            except Exception:
                pass
        if conn:
            conn.close()


# ==================== Task extraction from the database ====================
def get_search_keywords_from_db(platform: int = 9):
    """Read pending tasks (status=1) for *platform*.

    Returns a list of (task_id, brand, keyword, company_id) tuples where
    keyword is brand + product name concatenated; returns [] on any error.
    """
    keywords = []
    conn = None
    cursor = None
    try:
        _ensure_mysql_config()
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor()
        sql = (
            'SELECT id, product_brand, product_name, product_specs, company_id '
            'FROM retrieve_collect_task_allocate '
            'WHERE status = 1 AND platform = %s'
        )
        cursor.execute(sql, (platform,))
        for row in cursor.fetchall():
            task_id = row[0]
            brand = (row[1] or '').strip()
            name = (row[2] or '').strip()
            # row[3] (product_specs) is selected but currently unused.
            company_id = row[4] if row[4] is not None else 0
            parts = [p for p in (brand, name) if p]
            if parts:
                # Keyword is brand+name with no separator (matches search UI).
                keywords.append((task_id, brand, ''.join(parts), company_id))
        logger.debug(f"读取待执行任务完成,platform={platform},数量={len(keywords)}")
    except Exception as e:
        logger.error(f"读取待执行任务失败,platform={platform},错误:{str(e)}")
        keywords = []
    finally:
        if cursor:
            try:
                cursor.close()
            except Exception:
                pass
        if conn:
            try:
                conn.close()
            except Exception:
                pass
    return keywords


def has_running_task(platform: int = 9) -> bool:
    """Return True when *platform* has a running task (status=2) updated today.

    "Today" is the local-midnight-to-midnight window; update_time is assumed
    to be a Unix timestamp column. Returns False on any error.
    """
    conn = None
    cursor = None
    try:
        _ensure_mysql_config()
        # [today 00:00, tomorrow 00:00) as Unix timestamps (local time).
        day_start_ts = int(
            datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
        )
        next_day_ts = day_start_ts + 24 * 60 * 60
        conn = pymysql.connect(**MYSQL_CONFIG)
        cursor = conn.cursor()
        sql = (
            'SELECT 1 FROM retrieve_collect_task_allocate '
            'WHERE status = 2 AND platform = %s '
            'AND update_time >= %s AND update_time < %s LIMIT 1'
        )
        cursor.execute(sql, (platform, day_start_ts, next_day_ts))
        return cursor.fetchone() is not None
    except Exception as e:
        logger.error(f"检查执行中任务失败,platform={platform},错误:{str(e)}")
        return False
    finally:
        if cursor:
            try:
                cursor.close()
            except Exception:
                pass
        if conn:
            try:
                conn.close()
            except Exception:
                pass


# ==================== 2. Anti-crawl settings ====================
# Random delay ranges that mimic human pacing (seconds).
MIN_CLICK_DELAY = 1.5   # min delay before a click
MAX_CLICK_DELAY = 3.5   # max delay before a click
MIN_INPUT_DELAY = 0.1   # min delay between keystrokes
MAX_INPUT_DELAY = 0.3   # max delay between keystrokes
MIN_PAGE_DELAY = 2.0    # min wait after a page load
MAX_PAGE_DELAY = 4.0    # max wait after a page load

# Random delay between keywords (seconds)
MIN_KEYWORD_DELAY = 8.0
MAX_KEYWORD_DELAY = 15.0

# Scroll settings (target 400px with random jitter)
SCROLL_TARGET_DISTANCE = 400  # target scroll distance
SCROLL_OFFSET_RANGE = 50      # random jitter range
SCROLL_STEP = 50              # per-step scroll distance
SCROLL_INTERVAL = 0.05        # pause between steps (seconds)

# ==================== 3. Cookie & login settings ====================
COOKIE_FILE_PATH = "ybm_cookies.json"  # where cookies are persisted

# Page used to validate that saved cookies are still logged in.
LOGIN_VALIDATE_URL = "https://www.ybm100.com/new/"

# Login credentials.
# NOTE(review): plaintext credentials — move into .env.
USERNAME = "18008650300"
PASSWORD = "12345678"

# Login page URL
TARGET_LOGIN_URL = "https://www.ybm100.com/new/login"

# ==================== 4. Element selector settings ====================
# Login / search selectors.
USERNAME_SELECTOR = "input[placeholder*=请输入账号]"
PASSWORD_SELECTOR = "input[placeholder*=请输入密码]"
LOGIN_BTN_SELECTOR = "button:has(span:text('登录'))"
SEARCH_INPUT_SELECTOR = "input[placeholder*='药品名称/厂家名称']"
SEARCH_INPUT_SELECTOR2 = "div.home-search-container-search-head"
SEARCH_BTN_SELECTOR = 'div.home-search-container-search-head-btn[data-scmd="text-搜索"]'

# Product-card selectors for the results page.
# Update these first whenever the page structure changes.
PRODUCT_ITEM_SELECTOR = "div.product-list-item"        # product card container
PRODUCT_TITLE_SELECTOR = "div.product-name"            # product title
PRODUCT_PRICE_SELECTOR = "div.main-price"              # product price
PRODUCT_STORE_SELECTOR = 'div.prduct-shop-name div.shop-name'   # shop name
PRODUCT_COMPANY_SELECTOR = "div.product-manufacturer"  # manufacturer name
PRODUCT_VALIDITY_SELECTOR = "div.product-period"       # expiry date
# Alternative shop-name selector kept for reference:
# div.shop-info-container-left-info-name span

# ==================== 5. Wait/timeout settings ====================
# NOTE(review): the original header said "seconds" but these values match
# Playwright's millisecond timeouts — confirm against the call sites.
ELEMENT_TIMEOUT = 10000
LOGIN_AFTER_CLICK = 5000
SEARCH_BTN_TIMEOUT = 5000
COLLECT_DELAY = 3000
DETAIL_LOAD_TIMEOUT = 5000  # wait for detail page after clicking a product

# ==================== 6. Browser settings ====================
BROWSER_HEADLESS = False      # run with a visible browser window
BROWSER_CHANNEL = "chrome"    # use the installed Chrome channel
SLOW_MO_MIN = 50              # min slow-motion per action (ms)
SLOW_MO_MAX = 100             # max slow-motion per action (ms)

# ==================== 7. CSV settings ====================
# CSV header row for exported rows.
CSV_HEADERS = [
    "商品标题", "商品采购价格", "商品折扣价格", "规格", "盒数",
    "店铺名称", "公司名称", "有效日期", "生产日期", "批准文号", "采集时间",
]

# Path for the business-licence screenshot, if needed:
# cropped_screenshot_path =

# Baidu OCR settings (business-licence recognition endpoint).
# NOTE(review): plaintext API key/secret committed here — move into .env.
request_url_config = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
AppKey_config = "tRK2RhyItCSh6BzyT4CNVXQa"
AppSecret_config = "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
token_url_config = 'https://aip.baidubce.com/oauth/2.0/token'

# ---------------------- OSS settings ----------------------
# NOTE(review): plaintext access keys committed here — move into .env.
OSS_ACCESS_KEY_ID = 'LTAI5tDwjfteBvivYN41r8sJ'
OSS_ACCESS_KEY_SECRET = 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'
OSS_ENDPOINT = "oss-cn-shenzhen.aliyuncs.com"
OSS_BUCKET_NAME = "zhijiayun-jiansuo"
OSS_PREFIX = "scrape_data/"

# Local screenshot settings
LOCAL_SCREENSHOT_DIR = "local_screenshots"      # directory for raw screenshots
LOCAL_SCREENSHOT_NAME = None                    # None -> auto-generated name
LOCAL_CROPPED_DIR = "./local_cropped_screenshots"  # directory for cropped output

# Image compression settings
IMAGE_COMPRESS_ENABLE = True     # enable compression (True/False)
IMAGE_COMPRESS_QUALITY = 30      # JPEG quality (1-95)
IMAGE_COMPRESS_PNG_LEVEL = 9     # PNG compression level (0-9)

# ---------------------- Utility functions ----------------------
# (section header was mojibake "宸ュ叿鍑芥暟" = "utility functions"; repaired)


def init_local_screenshot_dir():
    """Create the local screenshot directory if it does not exist."""
    if not os.path.exists(LOCAL_SCREENSHOT_DIR):
        # exist_ok avoids a crash if the dir appears between check and create.
        os.makedirs(LOCAL_SCREENSHOT_DIR, exist_ok=True)
        logger.info(f"本地截图目录已创建: {LOCAL_SCREENSHOT_DIR}")
    else:
        logger.debug(f"本地截图目录已存在: {LOCAL_SCREENSHOT_DIR}")


def init_oss_bucket():
    """Build and return an oss2.Bucket, verifying connectivity.

    Raises whatever oss2 raises when authentication or the bucket-info
    round-trip fails.
    """
    try:
        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
        bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
        bucket.get_bucket_info()  # fail fast if credentials/bucket are wrong
        logger.info("OSS Bucket 初始化成功")
        return bucket
    except Exception as e:
        logger.error(f"OSS Bucket 初始化失败: {str(e)}")
        raise


def upload_local_screenshot_to_oss(bucket, local_file_path, oss_file_path=None):
    """Upload a local screenshot to OSS and return its public URL.

    Args:
        bucket: an initialized oss2.Bucket.
        local_file_path: path of the file to upload.
        oss_file_path: object key; defaults to "screenshots/<basename>".

    Raises:
        FileNotFoundError: the local file does not exist.
        Exception: re-raised from the OSS upload.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"本地截图文件不存在: {local_file_path}")
    if not oss_file_path:
        oss_file_path = f"screenshots/{os.path.basename(local_file_path)}"
    try:
        bucket.put_object_from_file(oss_file_path, local_file_path)
        oss_file_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_file_path}"
        logger.info(f"截图上传 OSS 成功: {oss_file_url}")
        return oss_file_url
    except Exception as e:
        logger.error(f"截图上传 OSS 失败: {str(e)}")
        raise


# ---------------------- Image cropping & compression ----------------------
def crop_local_screenshot(local_file_path, cropped_file_path=None, crop_region=None):
    """Crop a local screenshot, save it (optionally compressed), delete the
    original on success, and return the cropped file path.

    Args:
        local_file_path: path of the original screenshot.
        cropped_file_path: target path; derived under LOCAL_CROPPED_DIR when
            omitted (adds a "_cropped" suffix).
        crop_region: (left, upper, right, lower) box; defaults to the top
            30% of the image.

    Raises:
        FileNotFoundError: the original screenshot is missing.
        ValueError: crop_region falls outside the image bounds.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"原始截图文件不存在: {local_file_path}")
    os.makedirs(LOCAL_CROPPED_DIR, exist_ok=True)

    if not cropped_file_path:
        file_name = os.path.basename(local_file_path)
        stem, ext = os.path.splitext(file_name)
        cropped_file_path = os.path.join(LOCAL_CROPPED_DIR, f"{stem}_cropped{ext}")

    with Image.open(local_file_path) as img:
        img_width, img_height = img.size
        logger.debug(f"原图尺寸: width={img_width}, height={img_height}")
        if not crop_region:
            # Default: keep the top 30% of the page.
            crop_region = (0, 0, int(img_width), int(img_height * 0.3))
            logger.debug(f"未指定裁剪区域,使用默认区域: {crop_region}")
        c_left, c_upper, c_right, c_lower = crop_region
        if c_right > img_width or c_lower > img_height or c_left < 0 or c_upper < 0:
            raise ValueError(
                f"裁剪区域超出图片范围,图片尺寸=({img_width}, {img_height}),裁剪区域={crop_region}"
            )
        cropped_img = img.crop(crop_region)  # independent copy; safe after close

    file_ext = os.path.splitext(cropped_file_path)[1].lower()
    try:
        if IMAGE_COMPRESS_ENABLE:
            if file_ext in ('.jpg', '.jpeg'):
                if cropped_img.mode in ('RGBA', 'P'):
                    # Bug fix: JPEG cannot store alpha/palette; convert first
                    # so .save() does not raise on PNG-sourced screenshots.
                    cropped_img = cropped_img.convert('RGB')
                cropped_img.save(
                    cropped_file_path,
                    format='JPEG',
                    quality=IMAGE_COMPRESS_QUALITY,
                    optimize=True,
                    progressive=True,
                )
            elif file_ext == '.png':
                # Bug fix: IMAGE_COMPRESS_PNG_LEVEL was defined but never used.
                cropped_img.save(
                    cropped_file_path,
                    optimize=True,
                    compress_level=IMAGE_COMPRESS_PNG_LEVEL,
                )
            else:
                cropped_img.save(cropped_file_path)
            logger.info(f"裁剪图片已保存(压缩开启): {cropped_file_path}")
        else:
            # Bug fix: previously forced format='JPEG' regardless of the
            # extension, writing JPEG bytes into e.g. a .png file; let Pillow
            # infer the format from the file name instead.
            cropped_img.save(cropped_file_path)
            logger.info(f"裁剪图片已保存(压缩关闭): {cropped_file_path}")
    except Exception as e:
        # Best-effort fallback: plain save without compression options.
        cropped_img.save(cropped_file_path)
        logger.warning(f"图片压缩失败,已按普通 JPEG 保存: {str(e)}")

    try:
        if os.path.exists(cropped_file_path):
            os.remove(local_file_path)
            logger.debug(f"已删除原始截图: {local_file_path}")
        else:
            logger.warning(f"裁剪文件不存在,跳过删除原图: {cropped_file_path}")
    except OSError as e:
        logger.warning(f"删除原始截图失败: {str(e)}")

    return cropped_file_path


def screenshot_target_page_to_local_then_oss(target_page, local_file_path=None,
                                             oss_file_path=None, full_page=True,
                                             crop_region=None):
    """Screenshot *target_page* locally, crop it, upload to OSS.

    Args:
        target_page: a Playwright-style page exposing .screenshot().
        local_file_path: raw screenshot path; auto-named under
            LOCAL_SCREENSHOT_DIR when omitted.
        oss_file_path: OSS object key; see upload_local_screenshot_to_oss.
        full_page: capture the full scrollable page when True.
        crop_region: forwarded to crop_local_screenshot.

    Returns:
        (cropped_file_path, oss_file_url).
    """
    os.makedirs(LOCAL_SCREENSHOT_DIR, exist_ok=True)
    if not local_file_path:
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        local_file_path = os.path.join(
            LOCAL_SCREENSHOT_DIR, f"{current_time}_target_page.jpg"
        )
    logger.info(f"开始页面截图: {local_file_path}")
    target_page.screenshot(
        path=local_file_path,
        full_page=full_page,
        omit_background=False,
        timeout=10000,
    )
    logger.debug("页面截图完成")
    cropped_file_path = crop_local_screenshot(
        local_file_path=local_file_path,
        crop_region=crop_region,
    )
    bucket = init_oss_bucket()
    oss_file_url = upload_local_screenshot_to_oss(bucket, cropped_file_path, oss_file_path)
    return cropped_file_path, oss_file_url