# taobao_crawl.py
import hashlib
import json
import random
import re
import time
from datetime import datetime
from decimal import Decimal, InvalidOperation

from curl_cffi import requests
from lxml import etree

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from pipelines.drug_pipelines import DrugPipeline
from spiders.taobao.taobao_login import TaobaoAutoCrawl
from area_info.city_name_to_id import get_city
  14. logger = get_spider_logger("taobao")
  15. MTOP_APP_KEY = "12574478"
  16. MTOP_APP_ID = "34385"
  17. SEARCH_MAX_PAGE = 20
  18. REQUEST_RETRY_COUNT = 3
  19. COOKIE_MAX_AGE_SEC = 1800
  20. headers = {
  21. "accept": "*/*",
  22. "accept-language": "zh-CN,zh;q=0.9",
  23. "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
  24. "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
  25. "sec-ch-ua-mobile": "?0",
  26. "sec-ch-ua-platform": '"Windows"',
  27. "sec-fetch-dest": "script",
  28. "sec-fetch-mode": "no-cors",
  29. "sec-fetch-site": "same-site",
  30. "user-agent": (
  31. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
  32. "(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
  33. ),
  34. }
  35. MTOP_URL = (
  36. "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
  37. )
  38. class TaobaoCrawl:
  39. def __init__(self, drug_dict=None):
  40. self.cookies = None
  41. self.db = MySQLPoolOnline()
  42. self.pipeline = DrugPipeline("taobao")
  43. self.session = None
  44. self.proxies = None
  45. self.account_name = None
  46. self.ip = None
  47. self.cookie_stamp = None
  48. self.platform = 1
  49. self.task_dict = drug_dict or {}
  50. self.collect_task_id = None
  51. self.success = True
  52. if self.task_dict:
  53. self.get_product_data()
  54. self.is_no_product = 0
  55. def get_product_data(self):
  56. self.task_id = self.task_dict["id"]
  57. self.company_id = self.task_dict["company_id"]
  58. self.product = self.task_dict["product_name"]
  59. self.product_desc = self.task_dict.get("product_specs", "")
  60. self.brand = self.task_dict.get("product_brand", "")
  61. self.product_keyword = self.task_dict.get("product_keyword", "")
  62. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  63. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  64. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  65. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  66. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  67. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  68. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  69. self.collect_round = self.task_dict.get("collect_round", 1)
  70. @staticmethod
  71. def _normalize_url(url):
  72. if not url:
  73. return ""
  74. url = str(url)
  75. if url.startswith("//"):
  76. return "https:" + url
  77. return url
  78. @staticmethod
  79. def _extract_shop_id(shop_url):
  80. if not shop_url:
  81. return ""
  82. shop_re = re.search(r"appUid=(\w+)", shop_url)
  83. if shop_re:
  84. return shop_re.group(1)
  85. return hashlib.md5(shop_url.encode("utf-8")).hexdigest()
  86. @staticmethod
  87. def _sql_literal(value):
  88. """避免拼接账号名时单引号打断 SQL(非完整防注入,仅兜底)。"""
  89. if value is None:
  90. return ""
  91. return str(value).replace("'", "''").replace("\\", "\\\\")
  92. def init_session(self):
  93. self.session = requests.Session(impersonate="chrome124")
  94. self.session.cookies.update(self.cookies or {})
  95. self.session.headers.update(headers)
  96. if self.proxies:
  97. self.session.proxies.update(self.proxies)
  98. @staticmethod
  99. def _is_transport_error(err):
  100. msg = str(err or "")
  101. return ("curl: (16)" in msg) or ("Failed to perform" in msg)
  102. def get_token(self, t, app_key, data_str):
  103. _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
  104. token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
  105. text = f"{token}&{t}&{app_key}&{data_str}"
  106. return hashlib.md5(text.encode()).hexdigest()
  107. def get_html_content(self, res_html):
  108. if not res_html:
  109. return ""
  110. ele_html = etree.HTML(res_html)
  111. if ele_html is None:
  112. return str(res_html)
  113. text_list = ele_html.xpath(".//text()")
  114. return "".join(text_list)
  115. def _build_search_payload(self, keyword, page, page_size=50):
  116. return {
  117. "appId": MTOP_APP_ID,
  118. "params": {
  119. "device": "HMA-AL00",
  120. "isBeta": "false",
  121. "grayHair": "false",
  122. "from": "nt_history",
  123. "brand": "HUAWEI",
  124. "info": "wifi",
  125. "index": "4",
  126. "rainbow": "",
  127. "schemaType": "auction",
  128. "elderHome": "false",
  129. "isEnterSrpSearch": "true",
  130. "newSearch": "false",
  131. "network": "wifi",
  132. "subtype": "",
  133. "hasPreposeFilter": "false",
  134. "prepositionVersion": "v2",
  135. "client_os": "Android",
  136. "gpsEnabled": "false",
  137. "searchDoorFrom": "srp",
  138. "debug_rerankNewOpenCard": "false",
  139. "homePageVersion": "v7",
  140. "searchElderHomeOpen": "false",
  141. "search_action": "initiative",
  142. "sugg": "_4_1",
  143. "sversion": "13.6",
  144. "style": "list",
  145. "ttid": "600000@taobao_pc_10.7.0",
  146. "needTabs": "true",
  147. "areaCode": "CN",
  148. "vm": "nw",
  149. "countryNum": "156",
  150. "m": "pc",
  151. "page": page,
  152. "n": 48,
  153. "q": keyword,
  154. "qSource": "url",
  155. "pageSource": "",
  156. "channelSrp": "",
  157. "tab": "all",
  158. "pageSize": str(page_size),
  159. "sourceS": "2",
  160. "ntoffset": "0",
  161. "filterTag": "",
  162. "service": "",
  163. "prop": "",
  164. "loc": "",
  165. "categoryp": "",
  166. "screenResolution": "1920x1080",
  167. "viewResolution": "1092x4722",
  168. "userAgent": headers["user-agent"],
  169. "couponUnikey": "",
  170. "subTabId": "",
  171. "np": "",
  172. "clientType": "h5",
  173. "isNewDomainAb": "false",
  174. "forceOldDomain": "false",
  175. },
  176. }
  177. def _request_search_page(self, keyword, page):
  178. t = str(int(time.time() * 1000))
  179. data = self._build_search_payload(keyword, page)
  180. data_str = json.dumps(data, separators=(",", ":"))
  181. sign = self.get_token(t, MTOP_APP_KEY, data_str)
  182. params = {
  183. "jsv": "2.7.4",
  184. "appKey": MTOP_APP_KEY,
  185. "t": t,
  186. "sign": sign,
  187. "api": "mtop.relationrecommend.wirelessrecommend.recommend",
  188. "v": "2.0",
  189. "timeout": "10000",
  190. "type": "jsonp",
  191. "dataType": "jsonp",
  192. "callback": "",
  193. "data": data_str,
  194. }
  195. return self.session.get(MTOP_URL, params=params, timeout=30)
  196. def _parse_jsonp_body(self, res_text):
  197. res_text = (res_text or "").strip()
  198. json_str = res_text
  199. m = re.match(r"^[^(]*\((.*)\)\s*;?\s*$", res_text, re.DOTALL)
  200. if m:
  201. json_str = m.group(1)
  202. return json.loads(json_str)
  203. def get_search(self):
  204. keyword = self.product
  205. if self.brand:
  206. keyword = (self.brand + " " + self.product).strip()
  207. if self.product_desc:
  208. keyword = (keyword + " " + self.product_desc).strip()
  209. for page in range(1, SEARCH_MAX_PAGE + 1):
  210. logger.info(f"正在爬取关键词:{keyword},{page}页数据")
  211. for attempt in range(1, REQUEST_RETRY_COUNT + 1):
  212. try:
  213. if not self.session:
  214. self.init_session()
  215. res = self._request_search_page(keyword, page)
  216. if res.status_code != 200:
  217. logger.warning(
  218. "请求失败 HTTP %s,第%s/%s 次重试",
  219. res.status_code,
  220. attempt,
  221. REQUEST_RETRY_COUNT,
  222. )
  223. time.sleep(random.randint(3, 8))
  224. continue
  225. except Exception as e:
  226. if self._is_transport_error(e):
  227. logger.warning(
  228. "检测到网络传输异常(curl),重建会话后重试: %s",
  229. e,
  230. )
  231. self.init_session()
  232. logger.warning(
  233. "请求异常,第%s/%s 次重试: %s",
  234. attempt,
  235. REQUEST_RETRY_COUNT,
  236. e,
  237. )
  238. # 指数退避,避免连续瞬时失败
  239. time.sleep(min(3 * attempt, 10))
  240. continue
  241. try:
  242. json_data = self._parse_jsonp_body(res.text)
  243. item_array = json_data.get("data", {}).get("itemsArray", [])
  244. break
  245. except Exception as e:
  246. logger.warning(
  247. "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
  248. self.account_name,
  249. e,
  250. )
  251. if self.update_cookie():
  252. self.init_session()
  253. else:
  254. return
  255. else:
  256. logger.warning("关键词 %s 第 %s 页连续重试失败", keyword, page)
  257. continue
  258. if not item_array:
  259. logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
  260. return
  261. for raw in item_array:
  262. item_id = raw.get("item_id", "")
  263. if not item_id:
  264. continue
  265. item_title = self.get_html_content(raw.get("title") or "")
  266. if self.brand not in item_title:
  267. self.is_no_product += 1
  268. continue
  269. if self.product not in item_title:
  270. self.is_no_product += 1
  271. continue
  272. if "+" in item_title:
  273. continue
  274. if self.product_desc:
  275. if self.product_desc in item_title:
  276. crawl_product_desc = self.product_desc
  277. else:
  278. crawl_product_desc = ""
  279. else:
  280. crawl_product_desc = ""
  281. self.is_no_product = 0
  282. status = 1
  283. if self.product_keyword:
  284. search_keyword_list = self.product_keyword.split(",")
  285. for search_keyword in search_keyword_list:
  286. if search_keyword.strip() not in item_title:
  287. status = 0
  288. if status == 0:
  289. continue
  290. item_price = raw.get("price")
  291. item_price_show = raw.get("priceShow", {}).get("price", 0)
  292. item_sales = raw.get("realSales") or ""
  293. sale_num = ""
  294. sales_m = re.search(r"(.*?)人付款", item_sales)
  295. if sales_m:
  296. sale_num = sales_m.group(1)
  297. item_url = self._normalize_url(raw.get("auctionURL"))
  298. shop_name = raw.get("shopInfo", {}).get("title", "")
  299. area_str = (raw.get("procity", "") or "").strip()
  300. city_id, province_id, city, province = get_city(area_str)
  301. shop_url = self._normalize_url(
  302. raw.get("shopInfo", {}).get("url", "")
  303. )
  304. pic_path = raw.get("pic_path", "")
  305. raw_price = item_price_show
  306. if raw_price in (None, ""):
  307. price = Decimal("0.00")
  308. else:
  309. try:
  310. price = Decimal(str(raw_price)).quantize(Decimal("0.00"))
  311. except (InvalidOperation, ValueError):
  312. price = Decimal("0.00")
  313. scrape_date = time.strftime("%Y-%m-%d")
  314. update_time = time.strftime("%Y-%m-%d %H:%M:%S")
  315. snapshot_url = self._normalize_url(pic_path) if pic_path else ""
  316. # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
  317. product = {
  318. "platform": self.platform,
  319. "item_id": item_id,
  320. "enterprise_id": self.company_id,
  321. "product_name": item_title,
  322. "spec": crawl_product_desc,
  323. "one_price": "",
  324. "detail_url": item_url,
  325. "shop_name": shop_name,
  326. "anonymous_store_name": "",
  327. "shop_url": shop_url,
  328. "city_name": "",
  329. "city_id": "",
  330. "province_name": "",
  331. "province_id": "",
  332. "shipment_city_name": city,
  333. "shipment_city_id": city_id,
  334. "shipment_province_name": province,
  335. "shipment_province_id": province_id,
  336. "area_info": area_str,
  337. "factory_name": "",
  338. "scrape_date": scrape_date,
  339. "price": price,
  340. "sales": sale_num,
  341. "stock_count": "",
  342. "snapshot_url": "",
  343. "approval_num": "",
  344. "produced_time": "",
  345. "deadline": "",
  346. "update_time": update_time,
  347. "insert_time": update_time,
  348. "number": 1,
  349. "product_brand": self.brand or "",
  350. "collect_task_id": self.collect_task_id,
  351. "search_name": self.product,
  352. "company_name": "",
  353. "collect_config_info": json.dumps(
  354. {
  355. "sampling_cycle": self.sampling_cycle,
  356. "sampling_start_time": self.sampling_start_time,
  357. "sampling_end_time": self.sampling_end_time,
  358. }
  359. ),
  360. "account_id": self.account_id,
  361. "collect_region_id": self.collect_region_id,
  362. "collect_round": self.collect_round,
  363. "is_sold_out": 0
  364. }
  365. try:
  366. self.pipeline.storge_data(product)
  367. logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
  368. except Exception as e:
  369. logger.exception("写入数据库失败: %s", e)
  370. logger.info(
  371. "关键词 %s 第 %s 页爬取完成",
  372. keyword,
  373. page,
  374. )
  375. total_page = (
  376. (json_data or {}).get("data", {}).get("mainInfo", {}).get("totalPage")
  377. )
  378. try:
  379. total_page_int = int(total_page) if total_page is not None else 50
  380. except (TypeError, ValueError):
  381. total_page_int = 50
  382. if page >= total_page_int:
  383. break
  384. if self.is_no_product > 20:
  385. break
  386. sleep_second = random.uniform(30, 60)
  387. logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
  388. time.sleep(sleep_second)
  389. def update_cookie(self):
  390. taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
  391. if not taobao_auto.run():
  392. return False
  393. safe_name = self._sql_literal(self.account_name)
  394. sql_account = (
  395. f"select * from `retrieve_collect_equipment_account` where `username`='{safe_name}'"
  396. )
  397. account_list = self.db.select_data(sql_account)
  398. if not account_list:
  399. logger.error("账号 %s 未查询到 cookie 信息", self.account_name)
  400. return False
  401. cookie_str = account_list[0].get("cookie_str")
  402. if not cookie_str:
  403. logger.error("账号 %s cookie 为空", self.account_name)
  404. return False
  405. try:
  406. self.cookies = json.loads(cookie_str)
  407. except Exception as e:
  408. logger.error("账号 %s cookie 解析失败: %s", self.account_name, e)
  409. return False
  410. return True
  411. def get_account(self):
  412. sql_account = """
  413. SELECT * FROM `retrieve_collect_equipment_account` WHERE `id` = %s and `status` = 0
  414. """
  415. account_list = self.db.select_data(sql_account,self.account_id)
  416. if not account_list:
  417. return False
  418. account_dict = account_list[0]
  419. self.ip = account_dict.get("ip")
  420. cookie_str = account_dict.get("cookie_str")
  421. self.ip = account_dict.get("ip")
  422. self.account_name = account_dict.get("username")
  423. self.login_username = account_dict.get("phone", "")
  424. self.login_password = account_dict.get("password", "")
  425. self.cookie_stamp = account_dict.get("update_time")
  426. if self.ip:
  427. account_proxy = f"http://{self.ip}"
  428. self.proxies = {"http": account_proxy, "https": account_proxy}
  429. else:
  430. self.proxies = None
  431. need_refresh = (
  432. not cookie_str
  433. or int(time.time()) - int(self.cookie_stamp or 0) > COOKIE_MAX_AGE_SEC
  434. )
  435. print(account_dict)
  436. if need_refresh:
  437. if not self.update_cookie():
  438. return False
  439. else:
  440. try:
  441. self.cookies = json.loads(cookie_str)
  442. except Exception as e:
  443. logger.error("cookie 解析失败,尝试刷新: %s", e)
  444. if not self.update_cookie():
  445. return False
  446. logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
  447. self.init_session()
  448. return True
  449. def run(self):
  450. if not self.get_account():
  451. logger.info("==================当前无账号可用==================")
  452. self.success = False
  453. return self.pipeline.crawl_count, self.success
  454. logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
  455. self.get_search()
  456. logger.info(
  457. "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
  458. self.task_id,
  459. self.product,
  460. self.pipeline.crawl_count,
  461. )
  462. return self.pipeline.crawl_count, self.success