Explorar el Código

[智价云] 清洗更新

tangyuanwang hace 1 día
padre
commit
4b6e1e3488

+ 24 - 19
app/Http/Controllers/Manager/Process/LowPriceGoods.php

@@ -18,6 +18,8 @@ use App\Models\Manager\Citys as CitysModel;
 use App\Servers\Aliyun\Oss;
 use App\Jobs\Manager\Other\ExportLowPriceGoodsJobs;
 use Illuminate\Support\Facades\Cache;
+use Illuminate\Support\Carbon;
+use App\Jobs\Manager\Process\ScrapeDataProductJobs;
 
 /**
  * 违规处理-低价商品
@@ -148,7 +150,7 @@ class LowPriceGoods extends Controller
         //多选第一责任人
         if ($first_responsible_person && is_string($first_responsible_person)) {
             $first_responsible_person = explode(',', $first_responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $first_responsible_person)->where('duty_type',1)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $first_responsible_person)->where('duty_type', 1)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -156,7 +158,7 @@ class LowPriceGoods extends Controller
         //多选责任人
         if ($responsible_person && is_string($responsible_person)) {
             $responsible_person = explode(',', $responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $responsible_person)->where('duty_type',2)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $responsible_person)->where('duty_type', 2)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -164,7 +166,7 @@ class LowPriceGoods extends Controller
         //多选溯源责任人
         if ($source_responsible_person && is_string($source_responsible_person)) {
             $source_responsible_person = explode(',', $source_responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $source_responsible_person)->where('duty_type',3)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $source_responsible_person)->where('duty_type', 3)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -203,7 +205,7 @@ class LowPriceGoods extends Controller
             ->where('personnel_employee.id', $user_id)->select(['personnel_employee.id', 'personnel_roles.identity'])
             ->first();
         //角色身份1=普通2=管理员
-        if(!empty($personnel_roles_info) && $personnel_roles_info->identity == 2){ 
+        if (!empty($personnel_roles_info) && $personnel_roles_info->identity == 2) {
             $is_admin = 1;
         }
         if ($is_admin != 1 && $company_id != 0) {
@@ -289,7 +291,7 @@ class LowPriceGoods extends Controller
         $message_data['collection_time_end_time'] = request('collection_time_end_time', '');
         $message_data['online_posting_min_price'] = request('online_posting_min_price', '');
         $message_data['online_posting_max_price'] = request('online_posting_max_price', '');
-        $message_data['is_ultra_low_price'] = request('is_ultra_low_price','');
+        $message_data['is_ultra_low_price'] = request('is_ultra_low_price', '');
         $message_data['merge_city_ids'] = request('merge_city_ids', '');
         $message_data['merge_province_ids'] = request('merge_province_ids', 1);
 
@@ -404,7 +406,7 @@ class LowPriceGoods extends Controller
         //多选第一责任人
         if ($first_responsible_person && is_string($first_responsible_person)) {
             $first_responsible_person = explode(',', $first_responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $first_responsible_person)->where('duty_type',1)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $first_responsible_person)->where('duty_type', 1)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -412,7 +414,7 @@ class LowPriceGoods extends Controller
         //多选责任人
         if ($responsible_person && is_string($responsible_person)) {
             $responsible_person = explode(',', $responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $responsible_person)->where('duty_type',2)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $responsible_person)->where('duty_type', 2)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -420,7 +422,7 @@ class LowPriceGoods extends Controller
         //多选溯源责任人
         if ($source_responsible_person && is_string($source_responsible_person)) {
             $source_responsible_person = explode(',', $source_responsible_person);
-            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $source_responsible_person)->where('duty_type',3)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
+            $subQuery = $LowPriceGoodsMemberModel->whereIn('employee_id', $source_responsible_person)->where('duty_type', 3)->distinct('lowprice_product_logid')->select('lowprice_product_logid');
             $LowPriceGoodsModel = $LowPriceGoodsModel->whereIn('id', function ($query1) use ($subQuery) {
                 $query1->select('lowprice_product_logid')->fromSub($subQuery, 'sub1');
             });
@@ -459,7 +461,7 @@ class LowPriceGoods extends Controller
             ->where('personnel_employee.id', $user_id)->select(['personnel_employee.id', 'personnel_roles.identity'])
             ->first();
         //角色身份1=普通2=管理员
-        if(!empty($personnel_roles_info) && $personnel_roles_info->identity == 2){ 
+        if (!empty($personnel_roles_info) && $personnel_roles_info->identity == 2) {
             $is_admin = 1;
         }
         $result_count = 0;
@@ -475,7 +477,7 @@ class LowPriceGoods extends Controller
             $result_count = $LowPriceGoodsModel->where($map)
                 ->count();
         }
-       
+
         if ($result_count == 0) return json_send(['code' => 'fail', 'msg' => '没有查询到数据', 'data' => '']);
         if ($result_count > 150000) return json_send(['code' => 'fail', 'msg' => '导出数据超过15万条,请缩小导出范围后再试', 'data' => '']);
 
@@ -483,7 +485,7 @@ class LowPriceGoods extends Controller
         $export_data_info = Cache::get($key_name);
         if ($export_data_info) return json_send(['code' => 'fail', 'msg' => '导出任务正在执行中,请稍后再试', 'data' => '']);
         //创建缓存
-        Cache::put($key_name,'1', 60 * 60 * 3);
+        Cache::put($key_name, '1', 60 * 60 * 3);
 
         // 生成唯一文件ID
         $fileId                     = make_snow_flake();
@@ -743,9 +745,12 @@ class LowPriceGoods extends Controller
         } else {
             $company_id  = $admin_company_id;
         }
-        $message_data = ['page' => '1', 'limit' => 50, 'admin_id' => $admin_id, 'is_admin' => $is_admin, 'company_id' => $company_id];
-        LowPriceGoodsJobs::dispatch($message_data);
-        // LowPriceGoodsJobs::dispatchSync($message_data);
+        $start_time =  Carbon::yesterday()->startOfDay()->getTimestamp(); // 昨天开始时间 00:00:00
+        $end_time = Carbon::yesterday()->endOfDay()->getTimestamp(); // 昨天结束时间 23:59:59
+        $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '10000', 'start_time' => $start_time, 'end_time' => $end_time];
+        //执行低价挂网商品数据清洗任务
+        ScrapeDataProductJobs::dispatch($message_data);
+        // ScrapeDataProductJobs::dispatchSync($message_data);
         // 告知结果
         return             json_send(['code' => 'success', 'msg' => '执行成功']);
     }
@@ -1166,8 +1171,8 @@ class LowPriceGoods extends Controller
                     $insert_product_data['company_id']  = $admin_company_id;
                 }
                 $platform_id = isset($platform_data[$item[2]]) ? $platform_data[$item[2]] : '0'; // 平台ID
-                if($platform_id == 0) return json_send(['code' => 'error', 'msg' => "第{$key_num}行平台信息不正确", 'data' => $item]);
-                
+                if ($platform_id == 0) return json_send(['code' => 'error', 'msg' => "第{$key_num}行平台信息不正确", 'data' => $item]);
+
                 $insert_product_data['first_responsible_person'] = !empty($first_responsible_person_id) ? implode(',', $first_responsible_person_id) : ''; //第一责任人ID集合
                 $insert_product_data['responsible_person'] = !empty($responsible_person_id) ? implode(',', $responsible_person_id) : ''; //责任人ID集合
                 $insert_product_data['platform'] = isset($platform_data[$item[2]]) ? $platform_data[$item[2]] : '0'; // 平台
@@ -1183,7 +1188,7 @@ class LowPriceGoods extends Controller
                 $insert_product_data['online_posting_count'] = isset($item[12]) ? $item[12] : 1; // 累计挂网次数
                 $insert_product_data['continuous_listing_count'] = isset($item[13]) ? $item[13] : 1; // 连续挂网次数
                 $insert_product_data['ultra_low_price'] = isset($item[14]) && is_numeric($item[14]) ? $item[14] : '0'; // 超低监控价格
-                $insert_product_data['is_ultra_low_price'] = isset($item[14]) && is_numeric($item[14]) && $item[14] < $item[8] ? 1:0; // 是否超低价0=否1=是
+                $insert_product_data['is_ultra_low_price'] = isset($item[14]) && is_numeric($item[14]) && $item[14] < $item[8] ? 1 : 0; // 是否超低价0=否1=是
                 $insert_product_data['link_url'] = $item[15]; // 链接地址
                 $insert_product_data['store_name'] = $item[16]; // 店铺名称
                 $insert_product_data['anonymous_store_name'] = isset($item[17]) ? $item[17] : ''; // 匿名店铺名称
@@ -1192,7 +1197,7 @@ class LowPriceGoods extends Controller
                 $insert_product_data['social_credit_code'] = isset($item[20]) ? $item[20] : ''; // 信用代码
                 $insert_product_data['province_name'] = isset($item[21]) ? $item[21] : ''; // 省份
                 $insert_product_data['province_id'] = isset($province_id_data[$province_name]) ? $province_id_data[$province_name]['id'] : 0; // 省份ID
-                $insert_product_data['city_name'] =isset( $item[22]) ? $item[22] : ''; // 城市
+                $insert_product_data['city_name'] = isset($item[22]) ? $item[22] : ''; // 城市
                 $insert_product_data['city_id'] = isset($city_id_data[$city_name]) ? $city_id_data[$city_name]['id'] : 0; // 城市ID
                 $insert_product_data['area_info'] = ''; // 详细地址
                 $insert_product_data['source_responsible_person'] = !empty($source_responsible_person_id) ? implode(',', $source_responsible_person_id) : ''; //溯源责任人ID集合
@@ -1205,7 +1210,7 @@ class LowPriceGoods extends Controller
                 $insert_product_data['shipment_city_id'] = $shipment_city_id; // 店铺市id
                 $insert_product_data['shipment_city_name'] = $shipment_city_name; // 店铺市
                 $insert_product_data['collection_time'] = strtotime($item[25]); // 采集时间
-                $insert_product_data['scrape_date'] = date('Y-m-d',strtotime($item[25])); // 检索采集日期
+                $insert_product_data['scrape_date'] = date('Y-m-d', strtotime($item[25])); // 检索采集日期
                 //插入数据
                 $LowPriceGoodsModel->addLowPriceGoods($insert_product_data, true);
             }

+ 8 - 3
app/Http/Controllers/Manager/Process/ViolationProduct.php

@@ -18,6 +18,8 @@ use App\Servers\Aliyun\Oss;
 use App\Jobs\Manager\Other\ExportViolationProductJobs;
 use App\Models\Manager\Other\DownloadTask as DownloadTaskModel;
 use Illuminate\Support\Facades\Cache;
+use Illuminate\Support\Carbon;
+use App\Jobs\Manager\Process\ScrapeDataProductJobs;
 
 /**
  * 违规处理-违规商品
@@ -725,9 +727,12 @@ class ViolationProduct extends Controller
         } else {
             $company_id  = $admin_company_id;
         }
-        $message_data = ['page' => '1', 'limit' => 50, 'admin_id' => $admin_id, 'is_admin' => $is_admin, 'company_id' => $company_id];
-        ViolationProductJobs::dispatch($message_data);
-        // ViolationProductJobs::dispatchSync($message_data);
+        $start_time =  Carbon::yesterday()->startOfDay()->getTimestamp(); // 昨天开始时间 00:00:00
+        $end_time = Carbon::yesterday()->endOfDay()->getTimestamp(); // 昨天结束时间 23:59:59
+        $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '10000', 'start_time' => $start_time, 'end_time' => $end_time];
+        //执行低价挂网商品数据清洗任务
+        ScrapeDataProductJobs::dispatch($message_data);
+        // ScrapeDataProductJobs::dispatchSync($message_data);
         // 告知结果
         return             json_send(['code' => 'success', 'msg' => '执行成功']);
     }

+ 5 - 5
app/Jobs/Manager/Process/LowPriceGoodsDataJobs.php

@@ -88,9 +88,9 @@ class LowPriceGoodsDataJobs implements ShouldQueue
         $company_id = isset($message_data['company_id']) ? $message_data['company_id'] : 0; //品牌方公司ID
         $product_brand = $message_data['product_brand']; //商品品牌名称
         $product_keywords = $message_data['product_keyword']; //关键字
-        $product_data = isset($this->message_data['product_data']) ? $this->message_data['product_data'] : [];
-        $index_number = isset($this->message_data['index_number']) ? $this->message_data['index_number'] : 0;
-        $data_totle = isset($this->message_data['data_totle']) ? $this->message_data['data_totle'] : 0;
+        $product_data = $message_data['product_data'];
+        $index_number = $message_data['index_number'];
+        $data_totle = $message_data['data_totle'];
 
     //-------------------------------------- 清洗规则(开始) --------------------------------------
         //过滤没有链接数据
@@ -104,7 +104,7 @@ class LowPriceGoodsDataJobs implements ShouldQueue
         $hit_product_name = false;
         if (strpos($product_data['product_name'], $product_name) !== false) {
             $hit_product_name = true;
-        }
+        }      
         if ($hit_product_name != true) return true;
         //数据是否命中规格
         $hit_product_specs = false;
@@ -146,7 +146,7 @@ class LowPriceGoodsDataJobs implements ShouldQueue
                 $hit_platform = true;
             }
         }
-        if ($hit_platform != true) return true;
+        if ($hit_platform != true) return true;       
         //-------------------------------------- 清洗规则(结束) --------------------------------------
 
         //-------------------------------------- 处理营业执照地区信息(开始) --------------------------------------

+ 76 - 33
app/Jobs/Manager/Process/ScrapeDataProductJobs.php

@@ -49,54 +49,76 @@ class ScrapeDataProductJobs implements ShouldQueue
     {
         try {
             $ScrapeDataModel = new ScrapeDataModel();
-            $limit = isset($message_data['limit']) ? $message_data['limit'] : 10000;
-            $page = isset($message_data['page']) ? $message_data['page'] : 1;
-            $company_id = isset($message_data['company_id']) ? $message_data['company_id'] : 0; //品牌方公司ID
+            $limit = isset($this->message_data['limit']) ? $this->message_data['limit'] : 10000;
+            $page = isset($this->message_data['page']) ? $this->message_data['page'] : 1;
+            $company_id = isset($this->message_data['company_id']) ? $this->message_data['company_id'] : 0; //品牌方公司ID
 
             $start_time = $this->message_data['start_time'];
             $end_time = $this->message_data['end_time'];
             $start_time_string = date('Y-m-d H:i:s', $start_time);
             $end_time_string = date('Y-m-d H:i:s', $end_time);
 
+            $key_end_name = 'ScrapeDataProductJobs_end_' . $company_id . '_' . $start_time . '_' . $end_time;
+            $is_end_select = Cache::get($key_end_name);
             $key_name = 'ScrapeDataProductJobs_' . $company_id . '_' . $start_time . '_' . $end_time;
-            $product_datas = Cache::get($key_name);
-            if (empty($product_datas)) {
+            $cache_product_datas = Cache::get($key_name) ?? [];
+            //如果查询完毕,但是没有数据则关闭任务
+            if (!empty($is_end_select) && empty($cache_product_datas)) {
+                return true;
+            }
+            if (!empty($is_end_select) && !empty($cache_product_datas)) {
+                //如果查询完毕了,则直接处理
+                $this->cleanData($company_id, $cache_product_datas);
+            } else if (empty($is_end_select) && !empty($is_end_select)) {
+                //如果进行中,则合并数据
+                $where = [];
+                $where[] = ['insert_time', '>=', $start_time_string];
+                $where[] = ['insert_time', '<=', $end_time_string];
+                $where[] = ['enterprise_id', '=', $company_id];
+                $product_data_info = $ScrapeDataModel->select('*')->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
+                $select_product_datas = $product_data_info['data'];
+                $last_page = $product_data_info['last_page'];
+                if (!empty($select_product_datas)) {
+                    //如果查询完毕了,则直接处理//合并数据
+                    $product_datas = array_merge($select_product_datas, $cache_product_datas);
+                    Cache::put($key_name, $product_datas, 360); //缓存6分钟
+                    if ($page < $last_page) {
+                        //继续执行下一页
+                        $message_data['page'] = $page + 1;
+                        $message_data['limit'] = $limit;
+                        ScrapeDataProductJobs::dispatch($message_data)->delay(now()->addSeconds(1));
+                    } else if ($page == $last_page) {
+                        Cache::put($key_end_name, '1', 360); //缓存6分钟
+                        $this->cleanData($company_id, $product_datas);
+                    }
+                }
+            } else if (empty($is_end_select) && empty($is_end_select)) {
                 $where = [];
                 $where[] = ['insert_time', '>=', $start_time_string];
                 $where[] = ['insert_time', '<=', $end_time_string];
-                $where[] = ['min_price', '>=', '0.01'];
-                $where[] = ['number', '>=', '1'];
                 $where[] = ['enterprise_id', '=', $company_id];
-                $product_data_info = $ScrapeDataModel->select('*', DB::raw('ROUND(min_price / NULLIF(number, 0), 2) as unit_price'))
-                    ->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
-                $product_datas = $product_data_info['data'];
-                if (empty($product_datas) && $page == 1) {
+                $product_data_info = $ScrapeDataModel->select('*')->where($where)->orderbyDesc('id')->paginate($limit, ['*'], 'page', $page)->toarray();
+                $select_product_datas = $product_data_info['data'];
+                $last_page = $product_data_info['last_page'];
+                if (empty($select_product_datas)) {
                     //如果查询第一页为空,则直接返回
+                    Cache::put($key_name, '', 360); //缓存6分钟
+                    Cache::put($key_end_name, 1, 360); //缓存6分钟
                     return true;
-                } else if (empty($product_datas) && $page > 1) {
-                    //如果查询第二页为空,则直接返回,表示查询完毕,则处理数据清洗任务
-                    $data_totle = count($product_datas);
-                    $index_number = 0;
-                    foreach ($product_datas as $key => $product_data) {
-                        $index_number = $key + 1;
-                        $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '50', 'product_data' => $product_data, 'index_number' => $index_number, 'data_totle' => $data_totle];
-                        LowPriceGoodsJobs::dispatch($message_data);
-                        // LowPriceGoodsJobs::dispatchSync($message_data);
-                        ViolationProductJobs::dispatch($message_data);
-                        // ViolationProductJobs::dispatchSync($message_data);
-                        ViolationCompanyJobs::dispatch($message_data);
-                        // ViolationCompanyJobs::dispatchSync($message_data);
+                } else {
+                    //如果查询完毕了,则直接处理//合并数据
+                    $product_datas = array_merge($select_product_datas, $cache_product_datas);
+                    Cache::put($key_name, $product_datas, 360); //缓存6分钟
+                    if ($page < $last_page) {
+                        //继续执行下一页
+                        $message_data['page'] = $page + 1;
+                        $message_data['limit'] = $limit;
+                        ScrapeDataProductJobs::dispatch($message_data)->delay(now()->addSeconds(1));
+                    } else if ($page == $last_page) {
+                        Cache::put($key_end_name, '1', 360); //缓存6分钟
+                        $this->cleanData($company_id, $product_datas);
                     }
                 }
-                return true;
-            } else {
-                //合并数据
-                $product_datas = array_merge($product_datas, Cache::get($key_name));
-                Cache::put($key_name, $product_datas, 360); //缓存6分钟
-                //继续执行下一页
-                $message_data['page'] = $page + 1;
-                $message_data['limit'] = $limit;
-                ScrapeDataProductJobs::dispatch($message_data)->delay(now()->addSeconds(1));
             }
         } catch (\Exception $e) {
             Log::info('job_error', '数据清洗-读取采集商品数据队列失败', ['data' => $this->message_data, 'error' => $e->getMessage()]);
@@ -104,6 +126,27 @@ class ScrapeDataProductJobs implements ShouldQueue
     }
 
 
+    /**
+     * 执行数据清洗任务
+     * 
+     */
+    public function cleanData($company_id, $cache_product_datas)
+    {
+        $data_totle = count($cache_product_datas);
+        $index_number = 0;
+        foreach ($cache_product_datas as $key => $product_data) {
+            $index_number = $key + 1;
+            $message_data = ['company_id' => $company_id, 'page' => '1', 'limit' => '50', 'product_data' => $product_data, 'index_number' => $index_number, 'data_totle' => $data_totle];
+            LowPriceGoodsJobs::dispatch($message_data);
+            // LowPriceGoodsJobs::dispatchSync($message_data);
+            ViolationProductJobs::dispatch($message_data);
+            // ViolationProductJobs::dispatchSync($message_data);
+            ViolationCompanyJobs::dispatch($message_data);
+            // ViolationCompanyJobs::dispatchSync($message_data);
+        }
+    }
+
+
     public function failed(\Throwable $exception)
     {
         Log::info('job_error', '数据清洗-读取采集商品数据队列完全失败', ['data' => $this->message_data, 'error' => $exception->getMessage()]);