YychengStoreJobs.php 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. <?php
  2. namespace App\Jobs\Manager\CollectData\Yycheng;
  3. use Illuminate\Bus\Queueable;
  4. use Illuminate\Contracts\Queue\ShouldBeUnique;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use App\Facades\Servers\Logs\Log;
  10. use App\Jobs\Manager\CollectData\Yycheng\YychengProductDataJobs;
  11. /**
  12. * 采集数据-药师帮店铺数据
  13. * @author 唐远望
  14. * @version 1.0
  15. * @date 2026-02-06
  16. */
  17. class YychengStoreJobs implements ShouldQueue
  18. {
  19. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  20. protected $message_data;
  21. /**
  22. * Create a new job instance.
  23. *
  24. * @return void
  25. */
  26. public function __construct(array $message_data)
  27. {
  28. $this->message_data = $message_data;
  29. }
  30. /**
  31. * Execute the job.
  32. *
  33. * @return void
  34. */
  35. public function handle()
  36. {
  37. $list_data = $this->message_data['data'];
  38. if (empty($list_data)) return true;
  39. try {
  40. // 分批处理,每批 50-100 条
  41. $chunks = array_chunk($list_data, 50);
  42. foreach ($chunks as $chunkIndex => $chunk) {
  43. foreach ($chunk as $key => $item) {
  44. $globalIndex = $chunkIndex * 50 + $key;
  45. $item['queue_now_limit'] = (int)$globalIndex + 1;
  46. $item['queue_page'] = $this->message_data['queue_page'];
  47. $item['queue_limit'] = $this->message_data['queue_limit'];
  48. $item['queue_total'] = $this->message_data['queue_total'];
  49. // 使用延迟分发,避免一次性创建太多任务
  50. YychengProductDataJobs::dispatch($item)
  51. ->delay(now()->addSeconds($globalIndex * 0.1)); // 每 0.1 秒分发一个
  52. unset($item);
  53. }
  54. // 处理完一批后强制垃圾回收
  55. gc_collect_cycles();
  56. // 可选:添加短暂延迟
  57. if (count($chunks) > 1 && $chunkIndex < count($chunks) - 1) {
  58. usleep(100000); // 0.1 秒
  59. }
  60. }
  61. } catch (\Exception $e) {
  62. Log::info('job_error', '采集数据-药师帮店铺数据同步队列失败', ['error' => $e->getMessage()]);
  63. }
  64. }
  65. public function failed(\Throwable $exception)
  66. {
  67. Log::info('job_error', '采集数据-药师帮店铺数据同步队列完全失败', ['data' => $this->message_data, 'error' => $exception]);
  68. }
  69. }