# taobao_crawl.py - Taobao search-result crawler.
  1. import hashlib
  2. import json
  3. import random
  4. import re
  5. import time
  6. from decimal import Decimal, InvalidOperation
  7. from curl_cffi import requests
  8. from lxml import etree
  9. from commons.Logger import get_spider_logger
  10. from commons.conn_mysql import MySQLPool
  11. from pipelines.drug_pipelines import DrugPipeline
  12. from spiders.taobao.taobao_login import (TaobaoAutoCrawl)
  13. from area_info.city_name_to_id import get_city
  14. logger = get_spider_logger("taobao")
  15. MTOP_APP_KEY = "12574478"
  16. MTOP_APP_ID = "34385"
  17. SEARCH_MAX_PAGE = 10
  18. REQUEST_RETRY_COUNT = 3
  19. COOKIE_MAX_AGE_SEC = 1800
  20. headers = {
  21. "accept": "*/*",
  22. "accept-language": "zh-CN,zh;q=0.9",
  23. "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
  24. "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
  25. "sec-ch-ua-mobile": "?0",
  26. "sec-ch-ua-platform": '"Windows"',
  27. "sec-fetch-dest": "script",
  28. "sec-fetch-mode": "no-cors",
  29. "sec-fetch-site": "same-site",
  30. "user-agent": (
  31. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
  32. "(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
  33. ),
  34. }
  35. MTOP_URL = (
  36. "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
  37. )
  38. class TaobaoCrawl:
  39. def __init__(self, drug_dict=None):
  40. self.cookies = None
  41. self.db = MySQLPool()
  42. self.pipeline = DrugPipeline("taobao")
  43. self.session = None
  44. self.proxies = None
  45. self.account_name = None
  46. self.ip = None
  47. self.cookie_stamp = None
  48. self.platform = 1
  49. self.task_dict = drug_dict or {}
  50. self.collect_task_id = None
  51. self.success = True
  52. if self.task_dict:
  53. self.get_product_data()
  54. self.is_no_product = 0
  55. def get_product_data(self):
  56. self.task_id = self.task_dict["id"]
  57. self.company_id = self.task_dict["company_id"]
  58. self.product = self.task_dict["product_name"]
  59. self.product_desc = self.task_dict.get("product_specs", "")
  60. self.brand = self.task_dict.get("product_brand", "")
  61. self.product_keyword = self.task_dict.get("product_keyword", "")
  62. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  63. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  64. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  65. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  66. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  67. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  68. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  69. self.collect_round = self.task_dict.get("collect_round", 1)
  70. @staticmethod
  71. def _normalize_url(url):
  72. if not url:
  73. return ""
  74. url = str(url)
  75. if url.startswith("//"):
  76. return "https:" + url
  77. return url
  78. @staticmethod
  79. def _extract_shop_id(shop_url):
  80. if not shop_url:
  81. return ""
  82. shop_re = re.search(r"appUid=(\w+)", shop_url)
  83. if shop_re:
  84. return shop_re.group(1)
  85. return hashlib.md5(shop_url.encode("utf-8")).hexdigest()
  86. @staticmethod
  87. def _sql_literal(value):
  88. """避免拼接账号名时单引号打断 SQL(非完整防注入,仅兜底)。"""
  89. if value is None:
  90. return ""
  91. return str(value).replace("'", "''").replace("\\", "\\\\")
  92. def init_session(self):
  93. self.session = requests.Session(impersonate="chrome124")
  94. self.session.cookies.update(self.cookies or {})
  95. self.session.headers.update(headers)
  96. if self.proxies:
  97. self.session.proxies.update(self.proxies)
  98. @staticmethod
  99. def _is_transport_error(err):
  100. msg = str(err or "")
  101. return ("curl: (16)" in msg) or ("Failed to perform" in msg)
  102. def get_token(self, t, app_key, data_str):
  103. _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
  104. token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
  105. text = f"{token}&{t}&{app_key}&{data_str}"
  106. return hashlib.md5(text.encode()).hexdigest()
  107. def get_html_content(self, res_html):
  108. if not res_html:
  109. return ""
  110. ele_html = etree.HTML(res_html)
  111. if ele_html is None:
  112. return str(res_html)
  113. text_list = ele_html.xpath(".//text()")
  114. return "".join(text_list)
  115. def _build_search_payload(self, keyword, page, page_size=50):
  116. return {
  117. "appId": MTOP_APP_ID,
  118. "params": {
  119. "device": "HMA-AL00",
  120. "isBeta": "false",
  121. "grayHair": "false",
  122. "from": "nt_history",
  123. "brand": "HUAWEI",
  124. "info": "wifi",
  125. "index": "4",
  126. "rainbow": "",
  127. "schemaType": "auction",
  128. "elderHome": "false",
  129. "isEnterSrpSearch": "true",
  130. "newSearch": "false",
  131. "network": "wifi",
  132. "subtype": "",
  133. "hasPreposeFilter": "false",
  134. "prepositionVersion": "v2",
  135. "client_os": "Android",
  136. "gpsEnabled": "false",
  137. "searchDoorFrom": "srp",
  138. "debug_rerankNewOpenCard": "false",
  139. "homePageVersion": "v7",
  140. "searchElderHomeOpen": "false",
  141. "search_action": "initiative",
  142. "sugg": "_4_1",
  143. "sversion": "13.6",
  144. "style": "list",
  145. "ttid": "600000@taobao_pc_10.7.0",
  146. "needTabs": "true",
  147. "areaCode": "CN",
  148. "vm": "nw",
  149. "countryNum": "156",
  150. "m": "pc",
  151. "page": page,
  152. "n": 48,
  153. "q": keyword,
  154. "qSource": "url",
  155. "pageSource": "",
  156. "channelSrp": "",
  157. "tab": "all",
  158. "pageSize": str(page_size),
  159. "sourceS": "2",
  160. "ntoffset": "0",
  161. "filterTag": "",
  162. "service": "",
  163. "prop": "",
  164. "loc": "",
  165. "categoryp": "",
  166. "screenResolution": "1920x1080",
  167. "viewResolution": "1092x4722",
  168. "userAgent": headers["user-agent"],
  169. "couponUnikey": "",
  170. "subTabId": "",
  171. "np": "",
  172. "clientType": "h5",
  173. "isNewDomainAb": "false",
  174. "forceOldDomain": "false",
  175. },
  176. }
  177. def _request_search_page(self, keyword, page):
  178. t = str(int(time.time() * 1000))
  179. data = self._build_search_payload(keyword, page)
  180. data_str = json.dumps(data, separators=(",", ":"))
  181. sign = self.get_token(t, MTOP_APP_KEY, data_str)
  182. params = {
  183. "jsv": "2.7.4",
  184. "appKey": MTOP_APP_KEY,
  185. "t": t,
  186. "sign": sign,
  187. "api": "mtop.relationrecommend.wirelessrecommend.recommend",
  188. "v": "2.0",
  189. "timeout": "10000",
  190. "type": "jsonp",
  191. "dataType": "jsonp",
  192. "callback": "",
  193. "data": data_str,
  194. }
  195. return self.session.get(MTOP_URL, params=params, timeout=30)
  196. def _parse_jsonp_body(self, res_text):
  197. res_text = (res_text or "").strip()
  198. json_str = res_text
  199. m = re.match(r"^[^(]*\((.*)\)\s*;?\s*$", res_text, re.DOTALL)
  200. if m:
  201. json_str = m.group(1)
  202. return json.loads(json_str)
  203. def get_search(self):
  204. keyword = ""
  205. if self.brand:
  206. keyword = (self.brand + " " + self.product).strip()
  207. if self.product_desc:
  208. keyword = (keyword + " " + self.product_desc).strip()
  209. for page in range(1, SEARCH_MAX_PAGE + 1):
  210. logger.info(f"正在爬取关键词:{keyword},{page}页数据")
  211. for attempt in range(1, REQUEST_RETRY_COUNT + 1):
  212. try:
  213. if not self.session:
  214. self.init_session()
  215. res = self._request_search_page(keyword, page)
  216. if res.status_code != 200:
  217. logger.warning(
  218. "请求失败 HTTP %s,第%s/%s 次重试",
  219. res.status_code,
  220. attempt,
  221. REQUEST_RETRY_COUNT,
  222. )
  223. time.sleep(random.randint(3, 8))
  224. continue
  225. except Exception as e:
  226. if self._is_transport_error(e):
  227. logger.warning(
  228. "检测到网络传输异常(curl),重建会话后重试: %s",
  229. e,
  230. )
  231. self.init_session()
  232. logger.warning(
  233. "请求异常,第%s/%s 次重试: %s",
  234. attempt,
  235. REQUEST_RETRY_COUNT,
  236. e,
  237. )
  238. # 指数退避,避免连续瞬时失败
  239. time.sleep(min(3 * attempt, 10))
  240. continue
  241. try:
  242. json_data = self._parse_jsonp_body(res.text)
  243. item_array = json_data.get("data", {}).get("itemsArray", [])
  244. break
  245. except Exception as e:
  246. logger.warning(
  247. "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
  248. self.account_name,
  249. e,
  250. )
  251. if self.update_cookie():
  252. self.init_session()
  253. else:
  254. return
  255. else:
  256. logger.warning("关键词 %s 第 %s 页连续重试失败", keyword, page)
  257. continue
  258. if not item_array:
  259. logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
  260. return
  261. for raw in item_array:
  262. item_id = raw.get("item_id", "")
  263. if not item_id:
  264. continue
  265. item_title = self.get_html_content(raw.get("title") or "")
  266. if self.brand not in item_title:
  267. self.is_no_product += 1
  268. continue
  269. if self.product not in item_title:
  270. self.is_no_product += 1
  271. continue
  272. if "+" in item_title:
  273. self.is_no_product += 1
  274. continue
  275. if self.product_desc and self.product_desc not in item_title:
  276. continue
  277. self.is_no_product = 0
  278. status = 1
  279. if self.product_keyword:
  280. search_keyword_list = self.product_keyword.split(",")
  281. for search_keyword in search_keyword_list:
  282. if search_keyword.strip() not in item_title:
  283. status = 0
  284. if status == 0:
  285. continue
  286. item_price = raw.get("price")
  287. item_price_show = raw.get("priceShow", {}).get("price", 0)
  288. item_sales = raw.get("realSales") or ""
  289. sale_num = ""
  290. sales_m = re.search(r"(.*?)人付款", item_sales)
  291. if sales_m:
  292. sale_num = sales_m.group(1)
  293. item_url = self._normalize_url(raw.get("auctionURL"))
  294. shop_name = raw.get("shopInfo", {}).get("title", "")
  295. area_str = (raw.get("procity", "") or "").strip()
  296. city_id, province_id, city, province = get_city(area_str)
  297. shop_url = self._normalize_url(
  298. raw.get("shopInfo", {}).get("url", "")
  299. )
  300. pic_path = raw.get("pic_path", "")
  301. raw_price = item_price_show
  302. if raw_price in (None, ""):
  303. price = Decimal("0.00")
  304. else:
  305. try:
  306. price = Decimal(str(raw_price)).quantize(Decimal("0.00"))
  307. except (InvalidOperation, ValueError):
  308. price = Decimal("0.00")
  309. scrape_date = time.strftime("%Y-%m-%d")
  310. update_time = time.strftime("%Y-%m-%d %H:%M:%S")
  311. snapshot_url = self._normalize_url(pic_path) if pic_path else ""
  312. # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
  313. product = {
  314. "platform": self.platform,
  315. "item_id": item_id,
  316. "enterprise_id": self.company_id,
  317. "product_name": item_title,
  318. "spec": self.product_desc,
  319. "one_price": "",
  320. "detail_url": item_url,
  321. "shop_name": shop_name,
  322. "anonymous_store_name": "",
  323. "shop_url": "",
  324. "city_name": "",
  325. "city_id": "",
  326. "province_name": "",
  327. "province_id": "",
  328. "shipment_city_name": city,
  329. "shipment_city_id": city_id,
  330. "shipment_province_name": province,
  331. "shipment_province_id": province_id,
  332. "area_info": area_str,
  333. "factory_name": "",
  334. "scrape_date": scrape_date,
  335. "price": price,
  336. "sales": sale_num,
  337. "stock_count": "",
  338. "snapshot_url": snapshot_url,
  339. "approval_num": "",
  340. "produced_time": "",
  341. "deadline": "",
  342. "update_time": update_time,
  343. "insert_time": update_time,
  344. "number": 1,
  345. "product_brand": self.brand or "",
  346. "collect_task_id": self.collect_task_id,
  347. "search_name": self.product,
  348. "company_name": "",
  349. "collect_config_info": json.dumps(
  350. {
  351. "sampling_cycle": self.sampling_cycle,
  352. "sampling_start_time": self.sampling_start_time,
  353. "sampling_end_time": self.sampling_end_time,
  354. }
  355. ),
  356. "account_id": self.account_id,
  357. "collect_region_id": self.collect_region_id,
  358. "collect_round": self.collect_round,
  359. "is_sold_out": 1
  360. }
  361. try:
  362. self.pipeline.storge_data(product)
  363. logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
  364. except Exception as e:
  365. logger.exception("写入数据库失败: %s", e)
  366. logger.info(
  367. "关键词 %s 第 %s 页爬取完成",
  368. keyword,
  369. page,
  370. )
  371. total_page = (
  372. (json_data or {}).get("data", {}).get("mainInfo", {}).get("totalPage")
  373. )
  374. try:
  375. total_page_int = int(total_page) if total_page is not None else 50
  376. except (TypeError, ValueError):
  377. total_page_int = 50
  378. if page >= total_page_int:
  379. break
  380. if self.is_no_product > 20:
  381. break
  382. sleep_second = random.uniform(30, 60)
  383. logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
  384. time.sleep(sleep_second)
  385. def update_cookie(self):
  386. taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
  387. if not taobao_auto.run():
  388. return False
  389. safe_name = self._sql_literal(self.account_name)
  390. sql_account = (
  391. "select `id`,`nickname`,`ip`,`cookie_timestamp`,`cookie_str` "
  392. f"from `accounts_platform` where `name`='{safe_name}'"
  393. )
  394. account_list = self.db.select_data(sql_account)
  395. if not account_list:
  396. logger.error("账号 %s 未查询到 cookie 信息", self.account_name)
  397. return False
  398. cookie_str = account_list[0].get("cookie_str")
  399. if not cookie_str:
  400. logger.error("账号 %s cookie 为空", self.account_name)
  401. return False
  402. try:
  403. self.cookies = json.loads(cookie_str)
  404. except Exception as e:
  405. logger.error("账号 %s cookie 解析失败: %s", self.account_name, e)
  406. return False
  407. return True
  408. def get_account(self):
  409. sql_account = f""" select `id`,`nickname`,`ip`,`cookie_timestamp`,`cookie_str` from `accounts_platform` where `platform`=1 and `status`=1 and `equipment_id`=1 order by `cookie_timestamp` asc limit 1 """
  410. account_list = self.db.select_data(sql_account)
  411. if not account_list:
  412. return False
  413. account_dict = account_list[0]
  414. self.ip = account_dict.get("ip")
  415. cookie_str = account_dict.get("cookie_str")
  416. self.account_name = account_dict.get("nickname")
  417. self.cookie_stamp = account_dict.get("cookie_timestamp")
  418. if self.ip:
  419. account_proxy = f"http://{self.ip}"
  420. self.proxies = {"http": account_proxy, "https": account_proxy}
  421. else:
  422. self.proxies = None
  423. need_refresh = (
  424. not cookie_str
  425. or int(time.time()) - int(self.cookie_stamp or 0) > COOKIE_MAX_AGE_SEC
  426. )
  427. if need_refresh:
  428. if not self.update_cookie():
  429. return False
  430. else:
  431. try:
  432. self.cookies = json.loads(cookie_str)
  433. except Exception as e:
  434. logger.error("cookie 解析失败,尝试刷新: %s", e)
  435. if not self.update_cookie():
  436. return False
  437. logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
  438. self.init_session()
  439. return True
  440. def run(self):
  441. if not self.get_account():
  442. logger.info("==================当前无账号可用==================")
  443. self.success = False
  444. return self.pipeline.crawl_count, self.success
  445. logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
  446. self.get_search()
  447. logger.info(
  448. "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
  449. self.task_id,
  450. self.product,
  451. self.pipeline.crawl_count,
  452. )
  453. return self.pipeline.crawl_count, self.success