# feishu_webhook.py (1.8 KB)
import json
import logging
import os
import time

import requests
  4. WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/f8a70e6b-6b75-43c2-aaf2-9ba8f79f40f6"
  5. def send_text(text):
  6. data = {
  7. "msg_type": "text",
  8. "content": {"text": text},
  9. }
  10. headers = {"Content-Type": "application/json"}
  11. try:
  12. response = requests.post(
  13. WEBHOOK_URL,
  14. headers=headers,
  15. data=json.dumps(data),
  16. timeout=5
  17. )
  18. except Exception as e:
  19. pass
  20. def send_error_card(task_name, err_msg, mention_all=False):
  21. """发送异常红色卡片"""
  22. content = (
  23. f"**任务**:{task_name}\n"
  24. f"**状态**:❌ 失败\n"
  25. f"**错误**:{err_msg}\n"
  26. f"**时间**:{time.strftime('%Y-%m-%d %H:%M:%S')}"
  27. )
  28. if mention_all:
  29. content = f"<at id=all></at>\n{content}"
  30. data = {
  31. "msg_type": "interactive",
  32. "card": {
  33. "config": {"wide_screen_mode": True},
  34. "header": {
  35. "template": "red",
  36. "title": {"tag": "plain_text", "content": "异常告警"},
  37. },
  38. "elements": [
  39. {
  40. "tag": "div",
  41. "text": {"tag": "lark_md", "content": content},
  42. }
  43. ],
  44. },
  45. }
  46. headers = {"Content-Type": "application/json"}
  47. response = requests.post(
  48. WEBHOOK_URL,
  49. headers=headers,
  50. data=json.dumps(data),
  51. timeout=5,
  52. )
  53. print(response.json())
  54. # 使用示例
  55. if __name__ == "__main__":
  56. account1 = "aaaaa"
  57. plat_form = "京东"
  58. drug = "999小儿感冒颗粒24粒"
  59. drug_count = 128
  60. text = f"**重要通知** {str(time.strftime("%Y-%m-%d %H:%M:%S"))}\n 账号{account1}, {plat_form} 采集 {drug} 数据 {drug_count} 条"
  61. send_text(text)