'','type'=>'','body'=>[]]; private $client = null; private $searchResult = null; // 分组参数 private $groupBy = []; // 排序参数 private $orderBy = []; /** * 指定索引 * * @param \Elasticsearch\Client $client ES链接 * @param String $index 索引名称 * @param String $type 类型 * @return $this * */ public function __construct($client,$index,$type='_doc'){ // 终端 $this->client = $client; // 指定索引 $this->query['index'] = $index; $this->query['type'] = $type; // 链式操作 return $this; } /** * 忽略错误 * * @param Array $ignore 忽略的错误代码 * * @return $this */ public function ignore( array $ignore=[400,401,403,404,408,409]){ // 更新忽略错误 $this->query['client'] = ['ignore' => $ignore]; // 链式操作 return $this; } /** * 组合查询 * * @param Array $column 查询条件 [["field","operator","value"]] | ["field"=>"value"] * @param String $searchType 查询类型,and | or * * @return $this */ public function where($column,$searchType='and'){ // 统一小写 $searchType = strtolower($searchType); // 如果是关联数组 if( count(array_filter(array_keys($column), 'is_string')) > 0 ) { // 索引数组,循环处理 foreach ($column as $field=>$value) { // 调用插入 $searchType == 'or' ? $this->should($field,$value,'=') : $this->must($field,$value,'='); } // 返回当前链接 return $this; } // 索引数组,循环处理 foreach ($column as $value) { // 切割数组 list($field,$operator,$value) = $value; // 统一小写 $operator = strtolower($operator); // 如果有or 调用的是 should if( $searchType == 'or' ) { // 调用或查询 $this->should($field,$value,$operator); // 跳出本次循环 continue; } // 判断操作符号,= in like,调用must if( in_array($operator,['in','like','=','exists']) ) $this->must($field,$value,$operator); // 判断操作符号,!= / not in / 调用的是 must_not if( in_array($operator,['!=','not in','<>','not exists','not between']) ) $this->must_not($field,$value,$operator); // 判断操作符号,> < BETWEEN 调用的是 filter if( in_array($operator,['>=','<=','<','>','between']) ) $this->filter($field,$value,$operator); } // 链式操作 return $this; } /** * Or查询 * * @param String $field 查询字段 * @param String $value 查询值 * @param String $operator 操作符 = in like < > != * * * @return $this */ public function should($field,$value,$operator){ // 如果没有对应的参数,赋值一个空数组,避免插入参数报错 if( !isset($this->query['body']['query']['bool']['should']) ) $this->query['body']['query']['bool']['should'] = []; // 赋值 $this->query['body']['query']['bool']['should'][] = $this->operatorToType($field,$value,$operator); // 链式操作 return $this; } /** * And查询 * * @param String $field 查询字段 * @param String $value 查询值 * @param String $operator 操作符 = in like < > != * * * @return $this */ public function must($field,$value,$operator){ // 如果没有对应的参数,赋值一个空数组,避免插入参数报错 if( !isset($this->query['body']['query']['bool']['must']) ) $this->query['body']['query']['bool']['must'] = []; // 赋值 $this->query['body']['query']['bool']['must'][] = $this->operatorToType($field,$value,$operator); // 链式操作 return $this; } /** * NOT IN 与 != 查询 * * @param String $field 查询字段 * @param String $value 查询值 * @param String $operator 操作符 = in like < > != * * * @return $this */ public function must_not($field,$value,$operator){ // 如果没有对应的参数,赋值一个空数组,避免插入参数报错 if( !isset($this->query['body']['query']['bool']['must_not']) ) $this->query['body']['query']['bool']['must_not'] = []; // 赋值 $this->query['body']['query']['bool']['must_not'][] = $this->operatorToType($field,$value,$operator); // 链式操作 return $this; } /** * 字段筛选 * * @param String $field 查询字段 * @param String $value 查询值 * @param String $operator 操作符 = in like < > != * * * @return $this */ public function filter($field,$value,$operator){ // 如果没有对应的参数,赋值一个空数组,避免插入参数报错 if( !isset($this->query['body']['query']['bool']['filter']) ) $this->query['body']['query']['bool']['filter'] = []; // 赋值 $this->query['body']['query']['bool']['filter'][] = $this->operatorToType($field,$value,$operator); // 链式操作 return $this; } /** * 处理操作符转换查询类型 * * @param String $field 查询字段 * @param String $value 查询值 * @param String $operator 操作符 = in like < > != * */ private function operatorToType($field,$value,$operator){ // 转换大小写 $operator = strtolower($operator); // in = <> != if( in_array($operator,['in','=','<>','!=','not in']) ) return is_array($value) ? ['terms'=>[$field=>$value]] : ['term'=>[$field=>$value]]; // 存在不存在 if( in_array($operator,['exists','not exists']) ) return ['exists'=>['field'=>$field]]; // like if( $operator == 'like' ) return ['match'=>[$field=>$value]]; // 区间、等于 if( $operator == '>' ) return ['range'=>[$field=>['gt'=>$value]]]; if( $operator == '<' ) return ['range'=>[$field=>['lt'=>$value]]]; if( $operator == '>=' ) return ['range'=>[$field=>['gte'=>$value]]]; if( $operator == '<=' ) return ['range'=>[$field=>['lte'=>$value]]]; if( $operator == 'between' ) { // 如果是区间,数值切割 $value = is_array($value) ? $value : explode(',',$value); // 排序,避免区间错误 sort($value); // 返回结果 return ['range'=>[$field=>['gte'=>$value[0],'lte'=>$value[1]]]]; } if( $operator=='not between' ) { // 如果是区间,数值切割 $value = is_array($value) ? $value : explode(',',$value); // 排序,避免区间错误 sort($value); // 返回结果 return ['range'=>[$field=>['gte'=>$value[0],'lte'=>$value[1]]]]; } // 默认是等于 return ['term'=>[$field=>$value]]; } /** * 排序 * * @param String $column 查询字段 * @param String $direction 排序规则 asc | desc * * @return $this */ public function orderBy($column,$direction='asc'){ // 分组 $this->orderBy[] = ['column'=>$column,'direction'=>$direction]; // 获取结果数据 return $this; } /** * 倒序排序 * * @param String $column 查询字段,排序规则desc * * @return $this */ public function orderByDesc($column){ // 分组 $this->orderBy[] = ['column'=>$column,'direction'=>'desc']; // 链式操作 return $this; } /** * 排序数据 * */ private function orderByToParam(){ // 没有分组 if( !$this->groupBy ){ // 如果没有对应的参数,赋值一个空数组,避免插入参数报错 if( !isset($this->query['body']['sort']) ) $this->query['body']['sort'] = []; // 循环分组参数 foreach ( $this->orderBy as $key=>$value ) { // 增加排序 $this->query['body']['sort'][] = [$value['column']=>['order'=>$value['direction']]]; // 删除已添加的 unset($this->orderBy[$key]); } } // 返回数据 return $this; } /** * 分组 * * @param String $column 分组字段 * @param String $alias 分组字段设置别名 * * @return $this */ public function groupBy($column='id',$alias=''){ // 分组 $this->groupBy[] = ['column'=>$column,'alias'=>$alias]; // 获取结果数据 return $this; } /** * 分组数据 * */ private function groupByToParam(){ // 循环分组参数 foreach ( $this->groupBy as $key=>$value ) { // 分组追加 $aggs = $this->paramsPush($value['column'],$value['alias'],$this->query['body'],'terms'); // // 组合条件 $this->query['body'] = $aggs; // 删除已添加的 unset($this->groupBy[$key]); } // 取消分组数据 $this->orderBy = []; // 返回结果 return $this; } /** * 获取字段 * @param Array $columns 查询字段 * * @return $this */ public function select($columns=[]){ // 获取对应的字段 if( $columns ) $this->query['body']['_source'] = $columns; // 链式操作 return $this; } /** * 页码获取 * @param Int $size 每页条数 * @param Int $page 页码,大于0 * * @return $this */ public function page($size,$page = 0){ // 每页条数 $this->query['body']['size'] = $size; // 如果有页码 if( $page > 0 ) $this->query['body']['from'] = ($page-1) * $size; // 链式操作 return $this; } /** * 页码获取 * @param Int $size 每页条数 * * @return $this */ public function limit($size){ // 每页条数 $this->query['body']['size'] = $size; // 链式操作 return $this; } /** * 执行搜索 * */ private function search(){ // 追加排序 $this->orderByToParam(); // 追加分组 $this->groupByToParam(); // 执行搜索后返回结果并添加到属性 $this->searchResult = $this->client->search($this->query); // 返回结果 return $this->searchResult; } /** * 获取列表 * * @param Array $columns 查询字段 * */ public function get($columns=[]){ // 指定字段的话,添加参数 if( $columns ) $this->select($columns); // 执行搜索并获取结果 $result = $this->search(); // 获取结果 $result = isset($result['hits']['hits']) ? array_column($result['hits']['hits'],'_source') : []; // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回结果 return $result; } /** * 获取列表 * * @param Int $perPage 每页数量 * @param Array $columns 查询字段 * * @return \Illuminate\Contracts\Pagination\LengthAwarePaginator * */ public function paginate($perPage = 15, $columns = [], $pageName = 'page', $page = null){ // 指定字段的话,添加参数 if( $columns ) $this->select($columns); // 页码 $page = $page ?: Paginator::resolveCurrentPage($pageName); // 只获取一条 $this->page($perPage,$page); // 执行搜索并获取结果 $result = $this->search(); // 总数 $total = isset($result['hits']['total']['value']) ? $result['hits']['total']['value'] : 0; // 获取结果 $result = isset($result['hits']['hits']) ? array_column($result['hits']['hits'],'_source') : []; // 构建分页 $paginator = new LengthAwarePaginator($result,$total,$perPage, $page, ['path'=>Paginator::resolveCurrentPath(),'pageName'=>$pageName]); // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回结果 return $paginator; } /** * 获取一条数据 * * @param Array $columns 查询字段 * * @return Mixed 结果 * */ public function first($columns=[]){ // 只获取一条 $this->page(1,1); // 指定字段的话,添加参数 if( $columns ) $this->select($columns); // 执行搜索并获取结果 $result = $this->search(); // 获取结果 $result = isset($result['hits']['hits']) ? array_column($result['hits']['hits'],'_source') : []; // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回一条 return isset($result[0]) ? $result[0] : []; } /** * 分组追加 * * @param String $column 分组字段 * @param String $alias 分组字段设置别名 * * @return $this * */ private function paramsPush($column,$alias,$aggs,$type){ // 定义参数 $params = [$type=>['field'=>$column]]; // 第一层增加 if( !empty($this->query['body']['size']) && $type == 'terms' ) $params[$type]['size'] = $this->query['body']['size']; // 排序 foreach ( $this->orderBy as $key => $value) { // 同一个字段使用排序 if( $type == 'terms' ){ // 添加排序 $params[$type]['order'] = [$value['column']=>$value['direction']]; // 已用的排序删除 unset($this->orderBy[$key]); } } // 先判断是否有参数 if( isset($aggs['aggregations']) ){ // 如果有参数,判断是否有分组 foreach ( $aggs['aggregations'] as $key => $value) { // 如果有分组,获取下一层 if( $value ) { // 追加到数据 $temp = $this->paramsPush($column,$alias,$value,$type); // 如果有值 if( isset($temp['aggregations']) ) $aggs['aggregations'][$key] = $temp; }else{ // 组合数据 $aggs['aggregations'][($alias?$alias:$column)] = $params; } } }else{ // 没条件参数的,直接新增一条 $aggs['aggregations'][($alias?$alias:$column)] = $params; } // 返回结果 return $aggs; } /** * 聚合数据处理 * * @param Array $result 最终结果 * @param String $column 结果字段 * * @return mixed */ private function aggsDataResult($result,$column){ // 判断是否是聚合数据,如果值单个统计,返回 if( isset($result[$column]['value']) ) return (int) $result[$column]['value']; // 针对stats,返回多个结果 if( isset($result[$column]['sum']) && isset($result[$column]['count']) && isset($result[$column]['avg']) ) return $result[$column]; // 没有桶数据,返回空数组 if( !array_column($result,'buckets') ) return []; // 列表 return $this->bucketsList($result); } /** * 聚合桶数据递归处理 * @param Array $buckets 最终结果 * @param String $data 临时存储数据 * @param String $list 列表数据 */ private function bucketsList($buckets,$data=[],&$list=[]){ // 循环数据 foreach ( $buckets as $key => $value ) { // 如果还有下一层 if( isset($value['buckets']) ) { foreach ($value['buckets'] as $vv) { // 获取字段值 $data[$key] = $vv['key']; // 继续下一层 $list = $this->bucketsList($vv,$data,$list); } } // 不是值的数据 if( !isset($value['value'] ) && !isset($value['sum']) ) continue; // 获取数据 if( isset($value['value']) ) $data[$key] = $value['value']; // 获取数据 if( isset($value['sum']) ) $data[$key] = $value; // 追加数据 $list[] = $data; } // 返回结果 return $list; } /** * 聚合数据处理 * * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * * @return mixed */ private function aggsDataHandler($column){ // 查询结果 $result = $this->search(); // 判断是否有结果 if( empty($result['aggregations']) ) return []; // 进行查询 $result = $this->aggsDataResult($result['aggregations'],$column); // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回结果 return $result; } /** * 统计个数 * * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * * @return mixed */ public function count($column='id',$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'cardinality'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 最大数据 * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * @return mixed * */ public function max($column,$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'max'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 最小数 * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * @return mixed */ public function min($column,$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'min'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 平均数 * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * @return mixed */ public function avg($column,$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'avg'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 计算总数 * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * @return mixed */ public function sum($column,$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'sum'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 聚合组合 * @param String $column 统计字段 * @param String $alias 统计字段设置别名 * @return mixed */ public function stats($column,$alias=''){ // 追加分组 $this->groupByToParam(); // 组合条件 $this->query['body'] = $this->paramsPush($column,$alias,$this->query['body'],'stats'); $this->query['body']['size'] = 0; // 执行搜索并获取结果 $result = $this->aggsDataHandler($alias?$alias:$column); // 获取结果数据 return $result; } /** * 查询指定ID数据 * * @param Int $id ID * * @return array * */ public function find($id){ // 去除查询参数 unset($this->query['body']); // 拼接参数 $this->query['id'] = $id; // 执行搜索并获取结果 $result = $this->client->get($this->query); // 未查询到数据 if( !isset($result['found']) || !$result['found'] ) return []; // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回一条 return $result['_source']; } /** * SQL查询 * * @param String $query 查询sql语句 * @param String $format 简短标头如json,yaml,smile,cbor,txt,csv,tsv * @param Array $body 更多参数 * String $cursor 游标 * Int $fetch_size 分页条数 */ public function sql($query,$body=[],$format='json'){ // 简短标头,如json,yaml,smile,cbor,txt,csv,tsv $params = ['format'=>$format]; // 查询sql语句 $params['body']['query'] = $query; // 游标 if( !empty($body['cursor']) ) $params['body']['cursor'] = $body['cursor']; // 条数 if( !empty($body['fetch_size']) ) $params['body']['fetch_size'] = $body['fetch_size']; // 查询结果 $result = $this->client->sql()->query($params); // 最终数据 $data = []; // 循环获取数据 foreach ($result['rows'] as $row) { // 重新赋值 $temp = []; // 匹配字段 foreach ($row as $key => $value) { // 获取字段 $temp[$result['columns'][$key]['name']] = $value; } // 获取数据 $data[] = $temp; } // 获取数据 $data = ['data'=>$data]; // 获取数据 if( !empty($body['fetch_size']) ) $data['cursor'] = $result['cursor']; // 进行查询 return $data; } /** * SQL查询分页 * */ public function sqlPaginate($query,$body=[],$format='json',$perPage = 15,$pageName = 'page', $page = null){ // 查询数据 $data = $this->sql($query,$body,$format)['data']; // 总数 $total = count($data); // 页码 $page = $page ?: Paginator::resolveCurrentPage($pageName); // 从哪里开始 $from = ($page-1) * $perPage; // 截取数组的一部分 $data = array_slice($data,$from,$perPage); // 构建分页 $paginator = new LengthAwarePaginator($data,$total,$perPage, $page, ['path'=>Paginator::resolveCurrentPath(),'pageName'=>$pageName]); // 返回结果 return $paginator; } /** * SQL查询分页 * */ public function sqlFirst($query,$body=[],$format='json'){ // 获取总积分 $data = $this->sql($query,$body,$format); // 只获取一条 $data = array_shift($data['data']); // 返回结果 return $data; } /** * SQL查询分页 * */ public function sqlValue($query,$body=[],$format='json'){ // 获取总积分 $data = $this->sql($query,$body,$format); // 只获取一条 $data = array_shift($data['data']); // 取值 $data = array_shift($data); // 返回结果 return $data; } /** * SQL转DSL查询 * * @param String $query 查询sql语句 * */ public function translate($query){ // 查询sql语句 $params['body']['query'] = $query; // 查询结果 $result = $this->client->sql()->translate($params); // 进行查询 return $result; } /** * 添加文档 * * @param Array $columns 修改字段数据 * @param Mixed $id id * @return Mixed 结果 * */ public function insert($columns,$id=null){ // 没有需要修改的数据 if( !$columns ) return false; // 如果指定了ID if( !is_null($id) ) $this->query['id'] = $id; // 修改数据的字段 $this->query['body'] = $columns; // 执行结果 $response = $this->client->index($this->query); // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回一条 return $response; } /** * 删除文档 * * @param Array $columns 修改字段数据 * @param Mixed $id id * @return Mixed 结果 * */ public function delete($id=null){ // 如果指定了ID if( !is_null($id) ) { // 指定ID $this->query['id'] = $id; // 去除查询参数 unset($this->query['body']); } // 执行结果 $response = is_null($id) ? $this->client->deleteByQuery($this->query) : $this->client->delete($this->query); // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回一条 return $response; } /** * 修改文档 * * @param Array $columns 修改字段数据 * @param Mixed $id id * @return Mixed 结果 * */ public function update($columns=[],$id=null){ // 如果指定了ID if( !is_null($id) ) { // 指定ID $this->query['id'] = $id; // 去除查询参数 $this->query['body'] = isset($this->query['body']['script']) ? ['script'=>$this->query['body']['script']] : []; } // 是否存在字段 $this->query['body']['script'] = isset($this->query['body']['script']) ? $this->query['body']['script'] : ''; // 拼接数据 foreach ($columns as $key => $value) { // 拼接参数 $this->query['body']['script'] .= 'ctx._source.'.$key.' = '.$value.';'; } // 执行更新操作 $result = is_null($id) ? $this->client->updateByQuery($this->query) : $this->client->update($this->query); // 清除参数与搜索结果 $this->cleanQuery();$this->cleanSearchResult(); // 返回一条 return $result; } /** * 自增 * * @param String $columns 修改字段 * @param Int $step 步长 * */ public function increment($columns,$step=1){ // 没有需要修改的数据 if( !$columns ) return false; // 是否存在字段 $this->query['body']['script'] = isset($this->query['body']['script']) ? $this->query['body']['script'] : ''; // 拼接参数 $this->query['body']['script'] .= 'ctx._source.'.$columns.' += '.$step.';'; // 返回一条 return $this; } /** * 自减 * * @param String $columns 修改字段 * @param Int $step 步长 * */ public function decrement($columns,$step=1){ // 没有需要修改的数据 if( !$columns ) return false; // 是否存在字段 $this->query['body']['script'] = isset($this->query['body']['script']) ? $this->query['body']['script'] : ''; // 拼接参数 $this->query['body']['script'] .= 'ctx._source.'.$columns.' -= '.$step.';'; // 返回一条 return $this; } /** * 清除参数 * * @return $this * */ private function cleanQuery(){ // 参数赋值空数组 $this->query = ['index'=>'','type'=>'','body'=>[]]; // 链式操作 return $this; } /** * 清除搜索结果 * * @return $this * */ private function cleanSearchResult(){ // 参数赋值空数组 $this->searchResult = null; // 链式操作 return $this; } }