# yaofangwang_crawl.py - crawler for drug offers listed on yaofangwang.com
import json
import random
import re
import socket
import time
from decimal import Decimal, InvalidOperation
from urllib.parse import quote, urljoin

from DrissionPage import ChromiumOptions, ChromiumPage
from lxml import etree

from area_info.city_name_to_id import get_city
from commons.Logger import get_spider_logger
from oss_upload.oss_upload import AliyunOSSUploader
from pipelines.drug_pipelines import DrugPipeline
  14. logger = get_spider_logger("yaofangwang")
  15. MEDICINE_DETAIL_MAX_PAGES = 100
  16. WAIT_BETWEEN_PAGES = (2, 4)
  17. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  18. class YaofangwangCrawl:
  19. def __init__(self, drug_dict=None):
  20. self.driver = None
  21. self.ip = ""
  22. self.base_url = "https://www.yaofangwang.com"
  23. self.ua = (
  24. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  25. "AppleWebKit/537.36 (KHTML, like Gecko) "
  26. "Chrome/124.0.0.0 Safari/537.36"
  27. )
  28. self.platform = "11"
  29. self.task_dict = drug_dict or {}
  30. self.collect_task_id = None
  31. self.pipeline = DrugPipeline("yaofangwang")
  32. if self.task_dict:
  33. self.get_product_data()
  34. self.ossuploader = AliyunOSSUploader()
  35. self.is_success = True
  36. self.account_name = ""
  37. def get_product_data(self):
  38. self.task_id = self.task_dict["id"]
  39. self.company_id = self.task_dict["company_id"]
  40. self.product = self.task_dict["product_name"]
  41. self.product_desc = self.task_dict.get("product_specs", "")
  42. self.brand = self.task_dict.get("product_brand", "")
  43. self.product_keyword = self.task_dict.get("product_keyword", "")
  44. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  45. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  46. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  47. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  48. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  49. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  50. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  51. self.collect_round = self.task_dict.get("collect_round", 1)
  52. @staticmethod
  53. def _x1(node, xp):
  54. vals = node.xpath(xp)
  55. return vals[0] if vals else ""
  56. @staticmethod
  57. def replace_str(text):
  58. if text:
  59. return text.replace("\r\n", "").strip()
  60. return ""
  61. @staticmethod
  62. def normalize_price(price):
  63. price = YaofangwangCrawl.replace_str(price)
  64. price = re.sub(r"[^0-9.]", "", price)
  65. if price.count(".") > 1:
  66. head, tail = price.split(".", 1)
  67. tail = tail.replace(".", "")
  68. price = f"{head}.{tail}"
  69. return price
  70. @staticmethod
  71. def _camp_dict_str_values(camp_dict):
  72. """字形表里的数字统一为字符串,避免 0 在 if v 中被当成假值。"""
  73. if not camp_dict:
  74. return camp_dict
  75. return {k: str(v) if isinstance(v, int) else v for k, v in camp_dict.items()}
  76. def get_font(self, font_url):
  77. camp_dict = {
  78. "CC5E": "0",
  79. "3E73": "1",
  80. "B561": "2",
  81. "0F88": "3",
  82. "351D": "4",
  83. "0ECC": "5",
  84. "E171": "6",
  85. "0FFF": "7",
  86. "2FCF": "8",
  87. "2992": "9",
  88. "1C09": "g",
  89. "9887": "m",
  90. "29BE": "x",
  91. "1ECC": "5",
  92. "D6C2": "0",
  93. "31ED": "1",
  94. "9F43": "2",
  95. "398D": "3",
  96. "9220": "4",
  97. "0ED3": "5",
  98. "5B02": "6",
  99. "69E5": "7",
  100. "B899": "8",
  101. "D0AC": "9",
  102. "4A84": "g",
  103. "72A7": "m",
  104. "8C8C": "x",
  105. "BBB9": "0",
  106. "A3CF": "1",
  107. "E7AB": "2",
  108. "B053": "3",
  109. "0ADD": "4",
  110. "9322": "5",
  111. "A719": "6",
  112. "5C70": "7",
  113. "24CC": "8",
  114. "9B54": "9",
  115. "7F78": "Z",
  116. "4203": "H",
  117. "9F3A": "J",
  118. }
  119. return self._camp_dict_str_values(camp_dict)
  120. @staticmethod
  121. def parse_font(camp_dict, raw_str):
  122. raw_str = raw_str or ""
  123. result = []
  124. for ch in raw_str:
  125. cp = ord(ch)
  126. glyph_name = f"{cp:04X}"
  127. if glyph_name in camp_dict and camp_dict[glyph_name]:
  128. result.append(camp_dict[glyph_name])
  129. elif ch in ".:-~ ":
  130. result.append(ch)
  131. else:
  132. result.append(ch)
  133. return "".join(result)
  134. @staticmethod
  135. def _get_free_port():
  136. """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
  137. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  138. s.bind(("127.0.0.1", 0))
  139. return s.getsockname()[1]
  140. def init_drissionpage(self):
  141. co = ChromiumOptions().set_browser_path(chrome_path)
  142. # 获取独立端口
  143. debug_port = self._get_free_port()
  144. # # 设置用户目录(每个账号独立)
  145. # co.set_user_data_path(f"./{self.account_name}")
  146. # 设置端口(重要:两个都要设置)
  147. co.set_local_port(debug_port) # DrissionPage 内部端口
  148. co.set_argument(f"--remote-debugging-port={debug_port}") # Chrome 调试端口
  149. co.set_argument("--remote-debugging-address=127.0.0.1")
  150. # 基础参数
  151. co.set_argument("--disable-dev-shm-usage")
  152. co.set_argument("--no-first-run") # 避免首次运行弹窗
  153. co.set_argument("--no-default-browser-check") # 避免默认浏览器检查
  154. co.set_user_agent(
  155. 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36')
  156. # co.headless()
  157. if self.ip:
  158. proxy = self.ip.strip()
  159. if not proxy.startswith(("http://", "https://")):
  160. proxy = f"http://{proxy}"
  161. co.set_argument(f"--proxy-server={proxy}")
  162. self.driver = ChromiumPage(co)
  163. def parse_html(self, html):
  164. """解析详情页 HTML,返回 (data_items, shop_items);无列表节点时返回 ([], [])。"""
  165. tree = etree.HTML(html)
  166. if tree is None:
  167. return [], []
  168. font_match = re.search(r"(/fonts/\w+\.ttf)", html)
  169. camp_dict = {}
  170. if font_match:
  171. font_url = self.base_url + font_match.group(1)
  172. camp_dict = self.get_font(font_url)
  173. approval_number = ""
  174. manufacturer = ""
  175. for dt in tree.xpath("//div[@id='wrap']//dl[@class='clearfix']//dt"):
  176. dt_text = self.replace_str(self._x1(dt, "./text()"))
  177. dd_text = self.replace_str(self._x1(dt, "./following-sibling::dd[1]//text()"))
  178. if not dt_text:
  179. continue
  180. if "批准文号" in dt_text:
  181. approval_number = self.replace_str(
  182. self._x1(
  183. dt,
  184. './following-sibling::dd[1]//div[contains(@class,"ybfont")]/text()',
  185. )
  186. )
  187. approval_number = self.parse_font(camp_dict, approval_number)
  188. if "生产企业" in dt_text:
  189. manufacturer = dd_text
  190. li_list = tree.xpath("//div[@id='slist']//ul[@class='slist']//li")
  191. for li in li_list:
  192. title = self.brand + self.replace_str(self._x1(li, ".//div[@class='info']//h3/a/text()"))
  193. if self.product not in title:
  194. return
  195. detail_url = self._x1(li, './/div[@class="info"]//a/@href')
  196. info_texts = li.xpath('.//div[@class="info"]//p//text()')
  197. info_str = self.replace_str("---------".join(info_texts))
  198. specification = ""
  199. specification_re = re.search(r"规格:(.{12})", info_str)
  200. if specification_re:
  201. specification = specification_re.group(1).strip().strip("-").strip()
  202. inventory = self.replace_str(
  203. self._x1(li, './/div[@class="info"]//label[@class="sreserve"]/text()')
  204. )
  205. price_raw = self._x1(
  206. li, './/div[@class="sale"]//span[contains(@class,"ybfont")]//text()'
  207. )
  208. sale_texts = li.xpath('.//div[@class="sale"]//p//text()')
  209. sale_str = self.replace_str("-".join(sale_texts))
  210. expiry_date = ""
  211. expiry_date_re = re.search(r" 剩余效期:(\d+) 天", sale_str)
  212. if expiry_date_re:
  213. expiry_date = expiry_date_re.group(1) + "天"
  214. shop = self.replace_str(
  215. self._x1(
  216. li,
  217. './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/text()',
  218. )
  219. )
  220. shop_url = self._x1(
  221. li,
  222. './/div[@class="shop"]//a[contains(@class,"stitle sc_store")]/@href',
  223. )
  224. shop_str = self.replace_str(self._x1(li, './/div[@class="shop"]//p//text()'))
  225. shop_url = "https:" + shop_url
  226. price = self.normalize_price(self.parse_font(camp_dict, price_raw))
  227. m_item = re.search(r"/(\d+)\.html", detail_url or "")
  228. m_shop = re.search(r"yaodian/(\d+)/", shop_url or "")
  229. if not m_item or not m_shop:
  230. continue
  231. item_id = m_item.group(1)
  232. detail_url = f"{self.base_url}{detail_url}"
  233. shop_id = m_shop.group(1)
  234. try:
  235. price = Decimal(str(price)).quantize(Decimal("0.00"))
  236. except (InvalidOperation, ValueError):
  237. price = Decimal("0.00")
  238. city_id = province_id = city = province = ""
  239. if shop_str:
  240. city_id, province_id, city, province = get_city(shop_str)
  241. snapshot_url = ""
  242. try:
  243. snapshot_url = self.get_page_detail(detail_url, item_id) or ""
  244. except Exception as e:
  245. logger.exception("详情页截图或上传失败 item_id=%s: %s", item_id, e)
  246. now = time.strftime("%Y-%m-%d %H:%M:%S")
  247. product = {
  248. "platform": self.platform,
  249. "item_id": item_id,
  250. "enterprise_id": self.company_id,
  251. "product_name": title,
  252. "spec": specification,
  253. "one_price": '',
  254. "detail_url": detail_url,
  255. "shop_name": shop,
  256. "anonymous_store_name": "",
  257. "shop_url": shop_url,
  258. "city_name": city,
  259. "city_id": city_id,
  260. "province_name": province,
  261. "province_id": province_id,
  262. "factory_name": manufacturer,
  263. "scrape_date": time.strftime("%Y-%m-%d"),
  264. "price": price,
  265. "sales": "",
  266. "stock_count": inventory,
  267. "snapshot_url": snapshot_url,
  268. "approval_num": approval_number,
  269. "produced_time": "",
  270. "deadline": expiry_date,
  271. "update_time": now,
  272. "insert_time": now,
  273. "number": 1,
  274. "product_brand": self.brand or "",
  275. "collect_task_id": self.collect_task_id,
  276. "search_name": self.product,
  277. "company_name": shop,
  278. "collect_config_info": json.dumps(
  279. {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
  280. "sampling_end_time": self.sampling_end_time}),
  281. "account_id": self.account_id,
  282. "collect_region_id": self.collect_region_id,
  283. "collect_round": self.collect_round,
  284. "is_sold_out": 0
  285. }
  286. try:
  287. self.pipeline.storge_data(product)
  288. logger.info(json.dumps(product, ensure_ascii=False, default=str))
  289. except Exception as e:
  290. logger.exception("写入数据库失败: %s", e)
  291. return len(li_list)
  292. def get_page_detail(self, detail_url, item_id):
  293. """打开详情页、截取 maininfo2 区域并上传 OSS,返回 URL;失败返回空字符串。"""
  294. self.driver.get(detail_url, timeout=10)
  295. time.sleep(2)
  296. ele = self.driver.ele("xpath=//div[@id='wrap']/div[contains(@class,'maininfo2')]")
  297. jpg_bytes = ele.get_screenshot(as_bytes="jpg")
  298. img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
  299. time.sleep(random.uniform(0.5, 1))
  300. return img_url
  301. def get_list(self, medicine_id):
  302. """按分页拉取同一药品详情下的报价列表,连续空页则停止。"""
  303. for page in range(1, MEDICINE_DETAIL_MAX_PAGES + 1):
  304. url = f"{self.base_url}/medicine/{medicine_id}/p{page}/"
  305. self.driver.get(url, timeout=10)
  306. data_items = self.parse_html(self.driver.html)
  307. if not data_items:
  308. break
  309. time.sleep(random.uniform(*WAIT_BETWEEN_PAGES))
  310. def _search_result_medicine_ids(self, html):
  311. """从搜索结果 HTML 解析 medicine_id,避免遍历 DrissionPage 元素导致 ElementLostError。"""
  312. tree = etree.HTML(html)
  313. if tree is None:
  314. return []
  315. li_list = tree.xpath("//div[@id='wrap']//ul[contains(@class,'goodlist_search')]/li")
  316. id_list = []
  317. for li in li_list:
  318. href_raw = self._x1(li, ".//a[@class='txt sc_medicine']/@href")
  319. titles = li.xpath(".//a[@class='txt sc_medicine']/@title")
  320. title = "".join(titles).replace("\n", "").strip()
  321. if self.product not in title:
  322. break
  323. if not href_raw:
  324. continue
  325. href = "https:" + href_raw if href_raw.startswith("//") else href_raw
  326. spec = self.replace_str(self._x1(li, ".//p[@class='st']/text()"))
  327. factory = self.replace_str(self._x1(li, ".//p[@class='st text-overflow']/text()"))
  328. id_list.append({"spec": spec, "href": href, "factory": factory})
  329. self.driver.get(href, timeout=10)
  330. res_html = etree.HTML(self.driver.html)
  331. lis = res_html.xpath("//div[@id='wrap']//ul[@class='other']//li")
  332. for li_ele in lis:
  333. spec = self.replace_str(self._x1(li_ele, "./a/text()"))
  334. href = self._x1(li_ele, "./a/@href")
  335. if not href:
  336. continue
  337. id_list.append({"spec": spec, "href": href, "factory": factory})
  338. return id_list
  339. def search_data(self):
  340. # 必须用局部变量,不能写 self.search_data = ... ,否则会覆盖掉本方法
  341. keyword = f"{self.brand} {self.product or ''}".strip() if self.brand else (self.product or "")
  342. if not keyword:
  343. logger.warning("关键词为空,跳过搜索")
  344. return
  345. url = f"{self.base_url}/search.html?keyword={quote(keyword)}"
  346. self.driver.get(url, timeout=10)
  347. time.sleep(random.uniform(0.8, 1.5))
  348. drug_list = self._search_result_medicine_ids(self.driver.html)
  349. id_dict = {}
  350. for drug in drug_list:
  351. spec = drug["spec"]
  352. href = drug["href"]
  353. if "x" in self.product_desc:
  354. spec = spec.replace("*", "x")
  355. if "*" in self.product_desc:
  356. spec = spec.replace("x", "*")
  357. print(self.product_desc, spec)
  358. if self.product_desc in spec:
  359. m = re.search(r"/medicine/(\d+)/", href or "")
  360. if not m:
  361. continue
  362. drug_id = m.group(1)
  363. if drug_id in id_dict:
  364. continue
  365. self.get_list(drug_id)
  366. id_dict[drug_id] = 1
  367. def run(self):
  368. if not self.task_dict:
  369. logger.info("未提供任务参数,跳过爬取")
  370. return 0
  371. try:
  372. self.init_drissionpage()
  373. self.search_data()
  374. except Exception as e:
  375. print(f"运行异常: {e}")
  376. self.is_success = False
  377. finally:
  378. if self.driver:
  379. self.driver.quit()
  380. logger.info(f"药房网爬取总数:{self.pipeline.crawl_count}条")
  381. return self.pipeline.crawl_count, self.is_success