snapshot_taobao_crawl.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. import hashlib
  2. import json
  3. import random
  4. import re
  5. import time
  6. from decimal import Decimal, InvalidOperation
  7. from curl_cffi import requests
  8. from lxml import etree
  9. from commons.Logger import get_spider_logger
  10. from commons.conn_mysql import MySQLPoolOnline
  11. from pipelines.drug_pipelines import DrugPipeline
  12. from spiders.taobao.snapshot_taobao_login import (TaobaoAutoCrawl)
  13. from area_info.city_name_to_id import get_city
  14. from oss_upload.oss_upload import AliyunOSSUploader
  15. logger = get_spider_logger("taobao")
  16. from urllib.parse import quote
# Sent as the `appKey` query parameter of the mtop request and mixed into the
# MD5 request signature (see TaobaoCrawl.get_token / _request_search_page).
MTOP_APP_KEY = "12574478"
# Sent as `appId` inside the JSON search payload (see _build_search_payload).
MTOP_APP_ID = "34385"
# Upper bound on the number of result pages TaobaoCrawl.get_search walks through.
SEARCH_MAX_PAGE = 20
# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm whether a retry loop was removed or lives elsewhere.
REQUEST_RETRY_COUNT = 3
# Age threshold, in seconds, that TaobaoCrawl uses when judging whether the
# account's stored cookie is stale.
COOKIE_MAX_AGE_SEC = 3600
  22. def build_taobao_search_url(keyword: str, page: int = 1) -> str:
  23. """
  24. 构建淘宝搜索URL
  25. 参数:
  26. keyword: 搜索关键词 (例如: "999 玉屏风口服液 10支")
  27. page: 页码,从1开始
  28. 返回:
  29. 完整的淘宝搜索URL字符串
  30. """
  31. # 对关键词进行URL编码(空格转为%20)
  32. encoded_keyword = quote(keyword, safe='').replace(' ', '%20')
  33. # 固定参数
  34. fixed_params = {
  35. "_input_charset": "utf-8",
  36. "commend": "all",
  37. "ie": "utf8",
  38. "preLoadOrigin": "https://www.taobao.com",
  39. "search_type": "item",
  40. "source": "suggest",
  41. "sourceId": "tb.index",
  42. "spm": "a21bo.jianhua/a.search_history.d1",
  43. "ssid": "s5-e",
  44. "tab": "all",
  45. "suggest_query": "",
  46. }
  47. # 动态参数
  48. dynamic_params = {
  49. "q": encoded_keyword,
  50. "page": str(page),
  51. }
  52. # 合并参数
  53. all_params = {**fixed_params, **dynamic_params}
  54. # 构建查询字符串并返回完整URL
  55. query_string = "&".join([f"{k}={v}" for k, v in all_params.items()])
  56. return f"https://s.taobao.com/search?{query_string}"
  57. def extract_item_data(item_element):
  58. """
  59. 从商品元素中提取数据
  60. """
  61. result = {
  62. "item_id": "",
  63. "title": "",
  64. "price": "",
  65. "realSales": "",
  66. "shopInfo": {"title": ""},
  67. "procity": "",
  68. "auctionURL": ""
  69. }
  70. # 1. 提取 item_id - 从 a 标签的 id 属性
  71. a_elem = item_element.ele('xpath=.//a[contains(@id, "item_id_")]')
  72. if a_elem:
  73. item_id_full = a_elem.attr('id')
  74. if item_id_full:
  75. result["item_id"] = item_id_full.replace("item_id_", "")
  76. # 2. 提取 title - 从 div 的 title 属性
  77. title_elem = item_element.ele('xpath=.//div[contains(@class, "title--")]')
  78. if title_elem:
  79. title = title_elem.attr('title')
  80. if not title:
  81. # 如果没有 title 属性,取文本内容
  82. title = title_elem.text
  83. result["title"] = title
  84. # 3. 提取 price - 整数部分 + 小数部分
  85. price_int = item_element.ele('xpath=.//div[contains(@class, "priceInt--")]')
  86. price_float = item_element.ele('xpath=.//div[contains(@class, "priceFloat--")]')
  87. if price_int and price_float:
  88. result["price"] = f"{price_int.text}.{price_float.text.replace('.', '')}"
  89. # 4. 提取 realSales (销量)
  90. sales_elem = item_element.ele('xpath=.//span[contains(@class, "realSales--")]')
  91. if sales_elem:
  92. result["realSales"] = sales_elem.text
  93. # 5. 提取 shopInfo.title (店铺名称)
  94. shop_elem = item_element.ele('xpath=.//span[contains(@class, "shopNameText--")]')
  95. if shop_elem:
  96. result["shopInfo"]["title"] = shop_elem.text
  97. # 6. 提取 procity (发货地)
  98. procity_elem = item_element.ele('xpath=.//div[contains(@class, "procity--")]/span')
  99. if procity_elem:
  100. result["procity"] = procity_elem.text
  101. # 7. 提取 auctionURL
  102. if a_elem:
  103. href = a_elem.attr('href')
  104. if href:
  105. result["auctionURL"] = href
  106. elif result["item_id"]:
  107. result["auctionURL"] = f"https://item.taobao.com/item.htm?id={result['item_id']}"
  108. return result
# Default request headers. TaobaoCrawl.init_session installs them on the
# curl_cffi session, and the "user-agent" value is also embedded in the search
# payload built by TaobaoCrawl._build_search_payload.
# NOTE(review): "sec-ch-ua" advertises Chrome 145 while "user-agent" says
# Chrome/142 (and init_session impersonates chrome124) — confirm whether this
# mismatch is intentional.
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
"sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "script",
"sec-fetch-mode": "no-cors",
"sec-fetch-site": "same-site",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
),
}
# Endpoint of the mtop "wirelessrecommend.recommend" API. It is requested
# directly by TaobaoCrawl._request_search_page, and the same URL is the one the
# browser network listener in TaobaoCrawl.get_search watches for.
MTOP_URL = (
"https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
)
  127. class TaobaoCrawl:
  128. def __init__(self, drug_dict=None):
  129. self.cookies = None
  130. self.db = MySQLPoolOnline()
  131. self.pipeline = DrugPipeline("taobao")
  132. self.session = None
  133. self.proxies = None
  134. self.account_name = None
  135. self.ip = None
  136. self.cookie_stamp = None
  137. self.platform = 1
  138. self.task_dict = drug_dict or {}
  139. self.collect_task_id = None
  140. self.success = True
  141. if self.task_dict:
  142. self.get_product_data()
  143. self.is_no_product = 0
  144. self.driver=''
  145. self.ossuploader = AliyunOSSUploader()
  146. def get_product_data(self):
  147. self.task_id = self.task_dict["id"]
  148. self.company_id = self.task_dict["company_id"]
  149. self.product = self.task_dict["product_name"]
  150. self.product_desc = self.task_dict.get("product_specs", "")
  151. self.brand = self.task_dict.get("product_brand", "")
  152. self.product_keyword = self.task_dict.get("product_keyword", "")
  153. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  154. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  155. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  156. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  157. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  158. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  159. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  160. self.collect_round = self.task_dict.get("collect_round", 1)
  161. @staticmethod
  162. def _normalize_url(url):
  163. if not url:
  164. return ""
  165. url = str(url)
  166. if url.startswith("//"):
  167. return "https:" + url
  168. return url
  169. @staticmethod
  170. def _extract_shop_id(shop_url):
  171. if not shop_url:
  172. return ""
  173. shop_re = re.search(r"appUid=(\w+)", shop_url)
  174. if shop_re:
  175. return shop_re.group(1)
  176. return hashlib.md5(shop_url.encode("utf-8")).hexdigest()
  177. @staticmethod
  178. def _sql_literal(value):
  179. """避免拼接账号名时单引号打断 SQL(非完整防注入,仅兜底)。"""
  180. if value is None:
  181. return ""
  182. return str(value).replace("'", "''").replace("\\", "\\\\")
  183. def init_session(self):
  184. self.session = requests.Session(impersonate="chrome124")
  185. self.session.cookies.update(self.cookies or {})
  186. self.session.headers.update(headers)
  187. if self.proxies:
  188. self.session.proxies.update(self.proxies)
  189. @staticmethod
  190. def _is_transport_error(err):
  191. msg = str(err or "")
  192. return ("curl: (16)" in msg) or ("Failed to perform" in msg)
  193. def get_token(self, t, app_key, data_str):
  194. _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
  195. token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
  196. text = f"{token}&{t}&{app_key}&{data_str}"
  197. return hashlib.md5(text.encode()).hexdigest()
  198. def get_html_content(self, res_html):
  199. if not res_html:
  200. return ""
  201. ele_html = etree.HTML(res_html)
  202. if ele_html is None:
  203. return str(res_html)
  204. text_list = ele_html.xpath(".//text()")
  205. return "".join(text_list)
  206. def _build_search_payload(self, keyword, page, page_size=50):
  207. return {
  208. "appId": MTOP_APP_ID,
  209. "params": {
  210. "device": "HMA-AL00",
  211. "isBeta": "false",
  212. "grayHair": "false",
  213. "from": "nt_history",
  214. "brand": "HUAWEI",
  215. "info": "wifi",
  216. "index": "4",
  217. "rainbow": "",
  218. "schemaType": "auction",
  219. "elderHome": "false",
  220. "isEnterSrpSearch": "true",
  221. "newSearch": "false",
  222. "network": "wifi",
  223. "subtype": "",
  224. "hasPreposeFilter": "false",
  225. "prepositionVersion": "v2",
  226. "client_os": "Android",
  227. "gpsEnabled": "false",
  228. "searchDoorFrom": "srp",
  229. "debug_rerankNewOpenCard": "false",
  230. "homePageVersion": "v7",
  231. "searchElderHomeOpen": "false",
  232. "search_action": "initiative",
  233. "sugg": "_4_1",
  234. "sversion": "13.6",
  235. "style": "list",
  236. "ttid": "600000@taobao_pc_10.7.0",
  237. "needTabs": "true",
  238. "areaCode": "CN",
  239. "vm": "nw",
  240. "countryNum": "156",
  241. "m": "pc",
  242. "page": page,
  243. "n": 48,
  244. "q": keyword,
  245. "qSource": "url",
  246. "pageSource": "",
  247. "channelSrp": "",
  248. "tab": "all",
  249. "pageSize": str(page_size),
  250. "sourceS": "2",
  251. "ntoffset": "0",
  252. "filterTag": "",
  253. "service": "",
  254. "prop": "",
  255. "loc": "",
  256. "categoryp": "",
  257. "screenResolution": "1920x1080",
  258. "viewResolution": "1092x4722",
  259. "userAgent": headers["user-agent"],
  260. "couponUnikey": "",
  261. "subTabId": "",
  262. "np": "",
  263. "clientType": "h5",
  264. "isNewDomainAb": "false",
  265. "forceOldDomain": "false",
  266. },
  267. }
  268. def _request_search_page(self, keyword, page):
  269. t = str(int(time.time() * 1000))
  270. data = self._build_search_payload(keyword, page)
  271. data_str = json.dumps(data, separators=(",", ":"))
  272. sign = self.get_token(t, MTOP_APP_KEY, data_str)
  273. params = {
  274. "jsv": "2.7.4",
  275. "appKey": MTOP_APP_KEY,
  276. "t": t,
  277. "sign": sign,
  278. "api": "mtop.relationrecommend.wirelessrecommend.recommend",
  279. "v": "2.0",
  280. "timeout": "10000",
  281. "type": "jsonp",
  282. "dataType": "jsonp",
  283. "callback": "",
  284. "data": data_str,
  285. }
  286. return self.session.get(MTOP_URL, params=params, timeout=30)
  287. def _parse_jsonp_body(self, res_text):
  288. res_text = (res_text or "").strip()
  289. json_str = res_text
  290. m = re.match(r"^[^(]*\((.*)\)\s*;?\s*$", res_text, re.DOTALL)
  291. if m:
  292. json_str = m.group(1)
  293. return json.loads(json_str)
  294. def get_search(self):
  295. keyword = self.product
  296. if self.brand:
  297. keyword = (self.brand + " " + self.product).strip()
  298. if self.product_desc:
  299. keyword = (keyword + " " + self.product_desc).strip()
  300. for page in range(1, SEARCH_MAX_PAGE + 1):
  301. logger.info(f"正在爬取关键词:{keyword},{page}页数据")
  302. # input_box = self.driver.ele('xpath=//*[@id="q"]')
  303. # input_box.input(keyword)
  304. # time.sleep(1)
  305. # button = self.driver.ele('xpath=//*[@id="J_TSearchForm"]/div[2]/button')
  306. # button.click()
  307. # time.sleep(2)
  308. base_url = build_taobao_search_url(keyword)
  309. tab = self.driver.latest_tab
  310. tab.listen.start('https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/') # 开始监听,指定获取包含该文本的数据包
  311. if page==1:
  312. tab.get(base_url)
  313. else:
  314. next_btn = tab.ele('xpath=//button[contains(@aria-label, "下一页")]')
  315. if not next_btn.click():
  316. break
  317. for _ in range(5):
  318. response = tab.listen.wait() # 等待并获取一个数据包
  319. if len(response.url)>3000:
  320. res = response.response.raw_body
  321. break
  322. else:
  323. continue
  324. time.sleep(1.5)
  325. try:
  326. json_data = self._parse_jsonp_body(res)
  327. item_array = json_data.get("data", {}).get("itemsArray", [])
  328. except Exception as e:
  329. logger.warning(
  330. "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
  331. self.account_name,
  332. e,
  333. )
  334. break
  335. if not item_array:
  336. logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
  337. return
  338. elems = tab.eles('xpath=//*[@id="content_items_wrapper"]/div')
  339. for s,raw in enumerate(item_array):
  340. try:
  341. item_id = raw.get("item_id", "")
  342. if not item_id:
  343. continue
  344. item_title = self.get_html_content(raw.get("title") or "")
  345. if self.brand not in item_title:
  346. self.is_no_product += 1
  347. continue
  348. if self.product not in item_title:
  349. self.is_no_product += 1
  350. continue
  351. if "+" in item_title:
  352. continue
  353. if self.product_desc:
  354. if self.product_desc in item_title:
  355. crawl_product_desc = self.product_desc
  356. else:
  357. crawl_product_desc = ""
  358. else:
  359. crawl_product_desc = ""
  360. self.is_no_product = 0
  361. status = 1
  362. if self.product_keyword:
  363. search_keyword_list = self.product_keyword.split(",")
  364. for search_keyword in search_keyword_list:
  365. if search_keyword.strip() not in item_title:
  366. status = 0
  367. if status == 0:
  368. continue
  369. item_price = raw.get("price")
  370. item_price_show = raw.get("priceShow", {}).get("price", 0)
  371. item_sales = raw.get("realSales") or ""
  372. sale_num = ""
  373. sales_m = re.search(r"(.*?)人付款", item_sales)
  374. if sales_m:
  375. sale_num = sales_m.group(1)
  376. item_url = self._normalize_url(raw.get("auctionURL"))
  377. shop_name = raw.get("shopInfo", {}).get("title", "")
  378. area_str = (raw.get("procity", "") or "").strip()
  379. city_id, province_id, city, province = get_city(area_str)
  380. shop_url = self._normalize_url(
  381. raw.get("shopInfo", {}).get("url", "")
  382. )
  383. structured_list = raw.get("structuredUSPInfo",{})
  384. for structured in structured_list:
  385. if structured.get("propertyName","") == "规格":
  386. crawl_product_desc = structured.get("propertyValueName","")
  387. pic_path = raw.get("pic_path", "")
  388. raw_price = item_price_show
  389. if raw_price in (None, ""):
  390. price = Decimal("0.00")
  391. else:
  392. try:
  393. price = Decimal(str(raw_price)).quantize(Decimal("0.00"))
  394. except (InvalidOperation, ValueError):
  395. price = Decimal("0.00")
  396. upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
  397. try:
  398. jpg_bytes = elems[s].get_screenshot(as_bytes="jpg")
  399. snapshot_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
  400. except:
  401. snapshot_url=''
  402. scrape_date = time.strftime("%Y-%m-%d")
  403. update_time = time.strftime("%Y-%m-%d %H:%M:%S")
  404. #snapshot_url = self._normalize_url(pic_path) if pic_path else ""
  405. # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
  406. product = {
  407. "platform": self.platform,
  408. "item_id": item_id,
  409. "enterprise_id": self.company_id,
  410. "product_name": item_title,
  411. "spec": crawl_product_desc,
  412. "one_price": "",
  413. "detail_url": item_url,
  414. "shop_name": shop_name,
  415. "anonymous_store_name": "",
  416. "shop_url": shop_url,
  417. "city_name": "",
  418. "city_id": "",
  419. "province_name": "",
  420. "province_id": "",
  421. "shipment_city_name": city,
  422. "shipment_city_id": city_id,
  423. "shipment_province_name": province,
  424. "shipment_province_id": province_id,
  425. "area_info": area_str,
  426. "factory_name": "",
  427. "scrape_date": scrape_date,
  428. "price": price,
  429. "sales": sale_num,
  430. "stock_count": "",
  431. "snapshot_url": snapshot_url,
  432. "approval_num": "",
  433. "produced_time": "",
  434. "deadline": "",
  435. "update_time": update_time,
  436. "insert_time": update_time,
  437. "number": 1,
  438. "product_brand": self.brand or "",
  439. "collect_task_id": self.collect_task_id,
  440. "search_name": self.product,
  441. "company_name": "",
  442. "collect_config_info": json.dumps(
  443. {
  444. "sampling_cycle": self.sampling_cycle,
  445. "sampling_start_time": self.sampling_start_time,
  446. "sampling_end_time": self.sampling_end_time,
  447. }
  448. ),
  449. "account_id": self.account_id,
  450. "collect_region_id": self.collect_region_id,
  451. "collect_round": self.collect_round,
  452. "is_sold_out": 0
  453. }
  454. try:
  455. self.pipeline.storge_data(product)
  456. logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
  457. except Exception as e:
  458. logger.exception("写入数据库失败: %s", e)
  459. except:
  460. continue
  461. logger.info(
  462. "关键词 %s 第 %s 页爬取完成",
  463. keyword,
  464. page,
  465. )
  466. total_page = (
  467. (json_data or {}).get("data", {}).get("mainInfo", {}).get("totalPage")
  468. )
  469. try:
  470. total_page_int = int(total_page) if total_page is not None else 50
  471. except (TypeError, ValueError):
  472. total_page_int = 50
  473. if page >= total_page_int:
  474. break
  475. if self.is_no_product > 20:
  476. break
  477. sleep_second = random.uniform(30, 60)
  478. logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
  479. time.sleep(sleep_second)
  480. def update_cookie(self):
  481. taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
  482. self.driver=taobao_auto.run()
  483. if not self.driver :
  484. return False
  485. return True
  486. def get_account(self):
  487. sql_account = """
  488. SELECT * FROM `retrieve_collect_equipment_account` WHERE `id` = %s and `status` = 0
  489. """
  490. account_list = self.db.select_data(sql_account,self.account_id)
  491. if not account_list:
  492. return False
  493. account_dict = account_list[0]
  494. self.ip = account_dict.get("ip")
  495. cookie_str = account_dict.get("cookie_str")
  496. self.ip = account_dict.get("ip")
  497. self.account_name = account_dict.get("username")
  498. self.login_username = account_dict.get("phone", "")
  499. self.login_password = account_dict.get("password", "")
  500. self.cookie_stamp = account_dict.get("update_time")
  501. if self.ip:
  502. account_proxy = f"http://{self.ip}"
  503. self.proxies = {"http": account_proxy, "https": account_proxy}
  504. else:
  505. self.proxies = None
  506. need_refresh = (
  507. not cookie_str
  508. or int(time.time()) - int(self.cookie_stamp or 0) > COOKIE_MAX_AGE_SEC
  509. )
  510. if 1:
  511. if not self.update_cookie():
  512. return False
  513. logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
  514. return True
  515. def run(self):
  516. if not self.get_account():
  517. logger.info("==================当前无账号可用==================")
  518. self.success = False
  519. return self.pipeline.crawl_count, self.success
  520. logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
  521. self.get_search()
  522. self.driver.quit()
  523. logger.info(
  524. "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
  525. self.task_id,
  526. self.product,
  527. self.pipeline.crawl_count,
  528. )
  529. return self.pipeline.crawl_count, self.success