Explorar el Código

[智价云] 采集同步队列优化

public hace 1 mes
padre
commit
bfebad02ad

+ 24 - 9
app/Jobs/Manager/CollectData/JdTmao/JdTmaoStoreJobs.php

@@ -39,20 +39,35 @@ class JdTmaoStoreJobs implements ShouldQueue
      */
     public function handle()
     {
-
         $list_data = $this->message_data['data'];
         if (empty($list_data)) return true;
+
         try {
-            foreach ($list_data as $key => $item) {
+            // 分批处理,每批 50-100 条
+            $chunks = array_chunk($list_data, 50);
+
+            foreach ($chunks as $chunkIndex => $chunk) {
+                foreach ($chunk as $key => $item) {
+                    $globalIndex = $chunkIndex * 50 + $key;
+                    $item['queue_now_limit'] = (int)$globalIndex + 1;
+                    $item['queue_page'] = $this->message_data['queue_page'];
+                    $item['queue_limit'] = $this->message_data['queue_limit'];
+                    $item['queue_total'] = $this->message_data['queue_total'];
 
-                $item['queue_now_limit'] =  (int)$key + 1;
-                $item['queue_page'] =  $this->message_data['queue_page'];
-                $item['queue_limit'] =  $this->message_data['queue_limit'];
-                $item['queue_total'] =  $this->message_data['queue_total'];
-                JdTmaoProductDataJobs::dispatch($item);
-                // JdTmaoProductDataJobs::dispatchSync($item);
-                unset($item);
+                    // 使用延迟分发,避免一次性创建太多任务
+                    JdTmaoProductDataJobs::dispatch($item)
+                        ->delay(now()->addSeconds($globalIndex * 0.1)); // 每 0.1 秒分发一个
+
+                    unset($item);
+                }
+
+                // 处理完一批后强制垃圾回收
                 gc_collect_cycles();
+
+                // 可选:添加短暂延迟
+                if (count($chunks) > 1 && $chunkIndex < count($chunks) - 1) {
+                    usleep(100000); // 0.1 秒
+                }
             }
         } catch (\Exception $e) {
             Log::info('job_error', '采集数据-药师帮店铺数据同步队列失败', ['error' => $e->getMessage()]);

+ 25 - 9
app/Jobs/Manager/CollectData/Ysbang/YsbangStoreJobs.php

@@ -39,19 +39,35 @@ class YsbangStoreJobs implements ShouldQueue
      */
     public function handle()
     {
-
         $list_data = $this->message_data['data'];
         if (empty($list_data)) return true;
+
         try {
-            foreach ($list_data as $key => $item) {
-                $item['queue_now_limit'] =  (int)$key + 1;
-                $item['queue_page'] =  $this->message_data['queue_page'];
-                $item['queue_limit'] =  $this->message_data['queue_limit'];
-                $item['queue_total'] =  $this->message_data['queue_total'];
-                YsbangProductDataJobs::dispatch($item);
-                // YsbangProductDataJobs::dispatchSync($item);
-                unset($item);
+            // 分批处理,每批 50-100 条
+            $chunks = array_chunk($list_data, 50);
+
+            foreach ($chunks as $chunkIndex => $chunk) {
+                foreach ($chunk as $key => $item) {
+                    $globalIndex = $chunkIndex * 50 + $key;
+                    $item['queue_now_limit'] = (int)$globalIndex + 1;
+                    $item['queue_page'] = $this->message_data['queue_page'];
+                    $item['queue_limit'] = $this->message_data['queue_limit'];
+                    $item['queue_total'] = $this->message_data['queue_total'];
+
+                    // 使用延迟分发,避免一次性创建太多任务
+                    YsbangProductDataJobs::dispatch($item)
+                        ->delay(now()->addSeconds($globalIndex * 0.1)); // 每 0.1 秒分发一个
+
+                    unset($item);
+                }
+
+                // 处理完一批后强制垃圾回收
                 gc_collect_cycles();
+
+                // 可选:添加短暂延迟
+                if (count($chunks) > 1 && $chunkIndex < count($chunks) - 1) {
+                    usleep(100000); // 0.1 秒
+                }
             }
         } catch (\Exception $e) {
             Log::info('job_error', '采集数据-药师帮店铺数据同步队列失败', ['error' => $e->getMessage()]);

+ 24 - 9
app/Jobs/Manager/CollectData/Yycheng/YychengStoreJobs.php

@@ -39,20 +39,35 @@ class YychengStoreJobs implements ShouldQueue
      */
     public function handle()
     {
-
         $list_data = $this->message_data['data'];
         if (empty($list_data)) return true;
+
         try {
-            foreach ($list_data as $key => $item) {
+            // 分批处理,每批 50-100 条
+            $chunks = array_chunk($list_data, 50);
+
+            foreach ($chunks as $chunkIndex => $chunk) {
+                foreach ($chunk as $key => $item) {
+                    $globalIndex = $chunkIndex * 50 + $key;
+                    $item['queue_now_limit'] = (int)$globalIndex + 1;
+                    $item['queue_page'] = $this->message_data['queue_page'];
+                    $item['queue_limit'] = $this->message_data['queue_limit'];
+                    $item['queue_total'] = $this->message_data['queue_total'];
 
-                $item['queue_now_limit'] =  (int)$key + 1;
-                $item['queue_page'] =  $this->message_data['queue_page'];
-                $item['queue_limit'] =  $this->message_data['queue_limit'];
-                $item['queue_total'] =  $this->message_data['queue_total'];
-                YychengProductDataJobs::dispatch($item);
-                // YychengProductDataJobs::dispatchSync($item);
-                unset($item);
+                    // 使用延迟分发,避免一次性创建太多任务
+                    YychengProductDataJobs::dispatch($item)
+                        ->delay(now()->addSeconds($globalIndex * 0.1)); // 每 0.1 秒分发一个
+
+                    unset($item);
+                }
+
+                // 处理完一批后强制垃圾回收
                 gc_collect_cycles();
+
+                // 可选:添加短暂延迟
+                if (count($chunks) > 1 && $chunkIndex < count($chunks) - 1) {
+                    usleep(100000); // 0.1 秒
+                }
             }
         } catch (\Exception $e) {
             Log::info('job_error', '采集数据-药师帮店铺数据同步队列失败', ['error' => $e->getMessage()]);