"""Crawler for yaofangwang.com drug listings, driven by a DrissionPage browser."""

import json
import random
import re
import time
import socket
from urllib.parse import quote, urljoin
from decimal import Decimal, InvalidOperation

from lxml import etree
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import get_spider_logger
from pipelines.drug_pipelines import DrugPipeline
from area_info.city_name_to_id import get_city
from oss_upload.oss_upload import AliyunOSSUploader

logger = get_spider_logger("yaofangwang")

# Upper bound on the number of quote-list pages fetched per medicine.
MEDICINE_DETAIL_MAX_PAGES = 100
# (min, max) seconds to sleep between two consecutive quote-list pages.
WAIT_BETWEEN_PAGES = (2, 4)
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


class YaofangwangCrawl:
    """Search yaofangwang.com for one product and store every matching quote.

    The task description is passed in as ``drug_dict``; ``run()`` is the
    entry point.  Results are written through ``DrugPipeline`` and page
    snapshots are uploaded through ``AliyunOSSUploader``.
    """

    def __init__(self, drug_dict=None):
        """Create the crawler.

        Args:
            drug_dict: Task description.  When falsy, no task attributes are
                loaded and ``run()`` returns immediately.
        """
        self.driver = None
        self.ip = ""
        self.base_url = "https://www.yaofangwang.com"
        self.ua = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        self.platform = "11"
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.pipeline = DrugPipeline("yaofangwang")
        if self.task_dict:
            self.get_product_data()
        self.ossuploader = AliyunOSSUploader()
        self.is_success = True
        self.account_name = ""

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: If ``id``, ``company_id`` or ``product_name`` is missing;
                every other field falls back to a default.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _x1(node, xp):
        """Return the first result of ``node.xpath(xp)``, or ``""`` if none."""
        vals = node.xpath(xp)
        return vals[0] if vals else ""

    @staticmethod
    def replace_str(text):
        """Remove ``\\r\\n`` sequences and surrounding whitespace; falsy -> ``""``."""
        if text:
            return text.replace("\r\n", "").strip()
        return ""

    @staticmethod
    def normalize_price(price):
        """Reduce ``price`` to digits and at most one decimal point.

        Everything except ``0-9`` and ``.`` is dropped.  When several dots
        remain, the first one is kept as the decimal point and the others
        are removed.
        """
        price = YaofangwangCrawl.replace_str(price)
        price = re.sub(r"[^0-9.]", "", price)
        if price.count(".") > 1:
            head, tail = price.split(".", 1)
            price = f"{head}.{tail.replace('.', '')}"
        return price

    @staticmethod
    def _camp_dict_str_values(camp_dict):
        """Convert integer values of the glyph table to strings.

        This keeps a digit ``0`` from being treated as falsy by ``if v``
        checks on the mapped value.
        """
        if not camp_dict:
            return camp_dict
        return {k: str(v) if isinstance(v, int) else v for k, v in camp_dict.items()}

    def get_font(self, font_url):
        """Return the glyph table mapping 4-digit hex code points to characters.

        ``font_url`` is accepted but not used: the table is hard-coded and the
        font file is never downloaded.
        """
        camp_dict = {
            "CC5E": "0", "3E73": "1", "B561": "2", "0F88": "3", "351D": "4",
            "0ECC": "5", "E171": "6", "0FFF": "7", "2FCF": "8", "2992": "9",
            "1C09": "g", "9887": "m", "29BE": "x", "1ECC": "5",
            "D6C2": "0", "31ED": "1", "9F43": "2", "398D": "3", "9220": "4",
            "0ED3": "5", "5B02": "6", "69E5": "7", "B899": "8", "D0AC": "9",
            "4A84": "g", "72A7": "m", "8C8C": "x",
            "BBB9": "0", "A3CF": "1", "E7AB": "2", "B053": "3", "0ADD": "4",
            "9322": "5", "A719": "6", "5C70": "7", "24CC": "8", "9B54": "9",
            "7F78": "Z", "4203": "H", "9F3A": "J",
        }
        return self._camp_dict_str_values(camp_dict)

    @staticmethod
    def parse_font(camp_dict, raw_str):
        """Decode ``raw_str`` by replacing every character found in ``camp_dict``.

        Each character is looked up by its upper-case, zero-padded hex code
        point.  Characters without a (non-empty) mapping are kept unchanged.
        """
        camp_dict = camp_dict or {}
        decoded = []
        for ch in raw_str or "":
            mapped = camp_dict.get(f"{ord(ch):04X}")
            decoded.append(mapped if mapped else ch)
        return "".join(decoded)

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that is free right now, for Chrome debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_drissionpage(self):
        """Start Chrome through DrissionPage and store the page in ``self.driver``."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        # Use a dedicated debugging port for this browser instance.
        debug_port = self._get_free_port()
        # Per-account user data directory (currently disabled):
        # co.set_user_data_path(f"./{self.account_name}")
        # The port has to be set in both places.
        co.set_local_port(debug_port)  # port used by DrissionPage itself
        co.set_argument(f"--remote-debugging-port={debug_port}")  # Chrome debugging port
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # Basic flags.
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser check
        co.set_user_agent(self.ua)
        # co.headless()
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)

    def _absolute_url(self, url):
        """Resolve ``url`` (``//host/..``, ``/path`` or absolute) against the site root."""
        return urljoin(self.base_url + "/", url)

    def _parse_drug_meta(self, tree, camp_dict):
        """Return ``(approval_number, manufacturer)`` read from the page header."""
        approval_number = ""
        manufacturer = ""
        for dt in tree.xpath("//div[@id='wrap']//dl[@class='clearfix']//dt"):
            dt_text = self.replace_str(self._x1(dt, "./text()"))
            if not dt_text:
                continue
            if "批准文号" in dt_text:
                # The approval number is rendered with the obfuscated font.
                raw_number = self.replace_str(
                    self._x1(
                        dt,
                        './following-sibling::dd[1]//div[contains(@class,"ybfont")]/text()',
                    )
                )
                approval_number = self.parse_font(camp_dict, raw_number)
            if "生产企业" in dt_text:
                manufacturer = self.replace_str(
                    self._x1(dt, "./following-sibling::dd[1]//text()")
                )
        return approval_number, manufacturer

    def parse_html(self, html):
        """Parse one quote-list page and store every quote found on it.

        Args:
            html: HTML source of a ``/medicine/<id>/p<n>/`` page.

        Returns:
            int: Number of ``<li>`` quote nodes on the page.  ``0`` is returned
            when the page is empty or unparsable, and also as soon as a quote
            whose title does not contain ``self.product`` is met; callers treat
            ``0`` as "stop paginating".
        """
        if not html:
            return 0
        tree = etree.HTML(html)
        if tree is None:
            return 0

        camp_dict = {}
        font_match = re.search(r"(/fonts/\w+\.ttf)", html)
        if font_match:
            font_url = self.base_url + font_match.group(1)
            camp_dict = self.get_font(font_url)

        approval_number, manufacturer = self._parse_drug_meta(tree, camp_dict)

        li_list = tree.xpath("//div[@id='slist']//ul[@class='slist']//li")
        for li in li_list:
            title = self.brand + self.replace_str(
                self._x1(li, ".//div[@class='info']//h3/a/text()")
            )
            if self.product not in title:
                # Stop at the first non-matching title and tell the caller to
                # stop too (falsy result).
                return 0

            detail_url = self._x1(li, './/div[@class="info"]//a/@href')
            info_texts = li.xpath('.//div[@class="info"]//p//text()')
            info_str = self.replace_str("---------".join(info_texts))
            specification = ""
            # NOTE(review): the patterns below use an ASCII colon and literal
            # spaces - confirm they match the live page markup.
            specification_re = re.search(r"规格:(.{12})", info_str)
            if specification_re:
                # Trim the "-" join separators captured by the fixed-width group.
                specification = specification_re.group(1).strip().strip("-").strip()

            inventory = self.replace_str(
                self._x1(li, './/div[@class="info"]//label[@class="sreserve"]/text()')
            )
            price_raw = self._x1(
                li, './/div[@class="sale"]//span[contains(@class,"ybfont")]//text()'
            )
            sale_texts = li.xpath('.//div[@class="sale"]//p//text()')
            sale_str = self.replace_str("-".join(sale_texts))
            expiry_date = ""
            expiry_date_re = re.search(r" 剩余效期:(\d+) 天", sale_str)
            if expiry_date_re:
                expiry_date = expiry_date_re.group(1) + "天"

            shop = self.replace_str(
                self._x1(
                    li,
                    './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/text()',
                )
            )
            shop_url = self._x1(
                li,
                './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/@href',
            )
            shop_str = self.replace_str(self._x1(li, './/div[@class="shop"]//p//text()'))
            shop_url = self._absolute_url(shop_url)
            price = self.normalize_price(self.parse_font(camp_dict, price_raw))

            m_item = re.search(r"/(\d+)\.html", detail_url or "")
            m_shop = re.search(r"yaodian/(\d+)/", shop_url or "")
            if not m_item or not m_shop:
                # Without both ids the row cannot be stored.
                continue
            item_id = m_item.group(1)
            detail_url = self._absolute_url(detail_url)
            shop_id = m_shop.group(1)  # extracted but not stored in the record

            try:
                price = Decimal(str(price)).quantize(Decimal("0.00"))
            except (InvalidOperation, ValueError):
                price = Decimal("0.00")

            city_id = province_id = city = province = ""
            if shop_str:
                city_id, province_id, city, province = get_city(shop_str)

            snapshot_url = ""
            try:
                snapshot_url = self.get_page_detail(detail_url, item_id) or ""
            except Exception as e:
                # A failed snapshot must not prevent the quote from being stored.
                logger.exception("详情页截图或上传失败 item_id=%s: %s", item_id, e)

            now = time.strftime("%Y-%m-%d %H:%M:%S")
            product = {
                "platform": self.platform,
                "item_id": item_id,
                "enterprise_id": self.company_id,
                "product_name": title,
                "spec": specification,
                "one_price": '',
                "detail_url": detail_url,
                "shop_name": shop,
                "anonymous_store_name": "",
                "shop_url": shop_url,
                "city_name": city,
                "city_id": city_id,
                "province_name": province,
                "province_id": province_id,
                "factory_name": manufacturer,
                "scrape_date": time.strftime("%Y-%m-%d"),
                "price": price,
                "sales": "",
                "stock_count": inventory,
                "snapshot_url": snapshot_url,
                "approval_num": approval_number,
                "produced_time": "",
                "deadline": expiry_date,
                "update_time": now,
                "insert_time": now,
                "number": 1,
                "product_brand": self.brand or "",
                "collect_task_id": self.collect_task_id,
                "search_name": self.product,
                "collect_config_info": json.dumps(
                    {
                        "sampling_cycle": self.sampling_cycle,
                        "sampling_start_time": self.sampling_start_time,
                        "sampling_end_time": self.sampling_end_time,
                    }
                ),
                "account_id": self.account_id,
                "collect_region_id": self.collect_region_id,
                "collect_round": self.collect_round,
                "is_sold_out": 1,
            }
            try:
                self.pipeline.storge_data(product)
                logger.info(json.dumps(product, ensure_ascii=False, default=str))
            except Exception as e:
                logger.exception("写入数据库失败: %s", e)
        return len(li_list)

    def get_page_detail(self, detail_url, item_id):
        """Open a detail page, screenshot its ``maininfo2`` area and upload it.

        Returns:
            The value returned by the uploader (used as the snapshot URL), or
            ``""`` when the ``maininfo2`` element is not present on the page.
            Navigation, screenshot and upload errors propagate to the caller.
        """
        self.driver.get(detail_url, timeout=10)
        time.sleep(2)
        ele = self.driver.ele("xpath=//div[@id='wrap']/div[contains(@class,'maininfo2')]")
        if not ele:
            # A failed lookup yields a falsy placeholder element.
            logger.warning("maininfo2 element not found, item_id=%s", item_id)
            return ""
        jpg_bytes = ele.get_screenshot(as_bytes="jpg")
        img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
        time.sleep(random.uniform(0.5, 1))
        return img_url

    def get_list(self, medicine_id):
        """Fetch the quote-list pages of one medicine until a page yields nothing.

        At most ``MEDICINE_DETAIL_MAX_PAGES`` pages are requested; the loop
        stops at the first page for which ``parse_html`` returns ``0``.
        """
        for page in range(1, MEDICINE_DETAIL_MAX_PAGES + 1):
            url = f"{self.base_url}/medicine/{medicine_id}/p{page}/"
            self.driver.get(url, timeout=10)
            item_count = self.parse_html(self.driver.html)
            if not item_count:
                break
            time.sleep(random.uniform(*WAIT_BETWEEN_PAGES))

    def _search_result_medicine_ids(self, html):
        """Collect candidate medicine links from a search-result page.

        The HTML is parsed with lxml instead of iterating DrissionPage
        elements, which avoids ``ElementLostError`` once the driver navigates
        away.  For every matching result the medicine page is opened as well,
        and the links of its ``ul.other`` list are added.

        Returns:
            list[dict]: Items with the keys ``spec``, ``href`` and ``factory``.
        """
        if not html:
            return []
        tree = etree.HTML(html)
        if tree is None:
            return []
        li_list = tree.xpath("//div[@id='wrap']//ul[contains(@class,'goodlist_search')]/li")
        id_list = []
        for li in li_list:
            href_raw = self._x1(li, ".//a/@href")
            title = self._x1(li, ".//a/text()")
            if self.product not in title:
                # Stop at the first result that does not mention the product.
                break
            if not href_raw:
                continue
            href = "https:" + href_raw if href_raw.startswith("//") else href_raw
            spec = self.replace_str(self._x1(li, ".//p[@class='st']/text()"))
            factory = self.replace_str(self._x1(li, ".//p[@class='st text-overflow']/text()"))
            id_list.append({"spec": spec, "href": href, "factory": factory})

            # The medicine page lists further entries under ul.other.
            self.driver.get(href, timeout=10)
            page_html = self.driver.html
            res_html = etree.HTML(page_html) if page_html else None
            if res_html is None:
                continue
            for li_ele in res_html.xpath("//div[@id='wrap']//ul[@class='other']//li"):
                other_href = self._x1(li_ele, "./a/@href")
                if not other_href:
                    continue
                other_spec = self.replace_str(self._x1(li_ele, "./a/text()"))
                id_list.append({"spec": other_spec, "href": other_href, "factory": factory})
        return id_list

    def search_data(self):
        """Search for the task's product and crawl every medicine whose spec matches."""
        # Keep the keyword in a local variable: assigning to self.search_data
        # would overwrite this method.
        keyword = f"{self.brand} {self.product or ''}".strip() if self.brand else (self.product or "")
        if not keyword:
            logger.warning("关键词为空,跳过搜索")
            return
        url = f"{self.base_url}/search.html?keyword={quote(keyword)}"
        self.driver.get(url, timeout=10)
        time.sleep(random.uniform(0.8, 1.5))
        drug_list = self._search_result_medicine_ids(self.driver.html)

        seen_ids = set()
        for drug in drug_list:
            spec = drug["spec"]
            href = drug["href"]
            # Align the multiplication sign of the page spec with the one used
            # in the requested spec before comparing.
            if "x" in self.product_desc:
                spec = spec.replace("*", "x")
            if "*" in self.product_desc:
                spec = spec.replace("x", "*")
            if self.product_desc not in spec:
                continue
            m = re.search(r"/medicine/(\d+)/", href or "")
            if not m:
                continue
            drug_id = m.group(1)
            if drug_id in seen_ids:
                continue
            self.get_list(drug_id)
            seen_ids.add(drug_id)

    def run(self):
        """Run the crawl for the configured task.

        Returns:
            ``0`` when no task was supplied; otherwise the tuple
            ``(self.pipeline.crawl_count, self.is_success)``.
        """
        if not self.task_dict:
            logger.info("未提供任务参数,跳过爬取")
            # NOTE(review): this is an int while the normal path returns a
            # tuple; kept as-is because callers may rely on it.
            return 0
        try:
            self.init_drissionpage()
            self.search_data()
        except Exception as e:
            # Top-level boundary: record the traceback and report failure.
            logger.exception("运行异常: %s", e)
            self.is_success = False
        finally:
            if self.driver:
                self.driver.quit()
        logger.info(f"药房网爬取总数:{self.pipeline.crawl_count}条")
        return self.pipeline.crawl_count, self.is_success