ELK

This is seriously awesome.

Elastic

Account / password

# Enable the X-Pack trial license
curl -H "Content-Type:application/json" -XPOST  http://127.0.0.1:9200/_xpack/license/start_trial?acknowledge=true
{"acknowledged":true,"trial_was_started":true,"type":"trial"}
# Enable X-Pack security in elasticsearch.yml
xpack.security.enabled: true
# Set passwords interactively
battery@battery:~$ ./elasticsearch/bin/elasticsearch-setup-passwords interactive
# Enter the passwords when prompted
Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y


Enter password for [elastic]:
passwords must be at least [6] characters long
Try again.
Enter password for [elastic]:
Reenter password for [elastic]:
Passwords do not match.
Try again.
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]

# To change a password later
curl -H "Content-Type:application/json" -XPOST -u elastic 'http://127.0.0.1:9200/_xpack/security/user/elastic/_password' -d '{ "password" : "123456" }'

Inverted index

Tokenize the text, then record each term's positions and frequency per document. When searching for a term, the prebuilt inverted index (the higher the term frequency, the higher the score) leads straight to the matching documents.

Analysis

 character filter: pre-processing before tokenization, e.g. stripping HTML tags and converting special symbols
 tokenizer: splits the text into tokens
 token filter: normalization, e.g. synonyms, lowercasing, singular/plural conversion

Built-in analyzers:

 standard: case-insensitive, can drop stop words like a/an/the, splits Chinese character by character
 simple: splits on non-letter characters
 whitespace: case-sensitive, splits only on whitespace, no Chinese support
 language: language-specific analyzers, no Chinese support

Chinese analyzer analysis-ik:

     cd your-es-root/plugins/ && mkdir ik
     unzip the plugin into your-es-root/plugins/ik
     restart Elasticsearch
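
To confirm the plugin is active, call the analyzer directly; a minimal check (ik_max_word and ik_smart are the two analyzers the plugin registers):

     GET /_analyze
     {
       "analyzer": "ik_max_word",
       "text": "中华人民共和国"
     }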

RESTful API

     # Create an index; the shard count cannot be changed after creation
     PUT /demo
     {
         "settings": {
             "index":{
             "number_of_shards":2,
             "number_of_replicas":0
             }
         }
     }

     # View an index; GET /demo2/_settings returns only the settings
     GET /demo2
     {
         "demo2" : {
         "aliases" : { },
         "mappings" : { },
         "settings" : {
         "index" : {
             "creation_date" : "1562489960528",
             "number_of_shards" : "1",
             "number_of_replicas" : "1",
             "uuid" : "nMVq3GvdQCy59B__kvA6Hw",
             "version" : {
               "created" : "7020099"
             },
             "provided_name" : "demo2"
             }
         }
         }
     }

     # Add a document: PUT with an explicit id, POST to auto-generate one
     PUT /demo/user/1 # deprecated style with an explicit type
     {
       "first_name":"jone",
       "last_name":"tt",
       "age":20,
       "about":"demo test xxx",
       "interests":["music","song"]
     }
     # the new style
     PUT /demo/_doc/1  # _doc: the concept of types is gone
     {
       "first_name":"jone",
       "last_name":"tt",
       "age":20,
       "about":"demo test xxx",
       "interests":["music","song"]
     }

     # Get a document
     GET /demo/_doc/1?_source=age # _source restricts the returned fields

     # Update a document
     PUT /demo/_doc/1 # replace the old document with a complete new one
     POST /demo/_update/2 # partially update selected fields of a document
     {
       "doc": {
         "first_name": "jone222"
       }
     }

     # Batch retrieval with mget
     GET /_mget # give index and id per doc; with a single index, use GET /demo/_mget and omit _index in the body
     {
       "docs": [
         {
           "_index": "demo",
           "_id": 1,
           "_source":"first_name"
         },
         {
           "_index": "demo",
           "_id": 2
         }
       ]
     }

     # An even simpler form
     GET /demo/_mget
     {
       "ids":[1,2]
     }

Bulk operations

     # bulk batch operations
     # Format
     {action:{metadata}}
     {requestbody}

     action: create / update / index (updates the document if it already exists) / delete
     metadata: _index/_type/_id

     POST /demo/_bulk # each JSON object must stay on one line
     {"index":{"_id":1}}
     {"title":"java","price":99.89}
     {"index":{"_id":2}}
     {"title":"php","price":49.89}

Document field types

GET /demo/_mapping
{
  "demo" : {
    "mappings" : {
      "properties" : {
        "price" : {
          "type" : "float"
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

text fields are analyzed; keyword and date fields are not. date format: yyyy-MM-dd

Mapping field parameters

store: false whether to store the field separately from _source
index: true whether the field is indexed for search
analyzer: "ik" the analyzer for this field; the default is the standard analyzer
search_analyzer: "ik" the analyzer used at search time; defaults to the same as analyzer
boost: 1.23 relevance-score weighting for this field
ignore_above: 256 values longer than 256 characters are ignored and not indexed

Core types: integer, long, short, byte, double, float, boolean, range. Complex types: Array, Object, Nested (JSON arrays), Geo, IPv4, Completion (autocomplete suggestions), token_count, ...
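
A sketch pulling these parameters together, assuming a hypothetical demo3 index and the ik analyzer installed earlier (field-level boost is deprecated in recent versions, so it is omitted here):

PUT /demo3
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word"
      },
      "code": {
        "type": "keyword",
        "ignore_above": 256
      }
    }
  }
}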

Specify mappings by hand; the dynamic setting controls whether fields not declared in the mapping may be added to documents.

PUT /demo
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 5
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      }
    }
  }
}

# Date/timestamp type
{
  "mappings": {
      "properties": {
        "postdate": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        }
      }
  }
}

Reindexing

The shard layout cannot be changed in place, so reindexing means creating a new index and importing the old index's data into it. Give the old index an alias: PUT /demo/_alias/demo2, read the data out with scroll, then bulk-insert it into the new index. Index segments are immutable; mutability would hurt caching, performance, and the inverted index.
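
Newer releases also ship a built-in _reindex API that runs the scroll-plus-bulk loop for you; a minimal sketch with this section's index names:

POST /_reindex
{
  "source": { "index": "demo" },
  "dest": { "index": "test2" }
}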

Attach the alias to the new index (test2)

POST /_aliases
{
    "actions":[
        {"remove":{"index":"demo","alias":"demo2"}},
        {"add":{"index":"test2","alias":"demo2"}}
    ]
}

search queries

_search?q=name:lisi # lightweight query-string match
_search?q=name:lisi&sort=age:desc # sort by age descending

term/terms: the query value is not analyzed. terms: one field can be given several words to match.

GET /demo/_search
{
    "query":{
        "term":{
            "name":"zhangsan"
        }
    }
}

{
    "query":{
        "terms":{
            "interests":["song","music"]
        }
    }
}

Pagination

GET /demo/_search
{
    "from":0,
    "size":100,
    "query":{}
}

match: the query text is analyzed and then matched

GET /demo/_search
{
    "query":{
        "match":{
            "name":"zhangsan lisi"  # the query text is analyzed here
        }
    }
}


# match all documents
{
    "query":{
        "match_all":{}
    }
}

# multi-field query: multi_match takes one query string plus a fields list
{
    "query":{
        "multi_match":{
            "query":"zhangsan lisi",
            "fields":["first_name","last_name"]
        }
    }
}

{
    "query":{
        "multi_match":{
            "query":"test",
            "fields":["name","hobby"] # match the query against several fields
        }
    }
}

# phrase match
{
    "query":{
        "match_phrase":{
            "interests":"song,music"  # terms must appear in exactly this order to match
        }
    }
}

# restrict returned fields (_source filtering)
{
    "_source":{
        "includes":"a*",
        "excludes":["b*","c*"]
    },
    "query":{}
}

# sorting
{
    "query":{},
    "sort":[
        {
            "age":{
                "order":"desc"
            }
        }
    ]
}

# range query (must be nested inside query)
{
    "query":{
        "range":{
            "birthday":{
                "from":"2019-10-01",
                "to":"2019-10-20",
                "include_lower":true,
                "include_upper":false
            }
        }
    }
}

# wildcard query
{
    "query":{
        "wildcard":{
            "name":"te?t*"
        }
    }
}

# fuzzy query
# fuzziness can be tuned via boost, min_similarity, prefix_length, max_expansions, etc.
{
    "query":{
        "fuzzy":{
            "name":"tst" # fuzzy-matches the whole term, so e.g. test also matches
        }
    }
}

# prefix phrase matching: match_phrase_prefix, sketched below
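
A minimal sketch, reusing the about field from the demo documents above; it matches documents whose about text contains the phrase "demo" followed by a term starting with "te":

{
    "query":{
        "match_phrase_prefix":{
            "about":"demo te"
        }
    }
}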

filter queries are fast: they skip scoring and their results can be cached. Combine clauses with bool: must, should, must_not, filter.

GET /demo/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "name": "张"
          }
        },
        {
          "term": {
            "age": 20
          }
        }
      ]
    }
  }
}

Condition operators: lt, gt, exists; see the sketch below.
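
A sketch combining them in a bool filter context (field names reuse the demo documents); filter clauses skip scoring and are cacheable, which is what makes them fast:

GET /demo/_search
{
    "query":{
        "bool":{
            "filter":[
                {"range":{"age":{"gt":18,"lt":30}}},
                {"exists":{"field":"first_name"}}
            ]
        }
    }
}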

Aggregations with aggs: sum, max, min, avg, cardinality (the number of distinct values of a field), terms (grouping)

{
  "size":0,
  "aggs": {
    "total_age": {
      "sum": {
        "field":"age"
      }
    }
  }
}
Response:
"aggregations" : {
    "total_age" : {
      "value" : 137.0
    }
  }
# several aggregations at once
{
  "size":0,
  "aggs": {
    "group_age": {
      "sum": {
        "field":"age"
      }
    },
    "total_bb":{
      "min":{
        "field":"birthday"
      }
    }
  }
}

Grouping with nested sub-aggregations:
{
  "size":0,
  "aggs": {
    "group_age": {
      "terms": {
        "field":"birthday",
        "order":{
            "total_age":"desc"
        }
      },
      "aggs":{
        "total_age":{
          "sum":{
            "field":"age"
          }
        }
      }
    }
  }
}

constant_score: skips relevance-score calculation
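
A sketch: every matching document gets the fixed boost as its score instead of a computed relevance value:

{
    "query":{
        "constant_score":{
            "filter":{"term":{"interests":"music"}},
            "boost":1.2
        }
    }
}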

adjacency_matrix: adjacency-matrix aggregation 🥦 (see the sketch below)
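
A sketch against the interests field from the demo documents; the response holds one bucket per filter plus one per intersection (e.g. music&song):

GET /demo/_search
{
    "size": 0,
    "aggs": {
        "interactions": {
            "adjacency_matrix": {
                "filters": {
                    "music": { "term": { "interests": "music" } },
                    "song": { "term": { "interests": "song" } }
                }
            }
        }
    }
}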

Multi-index search

GET /demo,demo2/_search

deep paging

3 shards, 6000 documents, 2000 per shard. To serve page 100 at 10 docs per page, every shard must return its top 1000 (from + size) hits, and the coordinate node has to merge and sort all 3000 of them just to keep 10 — expensive.
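
Page 100 at 10 docs per page translates into the request below; it looks harmless, yet each shard must materialize its top 1000 hits:

GET /demo/_search
{
    "from": 990,
    "size": 10,
    "query": { "match_all": {} }
}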

scroll queries

GET /demo/_search?scroll=1m
{
  "query": {
    "match_all": {}
  },
  "sort": ["_doc"],
  "size": 10000
}

scroll=1m sets the keep-alive window. The first query creates a snapshot and returns a _scroll_id; each follow-up request continues from that scroll_id:

POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}

_version concurrency control

Optimistic locking: every update bumps _version by 1. A request may carry _version; if it does not match the document's current version, the operation is rejected. External version control: with &version_type=external, the update only runs when the supplied version is greater than the document's current _version.
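
A sketch of an externally versioned write; it succeeds only if 5 is greater than the stored version (recent releases replace the internal ?version check with if_seq_no/if_primary_term, but external versioning still works like this):

PUT /demo/_doc/1?version=5&version_type=external
{
    "first_name": "jone"
}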

ES cluster

Shards are load-balanced across the nodes.

Document routing

How ES decides which shard holds a document: hash the document's routing value (the _id by default) and take it modulo the total shard count; the remainder always falls between 0 and number_of_shards - 1. A lookup runs the same computation, so any document can be located from its routing value — and this is also why the shard count is fixed at index creation.
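
In other words: shard = hash(routing) % number_of_shards. The routing value can be overridden per request to keep related documents on one shard; user123 below is a hypothetical routing key, and the same routing must be supplied when reading the document back:

PUT /demo/_doc/1?routing=user123
{
    "first_name": "jone"
}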

Painless

Custom relevance scores

{
  "query": {
    "function_score": {
      "script_score": {
        "script": {
          "lang": "painless",
          "source": """
            int total = 0;
            for (int i = 0; i < doc['goals'].length; ++i) {
              total += doc['goals'][i];
            }
            return total;
          """
        }
      }
    }
  }
}

The script_fields approach

{
  "query": {
    "match_all": {}
  },
  "script_fields": {
    "total_goals": {
      "script": {
        "lang": "painless",
        "source": """
          int total = 0;
          for (int i = 0; i < doc['goals'].length; ++i) {
            total += doc['goals'][i];
          }
          return total;
        """
      }
    }
  }
}

Logstash

input (data source) --> filter --> output

Syncing MySQL data to ES

  1. Add the Ruby gem source: gem source -a https://gems.ruby-china.com, then install the plugins logstash-input-jdbc and logstash-output-elasticsearch
  2. MySQL JDBC driver
  3. Config file: xxx.conf
 input {
     jdbc {
      jdbc_connection_string => "jdbc:mysql://111.111.111.111:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/home/battery/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.47-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => true
      jdbc_page_size => "50000"
      jdbc_default_timezone =>"Asia/Shanghai"
      statement_filepath => "/home/battery/logstash/config/jdbc.sql"
      schedule => "* * * * *"
      type => "jdbc"
      record_last_run => true
      use_column_value => true
      tracking_column => "updated_at"
      tracking_column_type => "numeric"
      last_run_metadata_path => "/home/battery/logstash/config/logstash_order_last_update"
      clean_run => false
      lowercase_column_names => false
     }
}
 output {
     elasticsearch {
     hosts => "127.0.0.1:9200"
     index => "demo"
     document_id => "%{id}"
     template_overwrite => true
     }
     stdout {
         codec => json_lines
     }
 }
  4. If the MySQL driver fails to load, copy it into <logstash install dir>/logstash-core/lib/jars/

Getting started with Spring Data

The elasticsearch and spring-data-elasticsearch versions must be compatible.

  1. Pagination
Page<User> findByLastname(String lastname, Pageable pageable);

Slice<User> findByLastname(String lastname, Pageable pageable);

A Slice only knows whether a next Slice is available; a Page additionally triggers a count query.

  2. Stream processing
Stream<User> readAllByFirstnameNotNull();

A Stream may hold resources of the underlying data store, so it must be closed after use, either by calling close() or with a Java try-with-resources block.

  3. Async processing
@Async
Future<User> findByFirstname(String firstname);

@Async
CompletableFuture<User> findOneByFirstname(String firstname);

@Async
ListenableFuture<User> findOneByLastname(String lastname);

The method call returns immediately; once the task finishes, future.get() picks up the result and processing continues.

  4. Queries
public interface BookRepository extends ElasticsearchRepository<Book, String> {
    @Query("{\"bool\" : {\"must\" : {\"field\" : {\"name\" : \"?0\"}}}}")
    Page<Book> findByName(String name,Pageable pageable);
}

Do the inner double quotes need escaping? According to the spring-data-elasticsearch docs, yes — they are escaped above.

Filtering

SearchQuery searchQuery = new NativeSearchQueryBuilder()
    .withQuery(matchAllQuery())
    .withFilter(boolFilter().must(termFilter("id", documentId)))
    .build();

Page<Demo> sampleEntities =
    elasticsearchTemplate.queryForPage(searchQuery,Demo.class);
  5. Scan and Scroll

Scrolling through large result sets

SearchQuery searchQuery = new NativeSearchQueryBuilder()
    .withQuery(matchAllQuery())
    .withIndices("test-index")
    .withTypes("test-type")
    .withPageable(new PageRequest(0,1))
    .build();
// ES duration units: y=year, M=month, w=week, d=day, h=hour, m=minute, s=second
String scrollId = elasticsearchTemplate.scan(searchQuery,1000,false);
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
boolean hasRecords = true;
while (hasRecords){
    Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L , new ResultsMapper<SampleEntity>()
    {
        @Override
        public Page<SampleEntity> mapResults(SearchResponse response) {
            List<SampleEntity> chunk = new ArrayList<SampleEntity>();
            for(SearchHit searchHit : response.getHits()){
                if(response.getHits().getHits().length <= 0) {
                    return null;
                }
                SampleEntity user = new SampleEntity();
                user.setId(searchHit.getId());
                user.setMessage((String)searchHit.getSource().get("message"));
                chunk.add(user);
            }
            return new PageImpl<SampleEntity>(chunk);
        }
    });
    if(page != null) {
        sampleEntities.addAll(page.getContent());
        hasRecords = page.hasNextPage();
    }
    else{
        hasRecords = false;
    }
}