| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- <?php
- namespace App\Jobs\Manager\Process;
- use Illuminate\Bus\Queueable;
- use Illuminate\Contracts\Queue\ShouldBeUnique;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\Dispatchable;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\SerializesModels;
- use App\Facades\Servers\Logs\Log;
- use App\Models\Manager\Process\ScrapeData as ScrapeDataModel;
- use Illuminate\Support\Facades\DB;
- use Illuminate\Support\Facades\Cache;
- use App\Jobs\Manager\Process\LowPriceGoodsJobs;
- use App\Jobs\Manager\Process\ViolationProductJobs;
- use App\Jobs\Manager\Process\ViolationCompanyJobs;
- /**
- * 数据清洗-读取采集商品数据队列
- * @author 唐远望
- * @version 1.0
- * @date 2025-12-10
- */
- class ScrapeDataProductJobs implements ShouldQueue
- {
- use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- public $tries = 3; // 限制重试次数
- public $timeout = 600; // 10分钟超时
- protected $message_data;
- /**
- * Create a new job instance.
- *
- * @return void
- */
- public function __construct(array $message_data)
- {
- $this->message_data = $message_data;
- }
- /**
- * Execute the job.
- *
- * @return void
- */
- public function handle()
- {
- try {
- $ScrapeDataModel = new ScrapeDataModel();
- $limit = isset($message_data['limit']) ? $message_data['limit'] : 10000;
- $page = isset($message_data['page']) ? $message_data['page'] : 1;
- $company_id = isset($message_data['company_id']) ? $message_data['company_id'] : 0; //品牌方公司ID
- $start_time = $this->message_data['start_time'];
- $end_time = $this->message_data['end_time'];
- $start_time_string = date('Y-m-d H:i:s', $start_time);
- $end_time_string = date('Y-m-d H:i:s', $end_time);
- $key_name = 'ScrapeDataProductJobs_' . $company_id . '_' . $start_time . '_' . $end_time;
- $product_datas = Cache::get($key_name);
- if (empty($product_datas)) {
- $where = [];
- $where[] = ['insert_time', '>=', $start_time_string];
- $where[] = ['insert_time', '<=', $end_time_string];
- $where[] = ['min_price', '>=', '0.01'];
- $where[] = ['number', '>=', '1'];
- $where[] = ['company_id', '=', $company_id];
- $product_data_info = $ScrapeDataModel->select('*', DB::raw('ROUND(min_price / NULLIF(number, 0), 2) as unit_price'))
- ->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
- $product_datas = $product_data_info['data'];
- if (empty($product_datas) && $page == 1) {
- //如果查询第一页为空,则直接返回
- return true;
- } else if (empty($product_datas) && $page > 1) {
- //如果查询第二页为空,则直接返回,表示查询完毕,则处理数据清洗任务
- $data_totle = count($product_datas);
- $index_number = 0;
- foreach ($product_datas as $key => $product_data) {
- $index_number = $key + 1;
- $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '50', 'product_data' => $product_data, 'index_number' => $index_number, 'data_totle' => $data_totle];
- LowPriceGoodsJobs::dispatch($message_data);
- // LowPriceGoodsJobs::dispatchSync($message_data);
- ViolationProductJobs::dispatch($message_data);
- // ViolationProductJobs::dispatchSync($message_data);
- ViolationCompanyJobs::dispatch($message_data);
- // ViolationCompanyJobs::dispatchSync($message_data);
- }
- }
- return true;
- } else {
- //合并数据
- $product_datas = array_merge($product_datas, Cache::get($key_name));
- Cache::put($key_name, $product_datas, 360); //缓存6分钟
- //继续执行下一页
- $message_data['page'] = $page + 1;
- $message_data['limit'] = $limit;
- ScrapeDataProductJobs::dispatch($message_data)->delay(now()->addSeconds(1));
- }
- } catch (\Exception $e) {
- Log::info('job_error', '数据清洗-读取采集商品数据队列失败', ['data' => $this->message_data, 'error' => $e->getMessage()]);
- }
- }
- public function failed(\Throwable $exception)
- {
- Log::info('job_error', '数据清洗-读取采集商品数据队列完全失败', ['data' => $this->message_data, 'error' => $exception->getMessage()]);
- }
- }
|