| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- import json
- import random
- import re
- import time
- import socket
- from urllib.parse import quote
- from decimal import Decimal, InvalidOperation
- from lxml import etree
- from commons.Logger import get_spider_logger
- from DrissionPage import ChromiumPage, ChromiumOptions
- from pipelines.drug_pipelines import DrugPipeline
- from area_info.city_name_to_id import get_city
- from oss_upload.oss_upload import AliyunOSSUploader
# Module-level logger used by every method of the crawler class in this file.
logger = get_spider_logger("yaofangwang")
# Upper bound on the number of quotation pages requested for one medicine.
MEDICINE_DETAIL_MAX_PAGES = 100
# (min, max) bounds, in seconds, of the random pause between two quotation pages.
WAIT_BETWEEN_PAGES = (2, 4)
# Chrome executable passed to ChromiumOptions.set_browser_path (Windows install path).
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
class YaofangwangCrawl:
    """Crawl drug quotations from yaofangwang.com with a DrissionPage browser.

    A task dict describes the product to look for. ``run()`` opens the
    browser, searches for the product, walks the quotation pages of every
    medicine whose specification matches, and hands one record per quotation
    to the storage pipeline.
    """

    def __init__(self, drug_dict=None):
        """Store the task and create the pipeline and OSS uploader.

        Args:
            drug_dict: Task description. When empty or ``None`` no task fields
                are loaded and ``run()`` returns immediately.
        """
        self.driver = None
        self.ip = ""
        self.base_url = "https://www.yaofangwang.com"
        self.ua = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        self.platform = "11"
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.pipeline = DrugPipeline("yaofangwang")
        if self.task_dict:
            self.get_product_data()
        self.ossuploader = AliyunOSSUploader()
        self.is_success = True
        self.account_name = ""

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: If ``id``, ``company_id`` or ``product_name`` is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        # Brand and spec take part in string operations later on, so an
        # explicit None coming from the task is normalised to "".
        self.product_desc = task.get("product_specs", "") or ""
        self.brand = task.get("product_brand", "") or ""
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _x1(node, xp):
        """Return the first result of XPath ``xp`` on ``node``, or ``""``."""
        vals = node.xpath(xp)
        return vals[0] if vals else ""

    @staticmethod
    def replace_str(text):
        """Remove CRLF sequences and surrounding whitespace; falsy input gives ``""``."""
        if not text:
            return ""
        return text.replace("\r\n", "").strip()

    @staticmethod
    def normalize_price(price):
        """Reduce a price string to digits with at most one decimal point.

        Everything except digits and dots is dropped. When several dots
        remain, only the first one is kept.
        """
        cleaned = re.sub(r"[^0-9.]", "", YaofangwangCrawl.replace_str(price))
        if cleaned.count(".") > 1:
            head, tail = cleaned.split(".", 1)
            cleaned = f"{head}.{tail.replace('.', '')}"
        return cleaned

    @staticmethod
    def _camp_dict_str_values(camp_dict):
        """Return the glyph table with integer values converted to strings.

        Keeps a digit ``0`` from being treated as a false value by the
        truthiness test in ``parse_font``.
        """
        if not camp_dict:
            return camp_dict
        return {k: str(v) if isinstance(v, int) else v for k, v in camp_dict.items()}

    def get_font(self, font_url):
        """Return the glyph table mapping obfuscated code points to characters.

        Keys are four-digit upper-case hex code points. ``font_url`` is
        accepted for interface compatibility but is not downloaded; the table
        is hard-coded.
        """
        camp_dict = {
            "CC5E": "0",
            "3E73": "1",
            "B561": "2",
            "0F88": "3",
            "351D": "4",
            "0ECC": "5",
            "E171": "6",
            "0FFF": "7",
            "2FCF": "8",
            "2992": "9",
            "1C09": "g",
            "9887": "m",
            "29BE": "x",
            "1ECC": "5",
            "D6C2": "0",
            "31ED": "1",
            "9F43": "2",
            "398D": "3",
            "9220": "4",
            "0ED3": "5",
            "5B02": "6",
            "69E5": "7",
            "B899": "8",
            "D0AC": "9",
            "4A84": "g",
            "72A7": "m",
            "8C8C": "x",
            "BBB9": "0",
            "A3CF": "1",
            "E7AB": "2",
            "B053": "3",
            "0ADD": "4",
            "9322": "5",
            "A719": "6",
            "5C70": "7",
            "24CC": "8",
            "9B54": "9",
            "7F78": "Z",
            "4203": "H",
            "9F3A": "J",
        }
        return self._camp_dict_str_values(camp_dict)

    @staticmethod
    def parse_font(camp_dict, raw_str):
        """Decode ``raw_str`` by replacing every mapped code point.

        Characters without a (non-empty) entry in ``camp_dict`` are kept
        unchanged. A missing table or missing text is treated as empty.
        """
        table = camp_dict or {}
        return "".join(table.get(f"{ord(ch):04X}") or ch for ch in (raw_str or ""))

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that is currently free, for Chrome debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS choose an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_drissionpage(self):
        """Start Chrome on a dedicated debugging port and store it in ``self.driver``."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # The same port is configured on both sides: DrissionPage's own
        # setting and Chrome's command line.
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # suppress the first-run dialog
        co.set_argument("--no-default-browser-check")  # suppress the default-browser prompt
        co.set_user_agent(self.ua)
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)

    def _absolute_url(self, url):
        """Resolve a page href (relative, scheme-relative or absolute) against the site root."""
        from urllib.parse import urljoin

        if not url:
            return ""
        return urljoin(self.base_url + "/", url)

    def _parse_medicine_meta(self, tree, camp_dict):
        """Extract ``(approval_number, manufacturer)`` from the medicine header block."""
        approval_number = ""
        manufacturer = ""
        for dt in tree.xpath("//div[@id='wrap']//dl[@class='clearfix']//dt"):
            label = self.replace_str(self._x1(dt, "./text()"))
            if not label:
                continue
            if "批准文号" in label:
                # The approval number is rendered with the obfuscated font.
                raw_number = self.replace_str(
                    self._x1(
                        dt,
                        './following-sibling::dd[1]//div[contains(@class,"ybfont")]/text()',
                    )
                )
                approval_number = self.parse_font(camp_dict, raw_number)
            if "生产企业" in label:
                manufacturer = self.replace_str(
                    self._x1(dt, "./following-sibling::dd[1]//text()")
                )
        return approval_number, manufacturer

    def parse_html(self, html):
        """Parse one quotation page and store a record for every listing.

        Args:
            html: HTML source of a ``/medicine/<id>/p<n>/`` page.

        Returns:
            int: Number of ``<li>`` listings found on the page. ``0`` is
            returned when the page cannot be parsed, has no listings, or a
            listing title does not contain the product name; the caller treats
            ``0`` as "stop paging".
        """
        if not html:
            return 0
        tree = etree.HTML(html)
        if tree is None:
            return 0

        camp_dict = {}
        font_match = re.search(r"(/fonts/\w+\.ttf)", html)
        if font_match:
            camp_dict = self.get_font(self.base_url + font_match.group(1))

        approval_number, manufacturer = self._parse_medicine_meta(tree, camp_dict)
        brand = self.brand or ""

        li_list = tree.xpath("//div[@id='slist']//ul[@class='slist']//li")
        for li in li_list:
            title = brand + self.replace_str(
                self._x1(li, ".//div[@class='info']//h3/a/text()")
            )
            if self.product not in title:
                # A foreign product ends the crawl of this medicine.
                return 0

            raw_detail_url = self._x1(li, './/div[@class="info"]//a/@href')
            info_texts = li.xpath('.//div[@class="info"]//p//text()')
            info_str = self.replace_str("---------".join(info_texts))
            specification = ""
            # Up to 12 characters after the label; the join separator that may
            # be captured with them is stripped again below.
            specification_re = re.search(r"规格:(.{1,12})", info_str)
            if specification_re:
                specification = specification_re.group(1).strip().strip("-").strip()

            inventory = self.replace_str(
                self._x1(li, './/div[@class="info"]//label[@class="sreserve"]/text()')
            )
            price_raw = self._x1(
                li, './/div[@class="sale"]//span[contains(@class,"ybfont")]//text()'
            )
            sale_texts = li.xpath('.//div[@class="sale"]//p//text()')
            sale_str = self.replace_str("-".join(sale_texts))
            expiry_date = ""
            expiry_date_re = re.search(r"剩余效期:\s*(\d+)\s*天", sale_str)
            if expiry_date_re:
                expiry_date = expiry_date_re.group(1) + "天"

            shop = self.replace_str(
                self._x1(
                    li,
                    './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/text()',
                )
            )
            shop_url = self._absolute_url(
                self._x1(
                    li,
                    './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/@href',
                )
            )
            shop_str = self.replace_str(self._x1(li, './/div[@class="shop"]//p//text()'))

            m_item = re.search(r"/(\d+)\.html", raw_detail_url or "")
            m_shop = re.search(r"yaodian/(\d+)/", shop_url)
            if not m_item or not m_shop:
                # Without both ids the listing cannot be identified; skip it.
                continue
            item_id = m_item.group(1)
            shop_id = m_shop.group(1)
            detail_url = self._absolute_url(raw_detail_url)

            price = self.normalize_price(self.parse_font(camp_dict, price_raw))
            try:
                price = Decimal(str(price)).quantize(Decimal("0.00"))
            except (InvalidOperation, ValueError):
                # An unreadable price is stored as zero.
                price = Decimal("0.00")

            city_id = province_id = city = province = ""
            if shop_str:
                city_id, province_id, city, province = get_city(shop_str)

            snapshot_url = ""
            try:
                snapshot_url = self.get_page_detail(detail_url, item_id) or ""
            except Exception as e:
                # The screenshot is best-effort: the record is stored without it.
                logger.exception("详情页截图或上传失败 item_id=%s: %s", item_id, e)

            now = time.strftime("%Y-%m-%d %H:%M:%S")
            product = {
                "platform": self.platform,
                "item_id": item_id,
                "enterprise_id": self.company_id,
                "product_name": title,
                "spec": specification,
                "one_price": '',
                "detail_url": detail_url,
                "shop_name": shop,
                "anonymous_store_name": "",
                "shop_url": shop_url,
                "city_name": city,
                "city_id": city_id,
                "province_name": province,
                "province_id": province_id,
                "factory_name": manufacturer,
                "scrape_date": time.strftime("%Y-%m-%d"),
                "price": price,
                "sales": "",
                "stock_count": inventory,
                "snapshot_url": snapshot_url,
                "approval_num": approval_number,
                "produced_time": "",
                "deadline": expiry_date,
                "update_time": now,
                "insert_time": now,
                "number": 1,
                "product_brand": brand,
                "collect_task_id": self.collect_task_id,
                "search_name": self.product,
                "company_name": shop,
                "collect_config_info": json.dumps(
                    {
                        "sampling_cycle": self.sampling_cycle,
                        "sampling_start_time": self.sampling_start_time,
                        "sampling_end_time": self.sampling_end_time,
                    }
                ),
                "account_id": self.account_id,
                "collect_region_id": self.collect_region_id,
                "collect_round": self.collect_round,
                "is_sold_out": 0,
            }
            try:
                self.pipeline.storge_data(product)
                logger.info(json.dumps(product, ensure_ascii=False, default=str))
            except Exception as e:
                logger.exception("写入数据库失败: %s", e)
        return len(li_list)

    def get_page_detail(self, detail_url, item_id):
        """Open a detail page, screenshot its ``maininfo2`` area and upload it.

        Returns:
            The value returned by the uploader for the image, or ``""`` when
            the ``maininfo2`` element is not present on the page.
        """
        self.driver.get(detail_url, timeout=10)
        time.sleep(2)
        ele = self.driver.ele("xpath=//div[@id='wrap']/div[contains(@class,'maininfo2')]")
        if not ele:
            # A failed lookup yields a falsy placeholder instead of an element.
            logger.warning("详情页未找到 maininfo2 区域 item_id=%s", item_id)
            return ""
        jpg_bytes = ele.get_screenshot(as_bytes="jpg")
        img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
        time.sleep(random.uniform(0.5, 1))
        return img_url

    def get_list(self, medicine_id):
        """Walk the quotation pages of one medicine until a page yields nothing.

        At most ``MEDICINE_DETAIL_MAX_PAGES`` pages are requested, with a
        random pause between them.
        """
        for page in range(1, MEDICINE_DETAIL_MAX_PAGES + 1):
            url = f"{self.base_url}/medicine/{medicine_id}/p{page}/"
            self.driver.get(url, timeout=10)
            listing_count = self.parse_html(self.driver.html)
            if not listing_count:
                break
            time.sleep(random.uniform(*WAIT_BETWEEN_PAGES))

    def _search_result_medicine_ids(self, html):
        """Collect candidate medicines from a search-result page.

        The result list is parsed from the HTML string rather than from live
        DrissionPage elements, which avoids ElementLostError when the browser
        navigates away during the loop.

        Returns:
            list[dict]: Entries with ``spec``, ``href`` and ``factory`` keys,
            one per search hit plus one per alternative spec listed on each
            hit's own page.
        """
        tree = etree.HTML(html) if html else None
        if tree is None:
            return []
        li_list = tree.xpath("//div[@id='wrap']//ul[contains(@class,'goodlist_search')]/li")
        id_list = []
        for li in li_list:
            href_raw = self._x1(li, ".//a[@class='txt sc_medicine']/@href")
            titles = li.xpath(".//a[@class='txt sc_medicine']/@title")
            title = "".join(titles).replace("\n", "").strip()
            if self.product not in title:
                # The first hit that is not the product ends the scan.
                break
            if not href_raw:
                continue
            href = "https:" + href_raw if href_raw.startswith("//") else href_raw
            spec = self.replace_str(self._x1(li, ".//p[@class='st']/text()"))
            factory = self.replace_str(self._x1(li, ".//p[@class='st text-overflow']/text()"))
            id_list.append({"spec": spec, "href": href, "factory": factory})

            # The hit's own page lists the other specs of the same medicine.
            self.driver.get(href, timeout=10)
            page_html = self.driver.html
            res_html = etree.HTML(page_html) if page_html else None
            if res_html is None:
                continue
            for li_ele in res_html.xpath("//div[@id='wrap']//ul[@class='other']//li"):
                other_href = self._x1(li_ele, "./a/@href")
                if not other_href:
                    continue
                other_spec = self.replace_str(self._x1(li_ele, "./a/text()"))
                id_list.append({"spec": other_spec, "href": other_href, "factory": factory})
        return id_list

    def search_data(self):
        """Search for the product and crawl every medicine whose spec matches."""
        # The keyword must stay a local variable: assigning to
        # self.search_data would overwrite this method.
        keyword = f"{self.brand} {self.product or ''}".strip() if self.brand else (self.product or "")
        if not keyword:
            logger.warning("关键词为空,跳过搜索")
            return
        url = f"{self.base_url}/search.html?keyword={quote(keyword)}"
        self.driver.get(url, timeout=10)
        time.sleep(random.uniform(0.8, 1.5))
        drug_list = self._search_result_medicine_ids(self.driver.html)

        wanted_spec = self.product_desc or ""
        seen_ids = set()
        for drug in drug_list:
            spec = drug["spec"]
            href = drug["href"]
            # Use the same multiplication sign as the task's spec before comparing.
            if "x" in wanted_spec:
                spec = spec.replace("*", "x")
            if "*" in wanted_spec:
                spec = spec.replace("x", "*")
            logger.debug("spec compare: wanted=%s found=%s", wanted_spec, spec)
            if wanted_spec not in spec:
                continue
            m = re.search(r"/medicine/(\d+)/", href or "")
            if not m:
                continue
            drug_id = m.group(1)
            if drug_id in seen_ids:
                continue
            self.get_list(drug_id)
            seen_ids.add(drug_id)

    def run(self):
        """Run the crawl for the configured task.

        Returns:
            ``0`` when no task was supplied; otherwise the tuple
            ``(crawl_count, is_success)``.
            NOTE(review): the two shapes differ; kept as-is because callers
            may rely on either one.
        """
        if not self.task_dict:
            logger.info("未提供任务参数,跳过爬取")
            return 0
        try:
            self.init_drissionpage()
            self.search_data()
        except Exception as e:
            # Top-level boundary: record the failure with its traceback.
            logger.exception("运行异常: %s", e)
            self.is_success = False
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception as e:
                    # A failing shutdown must not discard the crawl result.
                    logger.exception("关闭浏览器失败: %s", e)
        logger.info(f"药房网爬取总数:{self.pipeline.crawl_count}条")
        return self.pipeline.crawl_count, self.is_success
|