TaskJobs.php 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. <?php
  2. namespace App\Jobs\OpenWork\Msg;
  3. use App\Facades\Servers\Logs\Log;
  4. use App\Servers\DB\DbService;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use App\Facades\Servers\Wechat\OpenWork;
  11. use App\Models\OpenWork\Msg\Task;
  12. /**
  13. * 群发消息任务
  14. *
  15. */
  16. class TaskJobs implements ShouldQueue
  17. {
  18. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  19. protected $taskData;
  20. /**
  21. * Create a new job instance.
  22. *
  23. * @return void
  24. */
  25. public function __construct(array $taskData)
  26. {
  27. $this->taskData = $taskData;
  28. }
  29. /**
  30. * Execute the job.
  31. *
  32. * @return void
  33. */
  34. public function handle()
  35. {
  36. try {
  37. // 接收参数
  38. $taskData = $this->taskData;
  39. // 切换数据
  40. (new DbService())->getConnectionNameByCorpId($taskData['corpid']);
  41. // 查询
  42. $Task = (new Task());
  43. // 查询所有
  44. $list = $Task->query()->where(['corpid' => $taskData['corpid'], 'tpl_id' => $taskData['tpl_id']])->get(['id', 'msg', 'msgid'])->toArray();
  45. // 实例
  46. $work = OpenWork::getWork($taskData['corpid']);
  47. // 返回结果
  48. $updateList = [];
  49. // 获取所有数据
  50. foreach ($list as $value) {
  51. // 已经发送的,不再重新发送
  52. if ( $value['msgid'] ) continue;
  53. // 发送结果
  54. $value['msg'] = json_decode($value['msg'], true);
  55. // 获取token
  56. $result = $work->external_contact_message->submit($value['msg']);
  57. // 临时结果
  58. $temp = ['id' => $value['id'], 'corpid' => $taskData['corpid'], 'tpl_id' => $taskData['tpl_id'], 'status' => 4, 'errmsg' => '', 'msgid' => '','update_time' => time()];
  59. // 接口返回失败的话
  60. if (!$result) {
  61. // 错误信息
  62. $temp['errmsg'] = '企微接口返回失败';
  63. Log::info('job_error', '群发消息任务队列请求企微接口失败', ['data' => $taskData]);
  64. // 接口返回失败的话
  65. } else if ($result['errcode']) {
  66. // 获取错误信息
  67. $temp['errmsg'] = OpenWork::getErrmsg($result['errcode']);
  68. Log::info('job_error', '群发消息任务队列请求企微接口失败', ['data' => $taskData,'error_data'=>$result]);
  69. } else {
  70. // 修改结果
  71. $temp['msgid'] = $result['msgid'];
  72. $temp['status'] = 0;
  73. }
  74. // 修改列表
  75. $updateList[] = $temp;
  76. }
  77. // 修改状态
  78. $result = $Task->query()->upsert($updateList, 'id');
  79. // 返回结果
  80. if (!$result) Log::error('openwork_msg_jobs', $updateList);
  81. } catch (\Throwable $th) {
  82. // 错误信息
  83. Log::info('job_error', '群发消息任务队列执行失败', ['data' => $this->taskData, 'error' => $th->getMessage()]);
  84. }
  85. }
  86. public function failed(\Throwable $exception)
  87. {
  88. Log::info('job_error', '群发消息任务队列完全执行失败', ['data' => $this->taskData, 'error' => $exception->getMessage()]);
  89. }
  90. }