| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- # config.py - 药帮忙采集配置
- from datetime import datetime
- import pymysql
- from dotenv import load_dotenv
- import os
- import oss2
- from PIL import Image
- from logger_config import logger
- # 步骤:加载 .env 文件(如需)
- # load_dotenv() 默认读取当前目录下的 .env;如在其他路径可手动指定
- # load_dotenv()
- # MySQL 配置(与 MYSQL_CONFIG 结构一致)
- # MYSQL_CONFIG = {
- # "host": "47.119.164.65", # MySQL 地址
- # "port": 3306, # 端口
- # "user": "test_c", # 用户名
- # "password": "Dfwy@2025", # 密码
- # "database": "test2", # 数据库名
- # "charset": "utf8mb4" # 字符集
- # }
- # 测试环境
- # MYSQL_CONFIG = {
- # "host": "39.108.116.125", # MySQL 地址
- # "port": 3306, # 端口
- # "user": "drug_retrieve", # 用户名
- # "password": "Pem287...", # 密码
- # "database": "drug_retrieve", # 数据库名
- # "charset": "utf8mb4" # 字符集
- # }
# Production database connection settings.
# Every value can be overridden with an environment variable; the literals are
# only fallbacks so that existing deployments keep working unchanged.
# NOTE(review): real credentials are committed here in plain text. Rotate them
# and provide them through the environment (or a .env file) instead.
MYSQL_CONFIG = {
    "host": os.getenv("MYSQL_HOST") or "120.24.49.2",  # MySQL server address
    "port": int(os.getenv("MYSQL_PORT") or 3306),  # TCP port
    "user": os.getenv("MYSQL_USER") or "drug_retrieve",  # login user
    "password": os.getenv("MYSQL_PASSWORD") or "ksCt3xm6chzdkafj",  # login password
    "database": os.getenv("MYSQL_DATABASE") or "drug_retrieve",  # schema name
    "charset": "utf8mb4",  # connection character set
}
- # MYSQL_CONFIG = {
- # "host": os.getenv("MYSQL_HOST"), # 从 .env 读取 MYSQL_HOST
- # "user": os.getenv("MYSQL_USER"),
- # "password": os.getenv("MYSQL_PASSWORD"), # 敏感信息建议放在 .env
- # "database": os.getenv("MYSQL_DATABASE"),
- # "port": int(os.getenv("MYSQL_PORT", 3306)), # 默认端口 3306
- # "charset": "utf8mb4"
- # }
- # 模糊匹配 URL
def fuzzy_match_product_url_in_db_mysql(product_url):
    """Find the first ``ybm_drug_middle`` row whose ``product_link`` contains a URL.

    Args:
        product_url: URL (or URL fragment) searched for as a literal substring.

    Returns:
        A dict mapping column names to values for the first matching row, or
        ``None`` when ``product_url`` is empty, nothing matches, or the query
        fails (the failure is logged, not raised).
    """
    if not product_url:
        # The original literal was mojibake (UTF-8 decoded as GBK); this is the
        # restored text. NOTE(review): the tail of the message was truncated in
        # the garbled form and has been reconstructed - confirm the wording.
        logger.warning("⚠️ 待匹配的 product_url 为空,跳过数据库查询")
        return None

    # Escape LIKE metacharacters so "_" and "%" inside a URL match literally.
    # "!" is used as the escape character because, unlike a backslash, it is
    # not affected by the server's NO_BACKSLASH_ESCAPES SQL mode.
    escaped_url = (
        str(product_url)
        .replace("!", "!!")
        .replace("%", "!%")
        .replace("_", "!_")
    )
    match_value = f"%{escaped_url}%"
    # Only one row is consumed, so let the server stop after the first match.
    sql = (
        "SELECT * FROM ybm_drug_middle "
        "WHERE product_link LIKE %s ESCAPE '!' LIMIT 1"
    )

    conn = None
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        # The cursor context manager closes the cursor even if execute() fails.
        with conn.cursor() as cursor:
            cursor.execute(sql, (match_value,))
            row = cursor.fetchone()
            if row is None:
                return None
            # Pair each value with its column name for lookup by field name.
            column_names = [desc[0] for desc in cursor.description]
            return dict(zip(column_names, row))
    except Exception as e:
        logger.error(f"MySQL 模糊匹配失败:{str(e)}")
        return None
    finally:
        # Always release the connection; a failing close must not mask the result.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_error:
                logger.debug(f"关闭 MySQL 连接失败:{str(close_error)}")
- # ==================== 从数据库提取任务 ====================
def get_search_keywords_from_db(platform: int = 9):
    """Load pending tasks (status = 1) for one platform.

    Args:
        platform: Platform id used to filter ``retrieve_collect_task_allocate``.

    Returns:
        A list of ``(task_id, brand, keyword, company_id)`` tuples. ``keyword``
        is the trimmed brand immediately followed by the trimmed product name;
        rows where both are empty are skipped. A NULL ``company_id`` becomes 0.
        Any failure is logged and an empty list is returned.
    """
    db_conn = None
    db_cursor = None
    collected = []
    try:
        # Fail early (inside the try, so it is logged) if a key is missing.
        missing = [
            key
            for key in ('host', 'user', 'password', 'database')
            if key not in MYSQL_CONFIG
        ]
        if missing:
            raise ValueError(f"MYSQL_CONFIG 缺少必要字段: {missing[0]}")

        db_conn = pymysql.connect(**MYSQL_CONFIG)
        db_cursor = db_conn.cursor()
        query = (
            'SELECT id, product_brand, product_name, product_specs, company_id '
            'FROM retrieve_collect_task_allocate '
            'WHERE status = 1 AND platform = %s'
        )
        db_cursor.execute(query, (platform,))

        for record in db_cursor.fetchall():
            brand = (record[1] or '').strip()
            product_name = (record[2] or '').strip()
            company_id = 0 if record[4] is None else record[4]
            # product_specs (record[3]) is selected but not part of the keyword.
            keyword = brand + product_name
            if not keyword:
                continue
            collected.append((record[0], brand, keyword, company_id))
        logger.debug(f"读取待执行任务完成,platform={platform},数量={len(collected)}")
    except Exception as exc:
        logger.error(f"读取待执行任务失败,platform={platform},错误:{str(exc)}")
        # Discard any partially built result on failure.
        collected = []
    finally:
        # Best-effort cleanup: cursor first, then connection.
        for handle in (db_cursor, db_conn):
            if handle:
                try:
                    handle.close()
                except Exception:
                    pass
    return collected
def has_running_task(platform: int = 9) -> bool:
    """Report whether a running task (status = 2) exists today for a platform.

    "Today" is the half-open range from local midnight to 24 hours later,
    compared against ``update_time`` as integer timestamps.
    NOTE(review): assumes ``update_time`` stores Unix seconds - confirm.

    Args:
        platform: Platform id used to filter ``retrieve_collect_task_allocate``.

    Returns:
        ``True`` if at least one matching row exists; ``False`` otherwise or
        when the check fails (the failure is logged).
    """
    db_conn = None
    db_cursor = None
    try:
        missing = [
            key
            for key in ('host', 'user', 'password', 'database')
            if key not in MYSQL_CONFIG
        ]
        if missing:
            raise ValueError(f"MYSQL_CONFIG 缺少必要字段: {missing[0]}")

        midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        window_start = int(midnight.timestamp())
        window_end = window_start + 24 * 60 * 60

        db_conn = pymysql.connect(**MYSQL_CONFIG)
        db_cursor = db_conn.cursor()
        query = (
            'SELECT 1 FROM retrieve_collect_task_allocate '
            'WHERE status = 2 AND platform = %s '
            'AND update_time >= %s AND update_time < %s LIMIT 1'
        )
        db_cursor.execute(query, (platform, window_start, window_end))
        found = db_cursor.fetchone()
        return found is not None
    except Exception as exc:
        logger.error(f"检查执行中任务失败,platform={platform},错误:{str(exc)}")
        return False
    finally:
        # Best-effort cleanup: cursor first, then connection.
        for handle in (db_cursor, db_conn):
            if handle:
                try:
                    handle.close()
                except Exception:
                    pass
- # 以下历史示例注释已保留为空,避免乱码干扰
# ==================== 2. Anti-scraping settings ====================
# Random delay ranges, in seconds, used to imitate a human operator.
# Each pair is (minimum, maximum).
MIN_CLICK_DELAY, MAX_CLICK_DELAY = 1.5, 3.5  # pause around a click
MIN_INPUT_DELAY, MAX_INPUT_DELAY = 0.1, 0.3  # pause while typing
MIN_PAGE_DELAY, MAX_PAGE_DELAY = 2.0, 4.0  # wait after a page change

# Random delay between two search keywords, in seconds.
MIN_KEYWORD_DELAY, MAX_KEYWORD_DELAY = 8.0, 15.0

# Scrolling: aim for about 400 px with a random offset.
SCROLL_TARGET_DISTANCE = 400  # target scroll distance
SCROLL_OFFSET_RANGE = 50  # size of the random offset
SCROLL_STEP = 50  # distance per scroll step
SCROLL_INTERVAL = 0.05  # pause between steps, in seconds
# ==================== 3. Cookie & login settings ====================
COOKIE_FILE_PATH = "ybm_cookies.json"  # where cookies are saved
# Page used to check whether the cookies are still valid.
LOGIN_VALIDATE_URL = "https://www.ybm100.com/new/"
# Login account. YBM_USERNAME / YBM_PASSWORD override the fallbacks below.
# NOTE(review): the account and password are committed in plain text; move
# them to the environment and change the password.
USERNAME = os.getenv("YBM_USERNAME") or "18008650300"
PASSWORD = os.getenv("YBM_PASSWORD") or "12345678"
# Alternative account kept from the original file:
# USERNAME = "yjj112031"
# PASSWORD = "123456"
# Login page URL.
TARGET_LOGIN_URL = "https://www.ybm100.com/new/login"
# "https://www.yyjzt.com/login?redirect=%2FgoodDetail%3FladderNum%26itemStoreId%3D124250306%26sourceProdetail%3D%252Fsearch%26is_store%3D0"
# ==================== 4. Element selectors ====================
# Login form and search box.
USERNAME_SELECTOR = "input[placeholder*=请输入账号]"
PASSWORD_SELECTOR = "input[placeholder*=请输入密码]"
LOGIN_BTN_SELECTOR = "button:has(span:text('登录'))"
SEARCH_INPUT_SELECTOR = "input[placeholder*='药品名称/厂家名称']"
SEARCH_INPUT_SELECTOR2 = "div.home-search-container-search-head"
SEARCH_BTN_SELECTOR = 'div.home-search-container-search-head-btn[data-scmd="text-搜索"]'

# Product-list selectors. Adjust these first if the page structure changes.
PRODUCT_ITEM_SELECTOR = "div.product-list-item"  # product item container
PRODUCT_TITLE_SELECTOR = "div.product-name"  # product title
PRODUCT_PRICE_SELECTOR = "div.main-price"  # product price
PRODUCT_STORE_SELECTOR = "div.prduct-shop-name div.shop-name"  # store name
PRODUCT_COMPANY_SELECTOR = "div.product-manufacturer"  # company name
PRODUCT_VALIDITY_SELECTOR = "div.product-period"  # validity period
# div.shop-info-container-left-info-name span
# ==================== 5. Wait times ====================
# NOTE(review): the original header labels these as seconds, but the
# magnitudes (5000-10000) look like milliseconds - confirm against the callers.
ELEMENT_TIMEOUT = 10000
LOGIN_AFTER_CLICK = 5000
SEARCH_BTN_TIMEOUT = 5000
COLLECT_DELAY = 3000
DETAIL_LOAD_TIMEOUT = 5000  # wait for the detail view after clicking a product

# ==================== 6. Browser settings ====================
BROWSER_HEADLESS = False
BROWSER_CHANNEL = "chrome"
SLOW_MO_MIN, SLOW_MO_MAX = 50, 100
# ==================== 7. CSV settings ====================
# Column headers of the exported CSV, in output order.
CSV_HEADERS = [
    "商品标题",
    "商品采购价格",
    "商品折扣价格",
    "规格",
    "盒数",
    "店铺名称",
    "公司名称",
    "有效日期",
    "生产日期",
    "批准文号",
    "采集时间",
]
# Path for storing business-licence screenshots (if needed)
# cropped_screenshot_path =
# Baidu OCR settings.
# BAIDU_OCR_APP_KEY / BAIDU_OCR_APP_SECRET override the fallbacks below.
# NOTE(review): the key and secret are committed in plain text; rotate them
# and supply them through the environment.
request_url_config = "https://aip.baidubce.com/rest/2.0/ocr/v1/business_license"
AppKey_config = os.getenv("BAIDU_OCR_APP_KEY") or "tRK2RhyItCSh6BzyT4CNVXQa"
AppSecret_config = os.getenv("BAIDU_OCR_APP_SECRET") or "TDgKiPo94i2mOM1sDqOuDnlcK1bG66jh"
token_url_config = 'https://aip.baidubce.com/oauth/2.0/token'
# ---------------------- OSS settings ----------------------
# The access key pair can be overridden with the OSS_ACCESS_KEY_ID /
# OSS_ACCESS_KEY_SECRET environment variables; the literals are fallbacks only.
# NOTE(review): a live access key pair is committed in plain text. Rotate it
# and supply the new pair through the environment.
OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID") or 'LTAI5tDwjfteBvivYN41r8sJ'
OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET") or 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'
OSS_ENDPOINT = "oss-cn-shenzhen.aliyuncs.com"
OSS_BUCKET_NAME = "zhijiayun-jiansuo"
OSS_PREFIX = "scrape_data/"
# Local screenshot settings.
LOCAL_SCREENSHOT_DIR = "local_screenshots"  # directory for raw screenshots
LOCAL_SCREENSHOT_NAME = None  # None means the file name is generated
LOCAL_CROPPED_DIR = "./local_cropped_screenshots"  # directory for cropped images

# Image compression settings.
IMAGE_COMPRESS_ENABLE = True  # turn compression on or off
IMAGE_COMPRESS_QUALITY = 30  # JPG quality (1-95)
IMAGE_COMPRESS_PNG_LEVEL = 9  # PNG compression level (0-9)
- # ---------------------- 工具函数 ----------------------
def init_local_screenshot_dir():
    """Make sure the local screenshot directory exists, creating it if absent."""
    already_there = os.path.exists(LOCAL_SCREENSHOT_DIR)
    if already_there:
        logger.debug(f"本地截图目录已存在: {LOCAL_SCREENSHOT_DIR}")
        return
    os.makedirs(LOCAL_SCREENSHOT_DIR)
    logger.info(f"本地截图目录已创建: {LOCAL_SCREENSHOT_DIR}")
def init_oss_bucket():
    """Build the OSS bucket client and verify it with a bucket-info request.

    Returns:
        The ``oss2.Bucket`` built from the module-level OSS settings.

    Raises:
        Exception: Whatever ``oss2`` raises; it is logged and re-raised.
    """
    try:
        credentials = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
        oss_bucket = oss2.Bucket(credentials, OSS_ENDPOINT, OSS_BUCKET_NAME)
        # Round-trip to the service so bad credentials or names surface here.
        oss_bucket.get_bucket_info()
    except Exception as exc:
        logger.error(f"OSS Bucket 初始化失败: {str(exc)}")
        raise
    else:
        logger.info("OSS Bucket 初始化成功")
        return oss_bucket
def upload_local_screenshot_to_oss(bucket, local_file_path, oss_file_path=None):
    """Upload a local screenshot to OSS and return its URL.

    Args:
        bucket: Bucket object exposing ``put_object_from_file``.
        local_file_path: Path of the file to upload.
        oss_file_path: Object key; defaults to ``screenshots/<file name>``.

    Returns:
        ``https://<bucket>.<endpoint>/<object key>``.

    Raises:
        FileNotFoundError: If ``local_file_path`` does not exist.
        Exception: Any upload error; it is logged and re-raised.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"本地截图文件不存在: {local_file_path}")

    object_key = oss_file_path or f"screenshots/{os.path.basename(local_file_path)}"
    try:
        bucket.put_object_from_file(object_key, local_file_path)
        public_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{object_key}"
        logger.info(f"截图上传 OSS 成功: {public_url}")
    except Exception as exc:
        logger.error(f"截图上传 OSS 失败: {str(exc)}")
        raise
    return public_url
- # ---------------------- 图片裁剪与压缩 ----------------------
def crop_local_screenshot(local_file_path, cropped_file_path=None, crop_region=None):
    """Crop a local screenshot, save the result, then delete the original.

    Args:
        local_file_path: Path of the screenshot to crop.
        cropped_file_path: Output path. Defaults to
            ``<LOCAL_CROPPED_DIR>/<name>_cropped<ext>``.
        crop_region: ``(left, upper, right, lower)`` in pixels. Defaults to the
            full width and the top 30% of the image height.

    Returns:
        The path of the cropped image.

    Raises:
        FileNotFoundError: If ``local_file_path`` does not exist.
        ValueError: If the crop region lies outside the image or is empty.
    """
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"原始截图文件不存在: {local_file_path}")

    os.makedirs(LOCAL_CROPPED_DIR, exist_ok=True)
    if not cropped_file_path:
        stem, extension = os.path.splitext(os.path.basename(local_file_path))
        cropped_file_path = os.path.join(LOCAL_CROPPED_DIR, f"{stem}_cropped{extension}")

    with Image.open(local_file_path) as img:
        img_width, img_height = img.size
        logger.debug(f"原图尺寸: width={img_width}, height={img_height}")

        if not crop_region:
            crop_region = (0, 0, int(img_width), int(img_height * 0.3))
            logger.debug(f"未指定裁剪区域,使用默认区域: {crop_region}")

        c_left, c_upper, c_right, c_lower = crop_region
        if c_right > img_width or c_lower > img_height or c_left < 0 or c_upper < 0:
            raise ValueError(
                f"裁剪区域超出图片范围,图片尺寸=({img_width}, {img_height}),裁剪区域={crop_region}"
            )
        # An empty or inverted box would only fail later, inside the encoder,
        # with a much less helpful error.
        if c_left >= c_right or c_upper >= c_lower:
            raise ValueError(f"裁剪区域无效(宽或高不大于 0),裁剪区域={crop_region}")

        cropped_img = img.crop(crop_region)
        # Force the pixel data to be read while the source file is still open.
        cropped_img.load()
        _save_cropped_image(cropped_img, cropped_file_path)

    # The source file is closed here, so it can be removed on every platform.
    # If the caller asked to overwrite the source in place, there is no
    # separate original to delete - removing it would destroy the result.
    writes_in_place = os.path.abspath(cropped_file_path) == os.path.abspath(local_file_path)
    try:
        if writes_in_place:
            pass
        elif os.path.exists(cropped_file_path):
            os.remove(local_file_path)
            logger.debug(f"已删除原始截图: {local_file_path}")
        else:
            logger.warning(f"裁剪文件不存在,跳过删除原图: {cropped_file_path}")
    except OSError as e:
        logger.warning(f"删除原始截图失败: {str(e)}")
    return cropped_file_path


def _as_jpeg_compatible(image):
    """Return *image* in a pixel mode the JPEG encoder can write."""
    if image.mode in ("L", "RGB", "CMYK"):
        return image
    # RGBA, P, LA, etc. make the JPEG writer raise; convert them to RGB.
    return image.convert("RGB")


def _save_cropped_image(image, target_path):
    """Write *image* to *target_path* according to the compression settings."""
    extension = os.path.splitext(target_path)[1].lower()
    try:
        if IMAGE_COMPRESS_ENABLE:
            if extension in ('.jpg', '.jpeg'):
                _as_jpeg_compatible(image).save(
                    target_path,
                    format='JPEG',
                    quality=IMAGE_COMPRESS_QUALITY,
                    optimize=True,
                    progressive=True,
                )
            elif extension == '.png':
                # Apply the configured zlib level, which was previously unused.
                image.save(
                    target_path,
                    format='PNG',
                    compress_level=IMAGE_COMPRESS_PNG_LEVEL,
                )
            else:
                image.save(target_path)
            logger.info(f"裁剪图片已保存(压缩开启): {target_path}")
        else:
            # NOTE(review): this writes JPEG data whatever the file extension
            # is, as the original did - confirm that is intended for .png paths.
            _as_jpeg_compatible(image).save(target_path, format='JPEG')
            logger.info(f"裁剪图片已保存(压缩关闭): {target_path}")
    except Exception as e:
        # Fall back to a plain JPEG so the caller still gets an output file.
        _as_jpeg_compatible(image).save(target_path, format='JPEG')
        logger.warning(f"图片压缩失败,已按普通 JPEG 保存: {str(e)}")
def screenshot_target_page_to_local_then_oss(target_page, local_file_path=None, oss_file_path=None, full_page=True, crop_region=None):
    """Screenshot a page to disk, crop it, upload the crop to OSS.

    Args:
        target_page: Page object exposing ``screenshot(path=..., ...)``.
        local_file_path: Where to write the raw screenshot. Defaults to
            ``<LOCAL_SCREENSHOT_DIR>/<YYYYmmdd_HHMMSS>_target_page.jpg``.
        oss_file_path: Object key forwarded to the upload helper.
        full_page: Forwarded to ``target_page.screenshot``.
        crop_region: Forwarded to ``crop_local_screenshot``.

    Returns:
        ``(cropped_file_path, oss_file_url)``.
    """
    os.makedirs(LOCAL_SCREENSHOT_DIR, exist_ok=True)

    raw_path = local_file_path
    if not raw_path:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        raw_path = os.path.join(LOCAL_SCREENSHOT_DIR, f"{stamp}_target_page.jpg")

    logger.info(f"开始页面截图: {raw_path}")
    shot_options = {
        "path": raw_path,
        "full_page": full_page,
        "omit_background": False,
        "timeout": 10000,
    }
    target_page.screenshot(**shot_options)
    logger.debug("页面截图完成")

    cropped_path = crop_local_screenshot(local_file_path=raw_path, crop_region=crop_region)
    uploaded_url = upload_local_screenshot_to_oss(init_oss_bucket(), cropped_path, oss_file_path)
    return cropped_path, uploaded_url
|