ScrapeDataProductJobs.php 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. <?php
  2. namespace App\Jobs\Manager\Process;
  3. use Illuminate\Bus\Queueable;
  4. use Illuminate\Contracts\Queue\ShouldBeUnique;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use App\Facades\Servers\Logs\Log;
  10. use App\Models\Manager\Process\ScrapeData as ScrapeDataModel;
  11. use Illuminate\Support\Facades\DB;
  12. use Illuminate\Support\Facades\Cache;
  13. use App\Jobs\Manager\Process\LowPriceGoodsJobs;
  14. use App\Jobs\Manager\Process\ViolationProductJobs;
  15. use App\Jobs\Manager\Process\ViolationCompanyJobs;
  16. /**
  17. * 数据清洗-读取采集商品数据队列
  18. * @author 唐远望
  19. * @version 1.0
  20. * @date 2025-12-10
  21. */
  22. class ScrapeDataProductJobs implements ShouldQueue
  23. {
  24. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  25. public $tries = 3; // 限制重试次数
  26. public $timeout = 600; // 10分钟超时
  27. protected $message_data;
  28. /**
  29. * Create a new job instance.
  30. *
  31. * @return void
  32. */
  33. public function __construct(array $message_data)
  34. {
  35. $this->message_data = $message_data;
  36. }
  37. /**
  38. * Execute the job.
  39. *
  40. * @return void
  41. */
  42. public function handle()
  43. {
  44. try {
  45. $ScrapeDataModel = new ScrapeDataModel();
  46. $limit = isset($this->message_data['limit']) ? $this->message_data['limit'] : 10000;
  47. $page = isset($this->message_data['page']) ? $this->message_data['page'] : 1;
  48. $company_id = isset($this->message_data['company_id']) ? $this->message_data['company_id'] : 0; //品牌方公司ID
  49. $start_time = $this->message_data['start_time'];
  50. $end_time = $this->message_data['end_time'];
  51. $start_time_string = date('Y-m-d H:i:s', $start_time);
  52. $end_time_string = date('Y-m-d H:i:s', $end_time);
  53. $key_end_name = 'ScrapeDataProductJobs_end_' . $company_id . '_' . $start_time . '_' . $end_time;
  54. $is_end_select = Cache::get($key_end_name);
  55. $key_name = 'ScrapeDataProductJobs_' . $company_id . '_' . $start_time . '_' . $end_time;
  56. $cache_product_datas = Cache::get($key_name) ?? [];
  57. //如果查询完毕,但是没有数据则关闭任务
  58. if (!empty($is_end_select) && empty($cache_product_datas)) {
  59. return true;
  60. }
  61. if (!empty($is_end_select) && !empty($cache_product_datas)) {
  62. //如果查询完毕了,则直接处理
  63. $this->cleanData($company_id, $cache_product_datas);
  64. } else if (empty($is_end_select) && !empty($is_end_select)) {
  65. //如果进行中,则合并数据
  66. $where = [];
  67. $where[] = ['insert_time', '>=', $start_time_string];
  68. $where[] = ['insert_time', '<=', $end_time_string];
  69. $where[] = ['enterprise_id', '=', $company_id];
  70. $product_data_info = $ScrapeDataModel->select('*')->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
  71. $select_product_datas = $product_data_info['data'];
  72. $last_page = $product_data_info['last_page'];
  73. if (!empty($select_product_datas)) {
  74. //如果查询完毕了,则直接处理//合并数据
  75. $product_datas = array_merge($select_product_datas, $cache_product_datas);
  76. Cache::put($key_name, $product_datas, 360); //缓存6分钟
  77. if ($page < $last_page) {
  78. //继续执行下一页
  79. $message_data['page'] = $page + 1;
  80. $message_data['limit'] = $limit;
  81. $message_data['company_id'] = $company_id;
  82. $message_data['start_time'] = $start_time;
  83. $message_data['end_time'] = $end_time;
  84. ScrapeDataProductJobs::dispatch($message_data);
  85. } else if ($page == $last_page) {
  86. Cache::put($key_end_name, '1', 360); //缓存6分钟
  87. $this->cleanData($company_id, $product_datas);
  88. }
  89. }
  90. } else if (empty($is_end_select) && empty($is_end_select)) {
  91. $where = [];
  92. $where[] = ['insert_time', '>=', $start_time_string];
  93. $where[] = ['insert_time', '<=', $end_time_string];
  94. $where[] = ['enterprise_id', '=', $company_id];
  95. $product_data_info = $ScrapeDataModel->select('*')->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
  96. $select_product_datas = $product_data_info['data'];
  97. $last_page = $product_data_info['last_page'];
  98. if (empty($select_product_datas)) {
  99. //如果查询第一页为空,则直接返回
  100. Cache::put($key_name, '', 360); //缓存6分钟
  101. Cache::put($key_end_name, 1, 360); //缓存6分钟
  102. return true;
  103. } else {
  104. //如果查询完毕了,则直接处理//合并数据
  105. $product_datas = array_merge($select_product_datas, $cache_product_datas);
  106. Cache::put($key_name, $product_datas, 360); //缓存6分钟
  107. if ($page < $last_page) {
  108. //继续执行下一页
  109. $message_data['page'] = $page + 1;
  110. $message_data['limit'] = $limit;
  111. $message_data['company_id'] = $company_id;
  112. $message_data['start_time'] = $start_time;
  113. $message_data['end_time'] = $end_time;
  114. ScrapeDataProductJobs::dispatch($message_data);
  115. } else if ($page == $last_page) {
  116. Cache::put($key_end_name, '1', 360); //缓存6分钟
  117. $this->cleanData($company_id, $product_datas);
  118. }
  119. }
  120. }
  121. } catch (\Exception $e) {
  122. Log::info('job_error', '数据清洗-读取采集商品数据队列失败', ['data' => $this->message_data, 'error' => $e->getMessage()]);
  123. }
  124. }
  125. /**
  126. * 执行数据清洗任务
  127. *
  128. */
  129. public function cleanData($company_id, $cache_product_datas)
  130. {
  131. $data_totle = count($cache_product_datas);
  132. $index_number = 0;
  133. foreach ($cache_product_datas as $key => $product_data) {
  134. $index_number = $key + 1;
  135. $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '1000', 'product_data' => $product_data, 'index_number' => $index_number, 'data_totle' => $data_totle];
  136. LowPriceGoodsJobs::dispatch($message_data);
  137. // LowPriceGoodsJobs::dispatchSync($message_data);
  138. ViolationProductJobs::dispatch($message_data);
  139. // ViolationProductJobs::dispatchSync($message_data);
  140. ViolationCompanyJobs::dispatch($message_data);
  141. // ViolationCompanyJobs::dispatchSync($message_data);
  142. }
  143. }
  144. public function failed(\Throwable $exception)
  145. {
  146. Log::info('job_error', '数据清洗-读取采集商品数据队列完全失败', ['data' => $this->message_data, 'error' => $exception->getMessage()]);
  147. }
  148. }