message_data = $message_data;
    }

    /**
     * Return the corp id carried by the queued message, or null when the
     * message has no `corpid` entry.
     *
     * @return mixed|null
     */
    public function getCorpId()
    {
        return $this->message_data['corpid'] ?? null;
    }

    /**
     * Execute the job.
     *
     * Converts the chunk carried by this message into export rows and appends
     * them to a per-corp cache buffer. When this message is the last page
     * (`now_page == totle_page`) the buffered rows are written to an xlsx
     * file, uploaded, and the task row is marked as finished (status 1).
     * Any failure marks the task as failed (status 2) and is logged; the
     * exception is not rethrown.
     *
     * NOTE(review): the buffer key only contains the corp id, so two exports
     * of the same corp running at once would share one buffer, and the
     * read-merge-write on the cache is not atomic. This also presumes pages
     * are processed in order by a single worker — confirm with the dispatcher.
     *
     * @return bool|null true on success, null when a failure was caught
     */
    public function handle()
    {
        try {
            $corpid = $this->message_data['corpid'];
            (new DbService())->getConnectionNameByCorpId($corpid);

            // Chunk payload and paging information.
            $chunk = $this->message_data['chunk'];
            $tagGroup = $this->message_data['tagGroup'];
            $totle_page = $this->message_data['totle_page'];
            $now_page = $this->message_data['now_page'];
            Log::info('job_error', 'UserExportDataJobs查询处理进度', ['totle_page' => $totle_page, 'now_page' => $now_page]);

            // Turn the chunk into export rows.
            $user_data = $this->handleData($chunk, $tagGroup, $corpid);

            $key_name = 'UserExportDataJobs_' . $corpid;
            $fileName = $this->message_data['file_name']; // file name
            $companyId = $this->message_data['companyId']; // company id
            $taskId = $this->message_data['taskId']; // task id

            // Append this chunk's rows to whatever earlier pages buffered.
            $user_data_info = Cache::get($key_name);
            $user_new_data = !empty($user_data_info) ? array_merge($user_data_info, $user_data) : $user_data;
            Cache::put($key_name, $user_new_data, 60 * 60 * 24);

            if ($now_page == $totle_page) {
                try {
                    // Fixed header columns, followed by one column per tag group.
                    $headers = ['企微ID', '客户名称', '客户备注', '客户描述', '客户状态', '企业名称', '所属客服', '添加时间', '手机号', '系统客户ID', 'SAAS关联手机号', '群聊名称', '入群时间', '退群时间'];
                    if ($tagGroup) {
                        foreach ($tagGroup as $groupName) {
                            $headers[] = '标签组(' . $groupName . ')';
                        }
                    }

                    // Make sure the export directory exists. The second is_dir()
                    // covers another process creating it between check and mkdir.
                    $path = public_path('uploads/exports/' . $companyId);
                    if (!is_dir($path) && !mkdir($path, 0777, true) && !is_dir($path)) {
                        throw new \RuntimeException('Unable to create export directory: ' . $path);
                    }

                    // Build the file and upload it.
                    $ossUrl = $this->toDown($path, $fileName, $user_new_data, $headers);
                    if (!$ossUrl) {
                        // Do not record a "finished" task that has no download URL.
                        throw new \RuntimeException('Export file upload failed: ' . $fileName);
                    }

                    $TaskModel = new TaskModel();
                    $TaskModel->edit($taskId, ['status' => 1, 'url' => $ossUrl, 'file_dir_name' => $fileName]);
                } finally {
                    // The export is over either way; never leave buffered rows
                    // behind to be merged into the next export of this corp.
                    Cache::forget($key_name);
                }
            }

            return true;
        } catch (\Throwable $e) {
            $taskId = $this->message_data['taskId']; // task id
            $corpid = $this->message_data['corpid'];
            // Mark the download task as failed.
            (new TaskModel)->query()->where(['id' => $taskId, 'corpid' => $corpid])->update(['status' => 2, 'update_time' => time()]);
            Log::info('job_error', 'UserExportDataJobs任务队列处理失败', ['data' => $this->message_data, 'error' => $e->getMessage()]);
        }
    }

    /**
     * Convert one chunk of customer rows into export rows.
     *
     * Each customer row is enriched with display values (status text,
     * formatted create time, fallback corp name / phone), the phones of the
     * linked custom records, group-chat columns and one column per tag group.
     * A customer that is a member of several group chats yields one row per
     * chat; a customer in no chat yields a single row with blank chat columns.
     *
     * Keys are appended in the order phone_list, chat_name, join_time,
     * quit_time, then tag groups — presumably the writer consumes rows
     * positionally against the header list built in handle(); keep this order.
     *
     * @param array      $chunk    customer rows; each is read for external_userid,
     *                             follow_userid, status, createtime, custom_uid,
     *                             corp_name, remark_corp_name, phone, remark_mobiles
     * @param array|null $tagGroup map of tag group id => tag group name
     * @param mixed      $corpid   corp id used to filter group chats
     * @return array list of export rows
     */
    public function handleData($chunk, $tagGroup, $corpid)
    {
        $ExternalFollowTag = new ExternalFollowTag();
        $CustomExternalModel = new CustomExternalModel();
        $CustomModel = new CustomModel();
        $ChatModel = new ChatModel();
        $list = [];

        // handle() treats an empty tag group as valid, so tolerate it here too.
        $tagGroup = $tagGroup ?: [];
        $externalUserIds = array_column($chunk, 'external_userid');

        // Tags attached to these customers.
        $tagNames = $ExternalFollowTag->query()
            ->join('openwork_external_tag', 'openwork_external_tag.tag_id', '=', 'openwork_external_follow_tag.tag_id')
            ->whereIn('openwork_external_follow_tag.external_userid', $externalUserIds)
            ->get([
                'openwork_external_follow_tag.external_userid',
                'openwork_external_follow_tag.follow_userid',
                'openwork_external_tag.tag_id',
                'openwork_external_tag.tag_name',
                'openwork_external_tag.group_id'
            ])
            ->toArray();

        // Linked custom uids per customer, then the phones of those uids.
        $custom_external_userid = [];
        $custom_external_data = $CustomExternalModel->whereIn('external_userid', $externalUserIds)->get(['external_userid', 'custom_uid'])->toArray();
        foreach ($custom_external_data as $binding) {
            $custom_external_userid[$binding['external_userid']]['custom_uids'][] = $binding['custom_uid'];
        }
        foreach ($custom_external_userid as $externalUserId => $binding) {
            $custom_external_userid[$externalUserId]['phone_list'] = $CustomModel->whereIn('uid', $binding['custom_uids'])->pluck('phone')->implode(',');
        }

        // Group chats the customers have joined.
        // NOTE(review): get('id') is kept as in the original; the fields read
        // below come from the select() list — confirm the argument is inert.
        $chat_list = [];
        $chat_group_list = $ChatModel->join('openwork_group_member', function ($join) {
            $join->on('openwork_group.chat_id', '=', 'openwork_group_member.chat_id');
        })->select([
            'openwork_group.corpid',
            'openwork_group_member.userid',
            'openwork_group.id',
            'openwork_group.chat_id',
            'openwork_group.name',
            'openwork_group.status',
            'openwork_group_member.join_time',
            'openwork_group_member.quit_time',
        ])->where(['openwork_group.corpid' => $corpid])->whereIn('openwork_group_member.userid', $externalUserIds)->get('id')->toArray();
        foreach ($chat_group_list as $chat) {
            // Kept as structured fields: joining them with a delimiter would
            // corrupt the columns whenever a group name contains that delimiter.
            $chat_list[$chat['userid']][] = [
                'chat_name' => $chat['name'] == '' ? '未命名群聊' : (string) $chat['name'],
                'join_time' => $chat['join_time'] ? date('Y-m-d H:i:s', $chat['join_time']) : '',
                'quit_time' => $chat['quit_time'] ? date('Y-m-d H:i:s', $chat['quit_time']) : '未退群',
            ];
        }

        // Index tag names by customer -> follow user -> tag group -> tag id.
        $keyTags = [];
        foreach ($tagNames as $tag) {
            $keyTags[$tag['external_userid']][$tag['follow_userid']][$tag['group_id']][$tag['tag_id']] = $tag['tag_name'];
        }

        // Used for customers that are in no group chat.
        $blankChat = [['chat_name' => '', 'join_time' => '', 'quit_time' => '']];

        foreach ($chunk as $value) {
            $externalUserId = $value['external_userid'];

            $value['status'] = $value['status'] ? '已流失' : '未流失';
            $value['createtime'] = $value['createtime'] ? date('Y-m-d H:i:s', $value['createtime']) : '';
            $value['custom_uid'] = empty($value['custom_uid']) ? '' : $value['custom_uid'];
            $value['corp_name'] = empty($value['corp_name']) ? ($value['remark_corp_name'] ? $value['remark_corp_name'] : '') : $value['corp_name'];
            $value['phone'] = empty($value['phone']) ? ($value['remark_mobiles'] ? $value['remark_mobiles'] : '') : $value['phone'];
            $value['phone_list'] = $custom_external_userid[$externalUserId]['phone_list'] ?? '';

            // Tag columns are the same for every row of this customer.
            $tagColumns = [];
            foreach ($tagGroup as $groupId => $groupName) {
                $tags = $keyTags[$externalUserId][$value['follow_userid']][$groupId] ?? null;
                $tagColumns[$groupId] = $tags === null ? '' : implode(',', $tags);
            }

            // Helper fields that must not appear in the export.
            unset($value['remark_corp_name'], $value['remark_mobiles']);

            foreach (($chat_list[$externalUserId] ?? $blankChat) as $chat) {
                $row = $value;
                $row['chat_name'] = $chat['chat_name'];
                $row['join_time'] = $chat['join_time'];
                $row['quit_time'] = $chat['quit_time'];
                // Assigned key by key: group ids may be numeric and must not be renumbered.
                foreach ($tagColumns as $groupId => $tagText) {
                    $row[$groupId] = $tagText;
                }
                $list[] = $row;
            }
        }

        return $list;
    }

    /**
     * Write the rows to an xlsx file under $path and upload it.
     *
     * The local file is removed once the upload reports success.
     *
     * @param string $path     directory the xlsx file is written to
     * @param string $fileName file name, also passed to the uploader
     * @param array  $data     export rows
     * @param array  $header   header cells
     * @return mixed whatever Oss::uploadFile() returns; falsy is treated as failure
     */
    private function toDown($path, $fileName, &$data, $header)
    {
        // Directory the xlsx file is saved in.
        $excel = new \Vtiful\Kernel\Excel(['path' => $path]);
        // NOTE(review): both filename() and constMemory() are called with the
        // same arguments, as in the original — confirm which mode takes effect.
        $filePath = $excel->filename($fileName, 'sheet1')->constMemory($fileName, 'sheet1')->header($header)->data($data)->output();

        $Oss = new Oss();
        $res = $Oss->uploadFile($fileName, $filePath);
        if ($res) {
            // Best-effort cleanup: a leftover local file must not fail the export.
            @unlink($filePath);
        }

        return $res;
    }

    /**
     * Log the exception passed in together with the message payload.
     *
     * @param \Throwable $exception
     */
    public function failed(\Throwable $exception)
    {
        Log::info('job_error', 'UserExportDataJobs任务完全失败', ['data' => $this->message_data, 'error' => $exception->getMessage()]);
    }
}