message_data = $message_data; }

    /**
     * Execute the job.
     *
     * Reads one page of scraped product rows for a company and time window,
     * accumulates the rows in the cache across successive job runs, and once
     * the last page has been read hands the accumulated rows to cleanData().
     *
     * Keys read from $this->message_data:
     *  - limit      page size (defaults to 10000)
     *  - page       page number to read (defaults to 1)
     *  - company_id brand company ID (defaults to 0)
     *  - start_time lower bound of insert_time, passed to date() as a timestamp
     *  - end_time   upper bound of insert_time, passed to date() as a timestamp
     *
     * Any \Exception raised while processing is logged and swallowed.
     *
     * @return true|null true when the scan is finished with nothing to process
     *                   (or the first page is empty); null otherwise
     */
    public function handle()
    {
        try {
            $limit = $this->message_data['limit'] ?? 10000;
            $page = $this->message_data['page'] ?? 1;
            $company_id = $this->message_data['company_id'] ?? 0; // brand company ID
            $start_time = $this->message_data['start_time'];
            $end_time = $this->message_data['end_time'];

            // Cache lifetime used for both keys (the original comments call this 6 minutes).
            $cache_ttl = 360;

            $key_suffix = $company_id . '_' . $start_time . '_' . $end_time;
            $key_end_name = 'ScrapeDataProductJobs_end_' . $key_suffix;
            $key_name = 'ScrapeDataProductJobs_' . $key_suffix;

            $is_end_select = Cache::get($key_end_name);

            // The data key may hold '' (written below when the first page is empty),
            // so anything that is not an array is treated as "no rows cached yet".
            $cached_value = Cache::get($key_name);
            $cache_product_datas = is_array($cached_value) ? $cached_value : [];

            if (!empty($is_end_select)) {
                // The scan already finished: nothing cached means nothing to do.
                if (empty($cache_product_datas)) {
                    return true;
                }

                // The scan already finished and rows are cached: process them directly.
                $this->cleanData($company_id, $cache_product_datas);

                return;
            }

            // The scan has not finished yet: read the requested page.
            $where = [
                ['insert_time', '>=', date('Y-m-d H:i:s', $start_time)],
                ['insert_time', '<=', date('Y-m-d H:i:s', $end_time)],
                ['enterprise_id', '=', $company_id],
            ];

            $ScrapeDataModel = new ScrapeDataModel();
            $product_data_info = $ScrapeDataModel
                ->select('*')
                ->where($where)
                ->orderByDesc('id')
                ->paginate($limit, ['*'], 'page', $page)
                ->toArray();

            $select_product_datas = $product_data_info['data'];
            $last_page = $product_data_info['last_page'];

            if (empty($select_product_datas)) {
                if (empty($cache_product_datas)) {
                    // Nothing cached and the page is empty: the first page has no
                    // rows, so mark the scan as finished with no data.
                    Cache::put($key_name, '', $cache_ttl);
                    Cache::put($key_end_name, 1, $cache_ttl);

                    return true;
                }

                // Rows are already cached but this page is empty: there is nothing
                // to merge, and the accumulated rows must not be overwritten.
                return;
            }

            // Merge this page with the rows accumulated by earlier runs.
            $product_datas = array_merge($select_product_datas, $cache_product_datas);
            Cache::put($key_name, $product_datas, $cache_ttl);

            if ($page < $last_page) {
                // More pages remain: queue a job for the next page.
                ScrapeDataProductJobs::dispatch([
                    'page' => $page + 1,
                    'limit' => $limit,
                    'company_id' => $company_id,
                    'start_time' => $start_time,
                    'end_time' => $end_time,
                ]);
            } elseif ($page == $last_page) {
                // Last page reached: flag the scan as finished and process everything.
                Cache::put($key_end_name, '1', $cache_ttl);
                $this->cleanData($company_id, $product_datas);
            }
        } catch (\Exception $e) {
            // NOTE(review): Log::info is called with three arguments here, so it is
            // presumably a project-specific Log class — confirm.
            Log::info('job_error', '数据清洗-读取采集商品数据队列失败', ['data' => $this->message_data, 'error' => $e->getMessage()]);
        }
    }

    /**
     * Dispatch the data-cleaning jobs for every collected product row.
     *
     * For each row, one LowPriceGoodsJobs, one ViolationProductJobs and one
     * ViolationCompanyJobs job is dispatched with the same payload.
     *
     * @param mixed $company_id          company ID forwarded in each payload
     * @param array $cache_product_datas product rows to process; keys are used
     *                                   to derive the 1-based index_number
     */
    public function cleanData($company_id, $cache_product_datas)
    {
        $data_totle = count($cache_product_datas);

        foreach ($cache_product_datas as $key => $product_data) {
            $message_data = [
                'company_id' => $company_id,
                'page' => '1',
                'limit' => '10000',
                'product_data' => $product_data,
                'index_number' => $key + 1,
                'data_totle' => $data_totle,
            ];

            LowPriceGoodsJobs::dispatch($message_data);
            ViolationProductJobs::dispatch($message_data);
            ViolationCompanyJobs::dispatch($message_data);
        }
    }

    /**
     * Log the job payload and the error message when the job has failed.
     *
     * @param \Throwable $exception the error that caused the failure
     */
    public function failed(\Throwable $exception)
    {
        Log::info('job_error', '数据清洗-读取采集商品数据队列完全失败', ['data' => $this->message_data, 'error' => $exception->getMessage()]);
    }
}