Skip to content

一、概述

1.1 概述

elasticsearch是一个基于Lucene的高扩展的分布式搜索服务器,支持开箱即用。

elasticsearch隐藏了Lucene的复杂性,对外提供Restful 接口来操作索引、搜索。

优点

扩展性好,可部署上百台服务器集群,处理PB级数据

近实时的去索引数据、搜索数据

功能

  • 分布式的搜索引擎和数据分析引擎
  • 全文检索,结构化检索,数据分析
  • 对海量数据进行近实时的处理

使用场景

  • 搜索,全文检索,高亮显示,搜索推荐
  • 日志数据分析
  • 数据监控
  • BI系统

1.1 原理

下图是ElasticSearch的索引结构,下边黑色部分是物理结构,上边黄色部分是逻辑结构,逻辑结构也是为了更好的 去描述ElasticSearch的工作原理及去使用物理结构中的索引文件。

image-20200921213047622

逻辑结构部分是一个倒排索引表:

1、将要搜索的文档内容分词,所有不重复的词组成分词列表。

2、将搜索的文档最终以Document方式存储起来。

3、每个词和docment都有关联。

1.2 项目结构

bin:脚本目录,包括:启动、停止等可执行脚本
confifig:配置文件目录
data:索引目录,存放索引文件的地方
logs:日志目录
modules:模块目录,包括了es的功能模块
plugins :插件目录,es支持插件机制

1.3 mysql与elasticsearch

  • Mysql:擅长事务类型操作,可以确保数据的安全和一致性

  • Elasticsearch:擅长海量数据的搜索、分析、计算

MySQLElasticsearch说明
TableIndex索引(index),就是文档的集合,类似数据库的表(table)
RowDocument文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式
ColumnField字段(Field),就是JSON文档中的字段,类似数据库中的列(Column)
SchemaMappingMapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema)
SQLDSLDSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD

1.4 核心概念

1 NRT(Near Realtime):近实时

两方面:

  • 写入数据时,过1秒才会被搜索到,因为内部在分词、录入索引。

  • es搜索时:搜索和分析数据需要秒级出结果。

2 Cluster:集群

包含一个或多个启动着es实例的机器群。通常一台机器起一个es实例。同一网络下,集名一样的多个es实例自动组成集群,自动均衡分片等行为。默认集群名为“elasticsearch”。

3 Node:节点

每个es实例称为一个节点。节点名自动分配,也可以手动配置。

4 Index:索引

包含一堆有相似结构的文档数据。

索引创建规则:

  • 仅限小写字母

  • 不能包含\、/、 *、?、"、<、>、|、#以及空格符等特殊符号

  • 从7.0版本开始不再包含冒号

  • 不能以-、_或+开头

  • 不能超过255个字节(注意它是字节,因此多字节字符将计入255个限制)

5 Document:文档

es中的最小数据单元。一个document就像数据库中的一条记录。通常以json格式显示。多个document存储于一个索引(Index)中。

json
book document

{
  "book_id": "1",
  "book_name": "java编程思想",
  "book_desc": "从Java的基础语法到最高级特性(深入的[面向对象](https://baike.baidu.com/item/面向对象)概念、多线程、自动项目构建、单元测试和调试等),本书都能逐步指导你轻松掌握。",
  "category_id": "2",
  "category_name": "java"
}
6 Field:字段

就像数据库中的列(Columns),定义每个document应该有的字段。

7 Type:类型

每个索引里都可以有一个或多个type,type是index中的一个逻辑数据分类,一个type下的document,都有相同的field。

注意:6.0之前的版本有type(类型)概念,type相当于关系数据库的表,ES官方将在ES9.0版本中彻底删除type。本教程typy都为_doc。

8 shard:分片

index数据过大时,将index里面的数据,分为多个shard,分布式的存储在各个服务器上面。可以支持海量数据和高并发,提升性能和吞吐量,充分利用多台机器的cpu。

9 replica:副本

在分布式环境下,任何一台机器都会随时宕机,如果宕机,index的一个分片没有,导致此index不能搜索。所以,为了保证数据的安全,我们会将每个index的分片经行备份,存储在另外的机器上。保证少数机器宕机es集群仍可以搜索。

能正常提供查询和插入的分片我们叫做主分片(primary shard),其余的我们就管他们叫做备份的分片(replica shard)。

es6默认新建索引时,5分片,2副本,也就是一主一备,共10个分片。所以,es集群最小规模为两台。

1.5 倒排索引与正向

正向索引是最传统的,根据id索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程

倒排索引则相反,是先找到用户要搜索的词条,根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程

倒排索引

倒排索引中有两个非常重要的概念:

  • 文档(Document):用来搜索的数据,其中的每一条数据就是一个文档。例如一个网页、一个商品信息
  • 词条(Term):对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。例如:我是中国人,就可以分为:我、是、中国人、中国、国人这样的几个词条

创建倒排索引是对正向索引的一种特殊处理,流程如下:

  • 将每一个文档的数据利用算法分词,得到一个个词条
  • 创建表,每行数据包括词条、词条所在文档id、位置等信息
  • 因为词条唯一性,可以给词条创建索引,例如hash表结构索引

倒排索引的搜索流程如下(以搜索"华为手机"为例):

1)用户输入条件"华为手机"进行搜索。

2)对用户输入内容分词,得到词条:华为手机

3)拿着词条在倒排索引中查找,可以得到包含词条的文档id:1、2、3。

4)拿着文档id到正向索引中查找具体文档。

优缺点

正向索引

  • 优点:
    • 可以给多个字段创建索引
    • 根据索引字段搜索、排序速度非常快
  • 缺点:
    • 根据非索引字段,或者索引字段中的部分词条查找时,只能全表扫描。

倒排索引

  • 优点:
    • 根据词条搜索、模糊搜索时,速度非常快
  • 缺点:
    • 只能给词条创建索引,而不是字段
    • 无法根据字段做排序

1.6 lucene和elasticsearch的关系

Lucene:最先进、功能最强大的搜索库,直接基于lucene开发,非常复杂,api复杂

Elasticsearch:基于lucene,封装了许多lucene底层功能,提供简单易用的restful api接口和许多语言的客户端,如java的高级客户端(Java High Level REST Client)和底层客户端(Java Low Level REST Client

二、应用示例

2.1 Spring Data Elasticsearch

Spring Data项目对Elasticsearch做了支持,其目的就是简化对Elasticsearch的操作。 地址:https://spring.io/projects/spring-data-elasticsearch

ES客户端

TransportClientES提供的传统客户端,官方计划8.0版本删除此客户端。
RestClientRestClient是官方推荐使用的,它包括两种:Java Low Level REST Client和 Java High Level REST Client。
ES在6.0之后提供 Java High Level REST Client, 两种客户端官方更推荐使用 Java High Level REST Client,不过当 前它还处于完善中,有些功能还没有。

依赖

xml
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>6.5.4</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>6.5.4</version>
</dependency>

配置

properties
spring.application.name = itcast-elasticsearch
spring.data.elasticsearch.cluster-name=es-itcast-cluster
spring.data.elasticsearch.clusternodes=172.16.55.185:9300,172.16.55.185:9301,172.16.55.185:9302
# 这里要注意,使用的端口是9300,而并非9200,原因是9200是RESTful端口,9300是API端口。


<NolebasePageProperties />




spring.elasticsearch.rest.uris: http://localhost:9200

配置类

java
package com.xuecheng.search.config; 
import org.apache.http.HttpHost; 
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class ElasticsearchConfig { 
    @Value("${xuecheng.elasticsearch.hostlist}") 
    private String hostlist;
    @Bean 
    public RestHighLevelClient restHighLevelClient(){
        //解析hostlist配置信息 
        String[] split = hostlist.split(","); 
        //创建HttpHost数组,其中存放es主机和端口的配置信息 
        HttpHost[] httpHostArray = new HttpHost[split.length]; 
        for(int i=0;i<split.length;i++){ 
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], 
                                            Integer.parseInt(item.split(":") [1]), "http");
        }
        //创建RestHighLevelClient客户端 
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }
    //项目主要使用RestHighLevelClient,对于低级的客户端暂时不用
    @Bean 
    public RestClient restClient(){
        //解析hostlist配置信息 
        String[] split = hostlist.split(","); 
        //创建HttpHost数组,其中存放es主机和端口的配置信息
        HttpHost[] httpHostArray = new HttpHost[split.length]; 
        for(int i=0;i<split.length;i++){ 
            String item = split[i]; 
            httpHostArray[i] = new HttpHost(item.split(":")[0], 
                                            Integer.parseInt(item.split(":") [1]), "http");
        }
        return RestClient.builder(httpHostArray).build(); 
    } 
}

启动类

java
@SpringBootApplication 
@EntityScan("com.xuecheng.framework.domain.search")
//扫描实体类
@ComponentScan(basePackages={"com.xuecheng.api"})
//扫描接口
@ComponentScan(basePackages={"com.xuecheng.search"})
//扫描本项目下的所有类 
@ComponentScan(basePackages={"com.xuecheng.framework"})
//扫描common下的所有类 
public class SearchApplication { 
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SearchApplication.class, args);
    }
}

代码

Low Level Client

java
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;

//新增数据
public void testSave(){
    User user = new User();
    user.setId(1001L);
    user.setAge(20);
    user.setName("张三");
    user.setHobby("足球、篮球、听音乐");
    IndexQuery indexQuery = new IndexQueryBuilder().withObject(user).build();
    String index = this.elasticsearchTemplate.index(indexQuery);
    System.out.println(index);
}

//批量插入
public void testBulk() {
    List list = new ArrayList();
    for (int i = 0; i < 5000; i++) {
    User user = new User();
    user.setId(1001L + i);
    user.setAge(i % 50 + 10);
    user.setName("张三" + i);
    user.setHobby("足球、篮球、听音乐");
    IndexQuery indexQuery = newIndexQueryBuilder().withObject(user).build();
    list.add(indexQuery);
    }
    Long start = System.currentTimeMillis();
    this.elasticsearchTemplate.bulkIndex(list);
    System.out.println("用时:" + (System.currentTimeMillis() - start)); //用时:7836
}

//更新数据
public void testUpdate() {
    IndexRequest indexRequest = new IndexRequest();
    indexRequest.source("age", "30");
    UpdateQuery updateQuery = new UpdateQueryBuilder()
        .withId("1001")
        .withClass(User.class)
        .withIndexRequest(indexRequest).build();
    this.elasticsearchTemplate.update(updateQuery);
}

//删除数据
public void testDelete(){
	this.elasticsearchTemplate.delete(User.class, "1001");
}

//搜索
public void testSearch(){
    PageRequest pageRequest = PageRequest.of(1,10); //设置分页参数
    SearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(QueryBuilders.matchQuery("name", "张三")) // match查询
        .withPageable(pageRequest)
        .build();
    AggregatedPage<User> users = this.elasticsearchTemplate.queryForPage(searchQuery, User.class);
    System.out.println("总页数:" + users.getTotalPages()); //获取总页数
    for (User user : users.getContent()) { // 获取搜索到的数据
    	System.out.println(user);
    }
}

high-level-client 代码

高级别客户端操作是通过发送请求的方式完成所有操作的,ES针对各种不同的操作,设定了各式各样的请求对象。

java
@SpringBootTest
class Springboot18EsApplicationTests {
    @BeforeEach		//在测试类中每个操作运行前运行的方法
    void setUp() {
        HttpHost host = HttpHost.create("http://localhost:9200");
        RestClientBuilder builder = RestClient.builder(host);
        client = new RestHighLevelClient(builder);
    }

    @AfterEach		//在测试类中每个操作运行后运行的方法
    void tearDown() throws IOException {
        client.close();
    }

    private RestHighLevelClient client;

    // 创建索引
    @Test
    void testCreateIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("books");
        client.indices().create(request, RequestOptions.DEFAULT);
    }
    // 通过IK创建索引
    @Test
    void testCreateIndexByIK() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("books");
        String json = "{\n" +
                "    \"mappings\":{\n" +
                "        \"properties\":{\n" +
                "            \"id\":{\n" +
                "                \"type\":\"keyword\"\n" +
                "            },\n" +
                "            \"name\":{\n" +
                "                \"type\":\"text\",\n" +
                "                \"analyzer\":\"ik_max_word\",\n" +
                "                \"copy_to\":\"all\"\n" +
                "            },\n" +
                "            \"type\":{\n" +
                "                \"type\":\"keyword\"\n" +
                "            },\n" +
                "            \"description\":{\n" +
                "                \"type\":\"text\",\n" +
                "                \"analyzer\":\"ik_max_word\",\n" +
                "                \"copy_to\":\"all\"\n" +
                "            },\n" +
                "            \"all\":{\n" +
                "                \"type\":\"text\",\n" +
                "                \"analyzer\":\"ik_max_word\"\n" +
                "            }\n" +
                "        }\n" +
                "    }\n" +
                "}";
        //设置请求中的参数
        request.source(json, XContentType.JSON);
        client.indices().create(request, RequestOptions.DEFAULT);
    }
    
    @Test
    //添加文档
    void testCreateDoc() throws IOException {
        Book book = bookDao.selectById(1);
        IndexRequest request = new IndexRequest("books").id(book.getId().toString());
        String json = JSON.toJSONString(book);
        request.source(json,XContentType.JSON);
        client.index(request,RequestOptions.DEFAULT);
    }
    
    @Test
    //批量添加文档
    void testCreateDocAll() throws IOException {
        List<Book> bookList = bookDao.selectList(null);
        BulkRequest bulk = new BulkRequest();
        for (Book book : bookList) {
            IndexRequest request = new IndexRequest("books").id(book.getId().toString());
            String json = JSON.toJSONString(book);
            request.source(json,XContentType.JSON);
            bulk.add(request);
        }
        client.bulk(bulk,RequestOptions.DEFAULT);
    }
    
    @Test
    //按id查询
    void testGet() throws IOException {
        GetRequest request = new GetRequest("books","1");
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        String json = response.getSourceAsString();
        System.out.println(json);
    }
    
    @Test
    //按条件查询
    void testSearch() throws IOException {
        SearchRequest request = new SearchRequest("books");

        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(QueryBuilders.termQuery("all","spring"));
        request.source(builder);

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        for (SearchHit hit : hits) {
            String source = hit.getSourceAsString();
            //System.out.println(source);
            Book book = JSON.parseObject(source, Book.class);
            System.out.println(book);
        }
    }
    
}

三、配置

ES的配置文件的地址根据安装形式的不同而不同:

使用zip、tar安装,配置文件的地址在安装目录的confifig下。

使用RPM安装,配置文件在/etc/elasticsearch下。

使用MSI安装,配置文件的地址在安装目录的confifig下,并且会自动将confifig目录地址写入环境变量 ES_PATH_CONF。

配置文件如下:

elasticsearch.yml : 用于配置Elasticsearch运行参数

jvm.options : 用于配置Elasticsearch JVM设置

log4j2.properties: 用于配置Elasticsearch日志

3.1 elasticsearch.yml

配置实例

yaml
cluster.name: xuecheng 
node.name: xc_node_1 
network.host: 0.0.0.0 
http.port: 9200 
transport.tcp.port: 9300 
node.master: true 
node.data: true 
#discovery.zen.ping.unicast.hosts: ["0.0.0.0:9300", "0.0.0.0:9301", "0.0.0.0:9302"] 
discovery.zen.minimum_master_nodes: 1 
bootstrap.memory_lock: false 
node.max_local_storage_nodes: 1 
path.data: D:\ElasticSearch\elasticsearch‐6.2.1\data 
path.logs: D:\ElasticSearch\elasticsearch‐6.2.1\logs 

http.cors.enabled: true 
http.cors.allow‐origin: /.*/
cluster.name配置elasticsearch的集群名称,默认是elasticsearch。建议修改成一个有意义的名称。
node.name节点名,通常一台物理服务器就是一个节点,es会默认随机指定一个名字,建议指定一个有意义的名称,方便管理一个或多个节点组成一个cluster集群,集群是一个逻辑的概念,节点是物理概念.
path.conf设置配置文件的存储路径,tar或zip包安装默认在es根目录下的confifig文件夹,rpm安装默认在/etc/
elasticsearch path.data设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径, 用逗号隔开。
path.logs设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins设置插件的存 放路径,默认是es根目录下的plugins文件夹
bootstrap.memory_lock: true设置为true可以锁住ES使用的内存,避免内存与swap分区交换数据。
network.host设置绑定主机的ip地址,设置为0.0.0.0表示绑定任何ip,允许外网访问,生产环境建议设置为具体 的ip。
http.port: 9200设置对外服务的http端口,默认为9200。
transport.tcp.port: 9300集群结点之间通信端口
node.master指定该节点是否有资格被选举成为master结点,默认是true,如果原来的master宕机会重新选举新 的master。 node.data: 指定该节点是否存储索引数据,默认为true。
discovery.zen.ping.unicast.hosts: ["host1:port", "host2:port", "..."]设置集群中master节点的初始列表。
discovery.zen.ping.timeout: 3s设置ES自动发现节点连接超时的时间,默认为3秒,如果网络延迟高可设置大些。
discovery.zen.minimum_master_nodes主结点数量的最少值 ,此值的公式为:(master_eligible_nodes / 2) + 1 ,比如:有3个符合要求的主结点,那么这 里要设置为2。
node.max_local_storage_nodes单机允许的最大存储结点数,通常单机启动一个结点建议设置为1,开发环境如果单机启动多个节点可设置大于1.

3.2 jvm.options

设置最小及最大的JVM堆内存大小:

在jvm.options中设置 -Xms和-Xmx:

1) 两个值设置为相等

2) 将 Xmx 设置为不超过物理内存的一半。

3.3 log4j2.properties

日志文件设置,ES使用log4j,注意日志级别的配置。

3.4 系统配置

在linux根据系统资源情况,可将每个进程最多允许打开的文件数设置大些。

shell
## 命令方式
# 查询当前文件数
su limit -n 
#使用命令设置limit: 
sudo su 
ulimit ‐n 65536 
su elasticsearch

## 配置文件方式
# 在/etc/security/limits.conf 中添加配置
elasticsearch ‐ nofile 65536

四、ES相关操作

4.1 索引库

ES的索引库是一个逻辑概念,它包括了分词列表及文档列表,同一个索引库中存储了相同类型的文档。它就相当于 MySQL中的表,或相当于Mongodb中的集合。

关于索引这个语:

索引(名词):ES是基于Lucene构建的一个搜索服务,它要从索引库搜索符合条件索引数据。

索引(动词):索引库刚创建起来是空的,将数据添加到索引库的过程称为索引。

索引操作 restful方式

restful接口方式

json
// put http://localhost:9200/索引库名称   创建索引
{ 
    "settings":{ 
    "index":{ 
        "number_of_shards":1,	//number_of_shards:设置分片的数量,在集群中通常设置多个分片,表示一个索引库将拆分成多片分别存储不同 的结点,提高了ES的处理能力和高可用性,入门程序使用单机环境,这里设置为1。
        "number_of_replicas":0	//number_of_replicas:设置副本的数量,设置副本是为了提高ES的高可靠性,单机环境设置为0.
    } 
    } 
}

// GET /索引库名  查询索引

// PUT /索引库名/_mapping  修改索引

// DELETE /索引库名   删除索引

索引操作 java方式

java
//创建索引请求对象,并设置索引名称 
CreateIndexRequest createIndexRequest = new CreateIndexRequest("xc_course"); 
//设置索引参数 
createIndexRequest.settings(Settings.builder().put("number_of_shards",1) .put("number_of_replicas",0)); 


import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;

/**

- @author Administrator

- @version 1.0
  **/
  @SpringBootTest
  @RunWith(SpringRunner.class)
  public class TestIndex {

  @Autowired
  RestHighLevelClient client;

//    @Autowired
//    RestClient restClient;

​```
//创建索引
@Test
public void testCreateIndex() throws IOException {
    //创建索引对象
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("itheima_book");
    //设置参数
    createIndexRequest.settings(Settings.builder().put("number_of_shards", "1").put("number_of_replicas", "0"));
    //指定映射1
    createIndexRequest.mapping(" {\n" +
            " \t\"properties\": {\n" +
            "            \"name\":{\n" +
            "             \"type\":\"keyword\"\n" +
            "           },\n" +
            "           \"description\": {\n" +
            "              \"type\": \"text\"\n" +
            "           },\n" +
            "            \"price\":{\n" +
            "             \"type\":\"long\"\n" +
            "           },\n" +
            "           \"pic\":{\n" +
            "             \"type\":\"text\",\n" +
            "             \"index\":false\n" +
            "           }\n" +
            " \t}\n" +
            "}", XContentType.JSON);

    //指定映射2
​```

//        Map<String, Object> message = new HashMap<>();
//        message.put("type", "text");
//        Map<String, Object> properties = new HashMap<>();
//        properties.put("message", message);
//        Map<String, Object> mapping = new HashMap<>();
//        mapping.put("properties", properties);
//        createIndexRequest.mapping(mapping);

​```
    //指定映射3
​```

//        XContentBuilder builder = XContentFactory.jsonBuilder();
//        builder.startObject();
//        {
//            builder.startObject("properties");
//            {
//                builder.startObject("message");
//                {
//                    builder.field("type", "text");
//                }
//                builder.endObject();
//            }
//            builder.endObject();
//        }
//        builder.endObject();
//        createIndexRequest.mapping(builder);

​```
    //设置别名
    createIndexRequest.alias(new Alias("itheima_index_new"));

    // 额外参数
    //设置超时时间
    createIndexRequest.setTimeout(TimeValue.timeValueMinutes(2));
    //设置主节点超时时间
    createIndexRequest.setMasterTimeout(TimeValue.timeValueMinutes(1));
    //在创建索引API返回响应之前等待的活动分片副本的数量,以int形式表示
    createIndexRequest.waitForActiveShards(ActiveShardCount.from(2));
    createIndexRequest.waitForActiveShards(ActiveShardCount.DEFAULT);

    //操作索引的客户端
    IndicesClient indices = client.indices();
    //执行创建索引库
    CreateIndexResponse createIndexResponse = indices.create(createIndexRequest, RequestOptions.DEFAULT);

    //得到响应(全部)
    boolean acknowledged = createIndexResponse.isAcknowledged();
    //得到响应 指示是否在超时前为索引中的每个分片启动了所需数量的碎片副本
    boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();

    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!" + acknowledged);
    System.out.println(shardsAcknowledged);

}

//异步新增索引
@Test
public void testCreateIndexAsync() throws IOException {
    //创建索引对象
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("itheima_book2");
    //设置参数
    createIndexRequest.settings(Settings.builder().put("number_of_shards", "1").put("number_of_replicas", "0"));
    //指定映射1
    createIndexRequest.mapping(" {\n" +
            " \t\"properties\": {\n" +
            "            \"name\":{\n" +
            "             \"type\":\"keyword\"\n" +
            "           },\n" +
            "           \"description\": {\n" +
            "              \"type\": \"text\"\n" +
            "           },\n" +
            "            \"price\":{\n" +
            "             \"type\":\"long\"\n" +
            "           },\n" +
            "           \"pic\":{\n" +
            "             \"type\":\"text\",\n" +
            "             \"index\":false\n" +
            "           }\n" +
            " \t}\n" +
            "}", XContentType.JSON);

    //监听方法
    ActionListener<CreateIndexResponse> listener =
            new ActionListener<CreateIndexResponse>() {

                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    System.out.println("!!!!!!!!创建索引成功");
                    System.out.println(createIndexResponse.toString());
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("!!!!!!!!创建索引失败");
                    e.printStackTrace();
                }
            };

    //操作索引的客户端
    IndicesClient indices = client.indices();
    //执行创建索引库
    indices.createAsync(createIndexRequest, RequestOptions.DEFAULT, listener);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
​```

​```
}
​```

​```
//删除索引库
@Test
public void testDeleteIndex() throws IOException {
    //删除索引对象
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("itheima_book2");
    //操作索引的客户端
    IndicesClient indices = client.indices();
    //执行删除索引
    AcknowledgedResponse delete = indices.delete(deleteIndexRequest, RequestOptions.DEFAULT);
    //得到响应
    boolean acknowledged = delete.isAcknowledged();
    System.out.println(acknowledged);

}

//异步删除索引库
@Test
public void testDeleteIndexAsync() throws IOException {
    //删除索引对象
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("itheima_book2");
    //操作索引的客户端
    IndicesClient indices = client.indices();

    //监听方法
    ActionListener<AcknowledgedResponse> listener =
            new ActionListener<AcknowledgedResponse>() {
                @Override
                public void onResponse(AcknowledgedResponse deleteIndexResponse) {
                    System.out.println("!!!!!!!!删除索引成功");
                    System.out.println(deleteIndexResponse.toString());
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("!!!!!!!!删除索引失败");
                    e.printStackTrace();
                }
            };
    //执行删除索引
    indices.deleteAsync(deleteIndexRequest, RequestOptions.DEFAULT, listener);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

// Indices Exists API
@Test
public void testExistIndex() throws IOException {
    GetIndexRequest request = new GetIndexRequest("itheima_book");
    request.local(false);//从主节点返回本地信息或检索状态
    request.humanReadable(true);//以适合人类的格式返回结果
    request.includeDefaults(false);//是否返回每个索引的所有默认设置

    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    System.out.println(exists);
}
​```

​```
// Indices Open API
@Test
public void testOpenIndex() throws IOException {
    OpenIndexRequest request = new OpenIndexRequest("itheima_book");

    OpenIndexResponse openIndexResponse = client.indices().open(request, RequestOptions.DEFAULT);
    boolean acknowledged = openIndexResponse.isAcknowledged();
    System.out.println("!!!!!!!!!"+acknowledged);
}

// Indices Close API
@Test
public void testCloseIndex() throws IOException {
    CloseIndexRequest request = new CloseIndexRequest("index");
    AcknowledgedResponse closeIndexResponse = client.indices().close(request, RequestOptions.DEFAULT);
    boolean acknowledged = closeIndexResponse.isAcknowledged();
    System.out.println("!!!!!!!!!"+acknowledged);

}
}

4.2 映射

在索引中每个文档都包括了一个或多个fifield,创建映射就是向索引库中创建fifield的过程,下边是document和fifield 与关系数据库的概念的类比:

文档(Document)----------------Row记录

字段(Field)-------------------Columns 列

注意:6.0之前的版本有type(类型)概念,type相当于关系数据库的表,ES官方将在ES9.0版本中彻底删除type。

映射规则

字段判断映射的规则

JSON typeField type
Boolean: true or false"boolean"
Whole number: 123"long"
Floating point: 123.45"double"
String, valid date: "2014-09-15""date"
String: "foo bar""string"

支持类型

Elasticsearch中支持的类型如下:

类型表示的数据类型
Stringstring , text , keyword
Whole numberbyte , short , integer , long
Floating pointfloat , double
Booleanboolean
Dateda
  • string类型在ElasticSearch 旧版本中使用较多,从ElasticSearch 5.x开始不再支持string,由text和keyword类型替代。
  • text 类型,当一个字段是要被全文搜索的,比如Email内容、产品描述,应该使用text类型。设置text类型以后,字段内容会被分析,在生成倒排索引以前,字符串会被分析器分成一个一个词项。text类型的字段不用于排序,很少用于聚合。
  • keyword类型适用于索引结构化的字段,比如email地址、主机名、状态码和标签。如果字段需要进行过滤(比如查找已发布博客中status属性为published的文章)、排序、聚合。keyword类型的字段只能通过精确值搜索到。

特殊字段类型

  • location:地理坐标,里面包含精度、纬度
  • all:一个组合字段,其目的是将多字段的值 利用copy_to合并,提供给用户搜索

地理坐标说明:

copy_to说明:

映射操作 restful接口

映射操作及各映射的属性配置 restful接口

json
// 创建映射
// post http://localhost:9200/索引库名称/类型名称/_mapping 
{ 
    "properties": {
        "name": { 
            "type": "text", // 字符串包括text和keyword两种类型
            "analyzer":"ik_max_word" ,// 通过analyzer属性指定分词器。
             "search_analyzer":"ik_smart", // 不指定,索引和搜索都使用ik_max_word,指定的搜索的使用search_analyzer
            							// 对于ik分词器建议是索引时使用ik_max_word将搜索内容进行细粒度分词,搜索时使用ik_smart提高搜索精确性
            "index":false,	// 指定是否索引。 默认为index=true,即要进行索引,只有进行索引才可以从索引库搜索到。 不需要进行索引的可以设置为false
			"store": false	// 是否在source之外存储,每个文档索引后会在 ES中保存一份原始文档,存放在"_source"中,一般情况下不需要设置 store为true,
        },
        "timestamp": {
            "type": "date",	// 日期类型不用设置分词器  通常日期类型的字段用于排序
            "format": "yyyy‐MM‐dd HH:mm:ss||yyyy‐MM‐dd" 	// 通过format设置日期格式
        },
        "studymodel": { 
            "type": "keyword" 	//关键字字段,通常搜索keyword是按照整体搜索,不进行分词,如:邮政编码、手机号码、身份证等。该字段通常用于过虑、排序、聚合等。
        },
        "price": { 
            "type": "scaled_float", 	// 数值类型,尽量选择范围小的类型,提高搜索效率 其他类型?
            "scaling_factor": 100 		// 对于浮点数尽量用比例因子  设置为100回将存入的值乘以比例因子存入,仍有小数会取整(四舍五入)
        },
    } 
}

// 查询所有索引的映射
GET: http://localhost:9200/_mapping 

// 更新映射
//映射创建成功可以添加新字段,已有字段不允许更新

// 删除映射 
// 通过删除索引来删除映射。

映射操作 java方式

映射操作 java方式

java
@SpringBootTest 
@RunWith(SpringRunner.class) 
public class TestIndex { 
    @Autowired 
    RestHighLevelClient client; 
    @Autowired 
    RestClient restClient; 
    //创建索引库 
    @Test 
    public void testCreateIndex() throws IOException { 
        //创建索引请求对象,并设置索引名称 
        CreateIndexRequest createIndexRequest = new CreateIndexRequest("xc_course"); 
        //设置索引参数 
        createIndexRequest.settings(Settings.builder().put("number_of_shards",1) .put("number_of_replicas",0)); 
        //设置映射 
        createIndexRequest.mapping("doc",
                                   " {\n" + " \t\"properties\": {\n" 
                                   + " \"name\": {\n" + " \"type\": \"text\",\n" 
                                   + " \"analyzer\":\"ik_max_word\",\n" 
                                   + " \"search_analyzer\":\"ik_smart\"\n" 
                                   + " },\n" + " \"description\": {\n" + " \"type\": \"text\",\n" 
                                   + " \"analyzer\":\"ik_max_word\",\n" 
                                   + " \"search_analyzer\":\"ik_smart\"\n" 
                                   + " },\n" + " \"studymodel\": {\n" + " \"type\": \"keyword\"\n" + " },\n" 
                                   + " \"price\": {\n" + " \"type\": \"float\"\n"
                                   + " }\n" 
                                   + " }\n" 
                                   + "}", 
                                   XContentType.JSON);
        //创建索引操作客户端 
        IndicesClient indices = client.indices(); 
        //创建响应对象 
        CreateIndexResponse createIndexResponse = indices.create(createIndexRequest);
        //得到响应结果 
        boolean acknowledged = createIndexResponse.isAcknowledged(); 
        System.out.println(acknowledged); 
    }
    //删除索引库 
    @Test
    public void testDeleteIndex() throws IOException { 
        //删除索引请求对象
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("xc_course"); 
        //删除索引
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); 
        //删除索引响应结果
        boolean acknowledged = deleteIndexResponse.isAcknowledged(); 
        System.out.println(acknowledged);
    } 
}

4.3 文档

ES中的文档相当于MySQL数据库表中的记录。

添加文档 restful

添加文档 restful接口方式

json
// 添加文档
// put 或Post http://localhost:9200/xc_course/doc/id值  如果不指定id,ES会自动生成
{ 
    "name":"Bootstrap开发框架", 
    "description":"Bootstrap是由Twitter推出的一个前台页面开发框架,在行业之中使用较为广泛。此开发框架包 含了大量的CSS、JS程序代码,可以帮助开发者(尤其是不擅长页面开发的程序人员)轻松的实现一个不受浏览器限制的 精美界面效果。",
    "studymodel":"201001"
}

添加文档 java

添加文档 java方式

java
//添加文档
@Test
public void testAddDoc() throws IOException {
	//准备json数据
	Map<String, Object> jsonMap = new HashMap<>();
	jsonMap.put("name", "spring cloud实战");
	jsonMap.put("description", "本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud基础入门 3.实战Spring Boot 4.注册中心eureka。");
	jsonMap.put("studymodel", "201001");
	SimpleDateFormat dateFormat =new SimpleDateFormat("yyyy‐MM‐dd HH:mm:ss");
	jsonMap.put("timestamp", dateFormat.format(new Date()));
	jsonMap.put("price", 5.6f);
	//索引请求对象
	IndexRequest indexRequest = new IndexRequest("xc_course","doc");
	//指定索引文档内容
	indexRequest.source(jsonMap);
	//索引响应对象
	IndexResponse indexResponse = client.index(indexRequest);
    //获取响应结果
	DocWriteResponse.Result result = indexResponse.getResult();
	System.out.println(result);
    
    
    //        1构建请求
        IndexRequest request=new IndexRequest("test_posts");
        request.id("3");
//        =======================构建文档============================
//        构建方法1
        String jsonString="{\n" +
                "  \"user\":\"tomas J\",\n" +
                "  \"postDate\":\"2019-07-18\",\n" +
                "  \"message\":\"trying out es3\"\n" +
                "}";
        request.source(jsonString, XContentType.JSON);

//        构建方法2
//        Map<String,Object> jsonMap=new HashMap<>();
//        jsonMap.put("user", "tomas");
//        jsonMap.put("postDate", "2019-07-18");
//        jsonMap.put("message", "trying out es2");
//        request.source(jsonMap);

//        构建方法3
//        XContentBuilder builder= XContentFactory.jsonBuilder();
//        builder.startObject();
//        {
//            builder.field("user", "tomas");
//            builder.timeField("postDate", new Date());
//            builder.field("message", "trying out es2");
//        }
//        builder.endObject();
//        request.source(builder);
//        构建方法4
//        request.source("user","tomas",
//                    "postDate",new Date(),
//                "message","trying out es2");
//
//        ========================可选参数===================================
        //设置超时时间
        request.timeout(TimeValue.timeValueSeconds(1));
        request.timeout("1s");

        //自己维护版本号
//        request.version(2);
//        request.versionType(VersionType.EXTERNAL);



//        2执行
        //同步
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        //异步
//        ActionListener<IndexResponse> listener=new ActionListener<IndexResponse>() {
//            @Override
//            public void onResponse(IndexResponse indexResponse) {
//
//            }
//
//            @Override
//            public void onFailure(Exception e) {
//
//            }
//        };
//        client.indexAsync(request,RequestOptions.DEFAULT, listener );
//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }


//        3获取结果
        String index = indexResponse.getIndex();
        String id = indexResponse.getId();
        //获取插入的类型
        if(indexResponse.getResult()== DocWriteResponse.Result.CREATED){
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("CREATED:"+result);
        }else if(indexResponse.getResult()== DocWriteResponse.Result.UPDATED){
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("UPDATED:"+result);
        }

        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if(shardInfo.getTotal()!=shardInfo.getSuccessful()){
            System.out.println("处理成功的分片数少于总分片!");
        }
        if(shardInfo.getFailed()>0){
           for (ReplicationResponse.ShardInfo.Failure failure:shardInfo.getFailures()) {
               String reason = failure.reason();//处理潜在的失败原因
               System.out.println(reason);
           }
        }
}

查询文档 restful

查询文档 --restful方式

json
// 根据课程id查询文档 
get http://localhost:9200/xc_course/doc/4028e58161bcf7f40161bcf8b77c0000 

//查询所有记录
get http://localhost:9200/xc_course/doc/_search

// 查询名称中包括spring 关键字的的记录 
get http://localhost:9200/xc_course/doc/_search?q=name:bootstrap

//查询学习模式为201001的记录 
get http://localhost:9200/xc_course/doc/_search?q=studymodel:201001

查询文档 java

查询文档 java方式

java
//查询文档
@Test
public void getDoc() throws IOException {
	GetRequest getRequest = new GetRequest(
		"xc_course",
		"doc",
		"4028e581617f945f01617f9dabc40000");
	GetResponse getResponse = client.get(getRequest);
	boolean exists = getResponse.isExists();
	Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
	System.out.println(sourceAsMap);
    //构建请求
        GetRequest getRequest = new GetRequest("test_post", "1");

        //========================可选参数 start======================
        //为特定字段配置_source_include
//        String[] includes = new String[]{"user", "message"};
//        String[] excludes = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
//        getRequest.fetchSourceContext(fetchSourceContext);

        //为特定字段配置_source_excludes
//        String[] includes1 = new String[]{"user", "message"};
//        String[] excludes1 = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
//        getRequest.fetchSourceContext(fetchSourceContext1);

        //设置路由
//        getRequest.routing("routing");

        // ========================可选参数 end=====================


        //查询 同步查询
      GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

        //异步查询
//        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
//            //查询成功时的立马执行的方法
//            @Override
//            public void onResponse(GetResponse getResponse) {
//                long version = getResponse.getVersion();
//                String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式)
//                System.out.println(sourceAsString);
//            }
//
//            //查询失败时的立马执行的方法
//            @Override
//            public void onFailure(Exception e) {
//                e.printStackTrace();
//            }
//        };
//        //执行异步请求
//        client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }

        // 获取结果
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();

            String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式)
            System.out.println(sourceAsString);
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();//以字节接受
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            System.out.println(sourceAsMap);

        }else {

        }
    
}

查询结果

查询结果

json
{ 
    "took": 1, //本次操作花费的时间,单位为毫秒。 
    "timed_out": false,	//请求是否超时 
    "_shards": { //说明本次操作共搜索了哪些分片 
        "total": 1, 
        "successful": 1,
        "skipped": 0, 
        "failed": 0 
    },
    "hits": { // 搜索命中的记录 
        "total": 1, // 符合条件的文档总数 
        "max_score": 0.2876821,//文档匹配得分,这里为最高分 
        "hits": [ { //匹配度较高的前N个文档 
            "_index": "xc_course", 
            "_type": "doc", 
            "_id": "4028e58161bcf7f40161bcf8b77c0000",
            "_score": 0.2876821, // 每个文档都有一个匹配度得分,按照降序排列。 
            "_source": {	//显示了文档的原始内容。 
                "name": "Bootstrap开发框架", 
                "description": "Bootstrap是由Twitter推出的一个前台页面开发框架,在行业之中使用较 为广泛。此开发框架包含了大量的CSS、JS程序代码,可以帮助开发者(尤其是不擅长页面开发的程序人员)轻松的实现 一个不受浏览器限制的精美界面效果。", 
                "studymodel": "201001" 
            } 
        } 
                ] 
    } 
}

更新文档 restful

更新文档 restful方式

ES更新文档的顺序是:先检索到文档、将原来的文档标记为删除、创建新文档、删除旧文档,创建新文档就会重建索引。更新由两种方式

json
// 完全替换
// Post:http://localhost:9200/xc_test/doc/3
{
"name":"spring cloud实战",
"description":"本课程主要从四个章节进行讲解: 1.微服务架构入门 2.spring cloud 基础入门 3.实战Spring
Boot 4.注册中心eureka。",
"studymodel":"201001"
"price":5.6
}

// 局部更新
// post: http://localhost:9200/xc_test/doc/3/_update
{
"doc":{"price":66.6}
}

更新文档 java

更新文档 java客户端方式

java
//更新文档
// 可以指定文档的部分字段也可以指定完整的文档内容。
@Test
public void updateDoc() throws IOException {
	UpdateRequest updateRequest = new UpdateRequest("xc_course", 			"doc",
		"4028e581617f945f01617f9dabc40000");
	Map<String, String> map = new HashMap<>();
	map.put("name", "spring cloud实战");
	updateRequest.doc(map);
	UpdateResponse update = client.update(updateRequest);
	RestStatus status = update.status();
	System.out.println(status);
    
    //        1构建请求
        UpdateRequest request = new UpdateRequest("test_posts", "3");
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "tomas JJ");
        request.doc(jsonMap);
//===============================可选参数==========================================
        request.timeout("1s");//超时时间

        //重试次数
        request.retryOnConflict(3);

        //设置在继续更新之前,必须激活的分片数
//        request.waitForActiveShards(2);
        //所有分片都是active状态,才更新
//        request.waitForActiveShards(ActiveShardCount.ALL);

//        2执行
//        同步
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//        异步

//        3获取数据
        updateResponse.getId();
        updateResponse.getIndex();

        //判断结果
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("CREATED:" + result);
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("UPDATED:" + result);
        }else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("DELETED:" + result);
        }else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP){
            //没有操作
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("NOOP:" + result);
        }
}

删除文档 restful

删除文档 restful方式

json
// 根据id删除
// DELETE /{index}/{type}/{id}

// 搜索匹配删除
// POST /{index}/{type}/_delete_by_query
{
	"query":{
		"term":{
			"studymodel":"201001"
		}
	}
}

删除文档 java

删除文档 java方式

java
//根据id删除文档
@Test
public void testDelDoc() throws IOException {
	//删除文档id
	String id = "eqP_amQBKsGOdwJ4fHiC";
	//删除索引请求对象
	DeleteRequest deleteRequest = new DeleteRequest("xc_course","doc",id);
	//响应对象
	DeleteResponse deleteResponse = client.delete(deleteRequest);
	//获取响应结果
	DocWriteResponse.Result result = deleteResponse.getResult();
	System.out.println(result);
    
    //        1构建请求
        DeleteRequest request =new DeleteRequest("test_posts","3");
        //可选参数


//        2执行
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);


//        3获取数据
        deleteResponse.getId();
        deleteResponse.getIndex();

        DocWriteResponse.Result result = deleteResponse.getResult();
        System.out.println(result);
}

批量操作 restful

批量操作 restful方式

在Elasticsearch中,支持批量的插入、修改、删除操作,都是通过_bulk的api完成的。

json
// POST/{index}/{type}/_bulk
//数据格式
{"create":{"_index":"haoke","_type":"user","_id":2001}}
{"id":2001,"name":"name1","age": 20,"sex": "男"}
{"create":{"_index":"haoke","_type":"user","_id":2002}}
{"id":2002,"name":"name2","age": 20,"sex": "男"}
{"create":{"_index":"haoke","_type":"user","_id":2003}}
{"id":2003,"name":"name3","age": 20,"sex": "男"}

批量操作 java

批量操作 java方式

java
@Test
void testBulkRequest() throws IOException {
    // 批量查询酒店数据
    List<Hotel> hotels = hotelService.list();

    // 1.创建Request
    BulkRequest request = new BulkRequest();
    // 2.准备参数,添加多个新增的Request
    for (Hotel hotel : hotels) {
        // 2.1.转换为文档类型HotelDoc
        HotelDoc hotelDoc = new HotelDoc(hotel);
        // 2.2.创建新增文档的Request对象
        request.add(new IndexRequest("hotel")
                    .id(hotelDoc.getId().toString())
                    .source(JSON.toJSONString(hotelDoc), XContentType.JSON));
    }
    // 3.发送请求
    client.bulk(request, RequestOptions.DEFAULT);
    
    
    //        1创建请求
        BulkRequest request = new BulkRequest();
//        request.add(new IndexRequest("post").id("1").source(XContentType.JSON, "field", "1"));
//        request.add(new IndexRequest("post").id("2").source(XContentType.JSON, "field", "2"));

        request.add(new UpdateRequest("post","2").doc(XContentType.JSON, "field", "3"));
        request.add(new DeleteRequest("post").id("1"));

//        2执行
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

        for (BulkItemResponse itemResponse : bulkResponse) {
            DocWriteResponse itemResponseResponse = itemResponse.getResponse();

            switch (itemResponse.getOpType()) {
                case INDEX:
                case CREATE:
                    IndexResponse indexResponse = (IndexResponse) itemResponseResponse;
                    indexResponse.getId();
                    System.out.println(indexResponse.getResult());
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse;
                    updateResponse.getIndex();
                    System.out.println(updateResponse.getResult());
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse;
                    System.out.println(deleteResponse.getResult());
                    break;
            }
        }
}

五、文档搜索

5.1 简单搜索

DSL(Domain Specific Language)是ES提出的基于json的搜索方式,在搜索时传入特定的json格式的数据来完成不同的搜索需求。

DSL比URI搜索方式功能强大,在项目中建议使用DSL方式来完成搜索。

restful接口

json
// 根据课程id查询文档 
get http://localhost:9200/xc_course/doc/4028e58161bcf7f40161bcf8b77c0000 

//查询所有记录
get http://localhost:9200/xc_course/doc/_search

// 查询名称中包括spring 关键字的的记录 
get http://localhost:9200/xc_course/doc/_search?q=name:bootstrap

//查询学习模式为201001的记录 
get http://localhost:9200/xc_course/doc/_search?q=studymodel:201001 

// 查询所有索引库的文档。
post http://localhost: 9200 /search

// 查询指定索引库指定类型下的文档
post http://localhost: 9200 /xc_course/doc/_search
{
	"query": {
		"match_all": {}
	},
	"_source" : ["name","studymodel"]	// source源过虑设置,指定结果中所包括的字段有哪些。
}

java代码

java
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestSearch {
	@Autowired
	RestHighLevelClient client;
	@Autowired
	RestClient restClient;
	//搜索type下的全部记录
	@Test
	public void testSearchAll() throws IOException {
		SearchRequest searchRequest = new SearchRequest("xc_course");
		searchRequest.types("doc");
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		searchSourceBuilder.query(QueryBuilders.matchAllQuery());
		//source源字段过虑
		searchSourceBuilder.fetchSource(new String[]{"name","studymodel"}, new String[]{});
		searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest);
		SearchHits hits = searchResponse.getHits();
		SearchHit[] searchHits = hits.getHits();
		for (SearchHit hit : searchHits) {
			String index = hit.getIndex();
			String type = hit.getType();
			String id = hit.getId();
			float score = hit.getScore();
			String sourceAsString = hit.getSourceAsString();
			Map<String, Object> sourceAsMap = hit.getSourceAsMap();
			String name = (String) sourceAsMap.get("name");
			String studymodel = (String) sourceAsMap.get("studymodel");
			String description = (String) sourceAsMap.get("description");
			System.out.println(name);
			System.out.println(studymodel);
			System.out.println(description);
		}
	}
....

5.2 分页查询

查询分页深度较大时,汇总数据过多,对内存和CPU会产生非常大的压力,因此elasticsearch会禁止from+ size 超过10000的请求。

针对深度分页,ES提供了两种解决方案,官方文档

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
  • scroll:原理将排序后的文档id形成快照,保存在内存。官方已经不推荐使用。

三种分页方式优缺点

  • from + size

    • 优点:支持随机翻页
    • 缺点:深度分页问题,默认查询上限(from + size)是10000
    • 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
  • after search

    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:只能向后逐页查询,不支持随机翻页
    • 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
  • scroll

    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:会有额外内存消耗,并且搜索结果是非实时的
    • 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。

restful接口

json
// ES支持分页查询,传入两个参数:from和size。
// form:表示起始文档的下标,从0开始。
// size:查询的文档数量。
// post http://localhost:9200/xc_course/doc/_search
{
	"from" : 0, 
    "size" : 1,
	"query": {
		"match_all": {}
	},
	"_source" : ["name","studymodel"]
}

java代码

java
SearchRequest searchRequest = new SearchRequest("xc_course");
searchRequest.types("xc_course");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
//分页查询,设置起始下标,从0开始
searchSourceBuilder.from(0);
//每页显示个数
searchSourceBuilder.size(10);
//source源字段过虑
searchSourceBuilder.fetchSource(new String[]{"name","studymodel"}, new String[]{});
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);

5.3 Term Query

json
// Term Query为精确查询,在搜索时会整体匹配关键字,不再将关键字分词。
post http://localhost:9200/xc_course/doc/_search
{
	"query": {
		"term" : {
			"name": "spring"
		}
	},
	"_source" : ["name","studymodel"]
}

java代码

java
SearchRequest searchRequest = new SearchRequest("xc_course");
searchRequest.types("xc_course");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("name","spring"));
//source源字段过虑
searchSourceBuilder.fetchSource(new String[]{"name","studymodel"}, new String[]{});
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);

5.4 根据id精确匹配

restful

json
// ES提供根据多个id值匹配的方法:
// post: http://127.0.0.1:9200/xc_course/doc/_search
{
	"query": {
		"ids" : {
			"type" : "doc",
			"values" : ["3", "4", "100"]
		}
	}
}

java代码

java
String[] split = new String[]{"1","2"};
List<String> idList = Arrays.asList(split);
searchSourceBuilder.query(QueryBuilders.termsQuery("_id", idList));

5.5 match Query

match Query即全文检索,它的搜索方式是先将搜索字符串分词,再使用各各词条从索引中搜索。

match query与Term query区别是match query在搜索前先将搜索关键字分词,再拿各各词语去索引中搜索。

以下例子执行流程:

1、将“spring开发”分词,分为spring、开发两个词

2、再使用spring和开发两个词去匹配索引中搜索。

3、由于设置了operator为or,只要有一个词匹配成功则就返回该文档。

restful

json
// 
post http://localhost:9200/xc_course/doc/_search
{
	"query": {
		"match" : {
			"description" : {
				"query" : "spring开发",	//搜索的关键字,对于英文关键字如果有多个单词则中间要用半角逗号分隔,而对于中文关键字中间可以用逗号分隔也可以不用。
				"operator" : "or"		// or 表示 只要有一个词在文档中出现则就符合条件,and表示每个词都在文档中出现则才符合条件。
			}
		}
	}
}

java

java
//根据关键字搜索
@Test
public void testMatchQuery() throws IOException {
	SearchRequest searchRequest = new SearchRequest("xc_course");
	searchRequest.types("xc_course");
	SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
	//source源字段过虑
	searchSourceBuilder.fetchSource(new String[]{"name","studymodel"}, new String[]{});
	//匹配关键字
	searchSourceBuilder.query(QueryBuilders.matchQuery("description", "spring开
发").operator(Operator.OR));
	searchRequest.source(searchSourceBuilder);
	SearchResponse searchResponse = client.search(searchRequest);
	SearchHits hits = searchResponse.getHits();
	SearchHit[] searchHits = hits.getHits();
	for (SearchHit hit : searchHits) {
		String index = hit.getIndex();
		String type = hit.getType();
		String id = hit.getId();
		float score = hit.getScore();
		String sourceAsString = hit.getSourceAsString();
		Map<String, Object> sourceAsMap = hit.getSourceAsMap();
		String name = (String) sourceAsMap.get("name");
		String studymodel = (String) sourceAsMap.get("studymodel");
		String description = (String) sourceAsMap.get("description");
		System.out.println(name);
		System.out.println(studymodel);
		System.out.println(description);
	}
}

5.6 minimum_should_match 匹配占比

上边使用的operator = or表示只要有一个词匹配上就得分,如果实现三个词至少有两个词匹配如何实现?

使用minimum_should_match可以指定文档匹配词的占比:

匹配占比 restful

json
{
	"query": {
		"match" : {
			"description" : {
				"query" : "spring开发框架",
				"minimum_should_match": "80%"	//三个词在文档的匹配占比为80%,即3*0.8=2.4,向上取整得2,表示至少有两个词在文档中要匹配成功。
			}
		}
	}
}

匹配占比 java

java
//匹配关键字
MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("description", "前台页面开发框架 架
构").minimumShouldMatch("80%");//设置匹配占比
searchSourceBuilder.query(matchQueryBuilder);

5.7 multi Query

上边学习的termQuery和matchQuery一次只能匹配一个Field,本节学习multiQuery,一次可以匹配多个字段。 1、基本使用 单项匹配是在一个field中去匹配,多项匹配是拿关键字去多个Field中匹配。

restfrul

json
// 拿关键字 “spring css”去匹配name 和description字段。
post http://localhost:9200/xc_course/doc/_search
{
	"query": {
		"multi_match" : {
			"query" : "spring css",
			"minimum_should_match": "50%",
			"fields": [ "name^10", "description" ] // 提升boost 表示权重提升10倍,权重越高,排名越靠前
		}
	}
}

java

java
MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery("spring框架",
"name", "description").minimumShouldMatch("50%");
multiMatchQueryBuilder.field("name",10);//提升boost

5.8 布尔查询

布尔查询对应于Lucene的BooleanQuery查询,实现将多个查询组合起来。

restful

json
// must:文档必须匹配must所包括的查询条件,相当于 “AND”
//  should:文档应该匹配should所包括的查询条件其中的一个或多个,相当于 "OR"
// must_not:文档不能匹配must_not所包括的该查询条件,相当于“NOT”
POST http://localhost:9200/xc_course/doc/_search
{
	"_source" : [ "name", "studymodel", "description"],
	"from" : 0, "size" : 1,
	"query": {
		"bool" : {
			"must":[
				{
					"multi_match" : {
						"query" : "spring框架",
						"minimum_should_match": "50%",
						"fields": [ "name^10", "description" ]
					}
				},
				{
					"term":{
						"studymodel" : "201001"
					}
				}
				]
			}
		}
}

java

java
//BoolQuery,将搜索关键字分词,拿分词去索引库搜索
@Test
public void testBoolQuery() throws IOException {
	//创建搜索请求对象
	SearchRequest searchRequest= new SearchRequest("xc_course");
	searchRequest.types("doc");
	//创建搜索源配置对象
	SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
	searchSourceBuilder.fetchSource(new String[]{"name","pic","studymodel"},new String[]{});
	//multiQuery
	String keyword = "spring开发框架";
	MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery("spring框架",
"name", "description").minimumShouldMatch("50%");
	multiMatchQueryBuilder.field("name",10);
	//TermQuery
	TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("studymodel", "201001");
    //布尔查询
	BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
	boolQueryBuilder.must(multiMatchQueryBuilder);
	boolQueryBuilder.must(termQueryBuilder);
	//设置布尔查询对象
	searchSourceBuilder.query(boolQueryBuilder);
	searchRequest.source(searchSourceBuilder);//设置搜索源配置
	SearchResponse searchResponse = client.search(searchRequest);
	SearchHits hits = searchResponse.getHits();
	SearchHit[] searchHits = hits.getHits();
	for(SearchHit hit:searchHits){
		Map<String, Object> sourceAsMap = hit.getSourceAsMap();
		System.out.println(sourceAsMap);
    }
}

5.9 过滤器

过虑是针对搜索的结果进行过虑,过虑器主要判断的是文档是否匹配,不去计算和判断文档的匹配度得分,所以过虑器性能比查询要高,且方便缓存,推荐尽量使用过虑器去实现查询或者过虑器和查询共同使用。

restful

json
{
	"_source" : [ "name", "studymodel", "description","price"],
	"query": {
		"bool" : {
			"must":[
				{
					"multi_match" : {
						"query" : "spring框架",
						"minimum_should_match": "50%",
						"fields": [ "name^10", "description" ]
					}
				}
			],
			"filter": [
				{ "term": { "studymodel": "201001" }},	// term:项匹配过虑,保留studymodel等于"201001"的记录。
				{ "range": { "price": { "gte": 60 ,"lte" : 100}}} //range:范围过虑,保留大于等于60 并且小于等于100的记录。
			]
		}
	}
}

注意:range和term一次只能对一个Field设置范围过虑。

java

java
//布尔查询使用过虑器
@Test
public void testFilter() throws IOException {
	SearchRequest searchRequest = new SearchRequest("xc_course");
	searchRequest.types("doc");
	SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
	//source源字段过虑
	searchSourceBuilder.fetchSource(new String[]{"name","studymodel","price","description"},
new String[]{});
	searchRequest.source(searchSourceBuilder);
	//匹配关键字
	MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery("spring框
架", "name", "description");
	//设置匹配占比
	multiMatchQueryBuilder.minimumShouldMatch("50%");
	//提升另个字段的Boost值
	multiMatchQueryBuilder.field("name",10);
	searchSourceBuilder.query(multiMatchQueryBuilder);
	//布尔查询
	BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
	boolQueryBuilder.must(searchSourceBuilder.query());
	//过虑
	boolQueryBuilder.filter(QueryBuilders.termQuery("studymodel", "201001"));
	boolQueryBuilder.filter(QueryBuilders.rangeQuery("price").gte(60).lte(100));
	SearchResponse searchResponse = client.search(searchRequest);
	SearchHits hits = searchResponse.getHits();
	SearchHit[] searchHits = hits.getHits();
	for (SearchHit hit : searchHits) {
		String index = hit.getIndex();
		String type = hit.getType();
		String id = hit.getId();
		float score = hit.getScore();
		String sourceAsString = hit.getSourceAsString();
		Map<String, Object> sourceAsMap = hit.getSourceAsMap();
		String name = (String) sourceAsMap.get("name");
		String studymodel = (String) sourceAsMap.get("studymodel");
		String description = (String) sourceAsMap.get("description");
		System.out.println(name);
		System.out.println(studymodel);
		System.out.println(description);
	}
 }

5.10 排序

可以在字段上添加一个或多个排序,支持在keyword、date、float等类型上添加,text类型的字段上不允许添加排序。

restful

json
POST http://localhost:9200/xc_course/doc/_search
{
	"_source" : [ "name", "studymodel", "description","price"],
	"query": {
		"bool" : {
			"filter": [
				{ "range": { "price": { "gte": 0 ,"lte" : 100}}}
			]
		}
	},
	"sort" : [  // 先按studymodel降序,再按价格升序
		{
			"studymodel" : "desc"
		},
        {
			"price" : "asc"
		}
	]
}

// 地理坐标排序
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "_geo_distance" : {
          "FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
          "order" : "asc", // 排序方式
          "unit" : "km" // 排序的距离单位
      }
    }
  ]
}

java

java
@Test
public void testSort() throws IOException {
	SearchRequest searchRequest = new SearchRequest("xc_course");
	searchRequest.types("doc");
	SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
	//source源字段过虑
	searchSourceBuilder.fetchSource(new String[]{"name","studymodel","price","description"},
new String[]{});
	searchRequest.source(searchSourceBuilder);
	//布尔查询
	BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
	//过虑
	boolQueryBuilder.filter(QueryBuilders.rangeQuery("price").gte(0).lte(100));
    //排序
	searchSourceBuilder.sort(new FieldSortBuilder("studymodel").order(SortOrder.DESC));
	searchSourceBuilder.sort(new FieldSortBuilder("price").order(SortOrder.ASC));
	SearchResponse searchResponse = client.search(searchRequest);
	SearchHits hits = searchResponse.getHits();
	SearchHit[] searchHits = hits.getHits();
	for (SearchHit hit : searchHits) {
		String index = hit.getIndex();
		String type = hit.getType();
		String id = hit.getId();
		float score = hit.getScore();
		String sourceAsString = hit.getSourceAsString();
		Map<String, Object> sourceAsMap = hit.getSourceAsMap();
		String name = (String) sourceAsMap.get("name");
		String studymodel = (String) sourceAsMap.get("studymodel");
		String description = (String) sourceAsMap.get("description");
		System.out.println(name);
		System.out.println(studymodel);
		System.out.println(description);
	}
}

5.11 高亮显示

高亮显示可以将搜索结果一个或多个字突出显示,以便向用户展示匹配关键字的位置。

  • 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
  • 如果要对非搜索字段高亮,则需要添加一个属性:required_field_match=false

restful

json
Post: http://127.0.0.1:9200/xc_course/doc/_search
{
	"_source" : [ "name", "studymodel", "description","price"],
	"query": {
		"bool" : {
			"must":[
				{
					"multi_match" : {
						"query" : "开发框架",
						"minimum_should_match": "50%",
						"fields": [ "name^10", "description" ],
						"type":"best_fields"
					}
				}
			],
			"filter": [
				{ "range": { "price": { "gte": 0 ,"lte" : 100}}}
			]
		}
	},
	"sort" : [
		{
			"price" : "asc"
		}
	],
	"highlight": {// 高亮显示
		"pre_tags": ["<tag1>"],
		"post_tags": ["</tag2>"],
		"fields": {
			"name": {},
			"description":{}
		}
	}
}

java

java
@Test
public void testHighlight() throws IOException {
	SearchRequest searchRequest = new SearchRequest("xc_course");
	searchRequest.types("doc");
	SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
	//source源字段过虑
	searchSourceBuilder.fetchSource(new String[]{"name","studymodel","price","description"},
new String[]{});
	searchRequest.source(searchSourceBuilder);
	//匹配关键字
	MultiMatchQueryBuilder multiMatchQueryBuilder = 				QueryBuilders.multiMatchQuery("开发",
"name", "description");
	searchSourceBuilder.query(multiMatchQueryBuilder);
	//布尔查询
	BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
	boolQueryBuilder.must(searchSourceBuilder.query());
	//过虑
	boolQueryBuilder.filter(QueryBuilders.rangeQuery("price").gte(0).lte(100));
	//排序
	searchSourceBuilder.sort(new 	FieldSortBuilder("studymodel").order(SortOrder.DESC));
	searchSourceBuilder.sort(new FieldSortBuilder("price").order(SortOrder.ASC));
	//高亮设置
	HighlightBuilder highlightBuilder = new HighlightBuilder();
	highlightBuilder.preTags("<tag>");//设置前缀
	highlightBuilder.postTags("</tag>");//设置后缀
	// 设置高亮字段
	highlightBuilder.fields().add(new HighlightBuilder.Field("name"));
	// highlightBuilder.fields().add(new HighlightBuilder.Field("description"));
	searchSourceBuilder.highlighter(highlightBuilder);
	SearchResponse searchResponse = client.search(searchRequest);
    SearchHits hits = searchResponse.getHits();
	SearchHit[] searchHits = hits.getHits();
	for (SearchHit hit : searchHits) {
		Map<String, Object> sourceAsMap = hit.getSourceAsMap();
		//名称
		String name = (String) sourceAsMap.get("name");
		//取出高亮字段内容
		Map<String, HighlightField> highlightFields = hit.getHighlightFields();
		if(highlightFields!=null){
			HighlightField nameField = highlightFields.get("name");
				if(nameField!=null){
					Text[] fragments = nameField.getFragments();
					StringBuffer stringBuffer = new StringBuffer();
					for (Text str : fragments) {
						stringBuffer.append(str.string());
					}
					name = stringBuffer.toString();
				}
			}
		String index = hit.getIndex();
		String type = hit.getType();
		String id = hit.getId();
		float score = hit.getScore();
		String sourceAsString = hit.getSourceAsString();
		String studymodel = (String) sourceAsMap.get("studymodel");
		String description = (String) sourceAsMap.get("description");
		System.out.println(name);
		System.out.println(studymodel);
		System.out.println(description);
	}
}

5.12 range查询

range 过滤允许我们按照指定范围查找一批数据:

kson
// gt :: 大于   gte :: 大于等于  lt :: 小于  lte :: 小于等于
Post: http://127.0.0.1:9200/xc_course/doc/_search
{
	"range": {
        "age": {
            "gte": 20,
            "lt": 30
        }
	}
}

5.13 exists 查询

exists 查询可以用于查找文档中是否包含指定字段或没有某个字段,类似于SQL语句中的IS_NULL 条件

json
POST http://172.16.55.185:9200/haoke/user/_search
{
	"query": {
		"exists": { #必须包含
			"field": "card"
		}
	}
}

5.14 地理坐标查询

矩形范围查询

矩形范围查询,也就是geo_bounding_box查询,查询坐标落在某个矩形范围的所有文档:

查询时,需要指定矩形的左上右下两个点的坐标,然后画出一个矩形,落在该矩形内的都是符合条件的点

json
// geo_bounding_box查询
GET /indexName/_search
{
  "query": {
    "geo_bounding_box": {
      "FIELD": {
        "top_left": { // 左上点
          "lat"31.1,
          "lon"121.5
        },
        "bottom_right": { // 右下点
          "lat"30.9,
          "lon"121.7
        }
      }
    }
  }
}

附近查询

附近查询,也叫做距离查询(geo_distance):查询到指定中心点小于某个距离值的所有文档。

在地图上找一个点作为圆心,以指定距离为半径,画一个圆,落在圆内的坐标都算符合条件

json
// geo_distance 查询
GET /indexName/_search
{
  "query": {
    "geo_distance": {
      "distance""15km", // 半径
      "FIELD""31.21,121.5" // 圆心
    }
  }
}

// 带排序
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "_geo_distance" : {
          "FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
          "order" : "asc", // 排序方式
          "unit" : "km" // 排序的距离单位
      }
    }
  ]
}

5.15 算分函数查询

当我们利用match查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。

在elasticsearch中,早期使用的打分算法是TF-IDF算法,公式如下:

在后来的5.1版本升级中,elasticsearch将算法改进为BM25算法,公式如下:

TF-IDF算法有一各缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而BM25则会让单个词条的算分有一个上限,曲线更加平滑:

小结:elasticsearch会根据词条和文档的相关度做打分,算法由两种:

  • TF-IDF算法
  • BM25算法,elasticsearch5.1版本后采用的算法

TF-IDF算法有一各缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而BM25则会让单个词条的算分有一个上限,曲线更加平滑:

function score 查询中包含四部分内容:

  • 原始查询条件:query部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
  • 过滤条件:filter部分,符合该条件的文档才会重新算分
  • 算分函数:符合filter条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
    • weight:函数结果是常量
    • field_value_factor:以文档中的某个字段值作为函数结果
    • random_score:以随机数作为函数结果
    • script_score:自定义算分函数算法
  • 运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
    • multiply:相乘
    • replace:用function score替换query score
    • 其它,例如:sum、avg、max、min

function score的运行流程如下:

  • 1)根据原始条件查询搜索文档,并且计算相关性算分,称为原始算分(query score)
  • 2)根据过滤条件,过滤文档
  • 3)符合过滤条件的文档,基于算分函数运算,得到函数算分(function score)
  • 4)将原始算分(query score)和函数算分(function score)基于运算模式做运算,得到最终结果,作为相关性算分。

因此,其中的关键点是:

  • 过滤条件:决定哪些文档的算分被修改
  • 算分函数:决定函数算分的算法
  • 运算模式:决定最终算分结果
json
GET /hotel/_search
{
  "query": {
    "function_score": {
      "query": {  .... }, // 原始查询,可以是任意条件
      "functions": [ // 算分函数
        {
          "filter": { // 满足的条件,品牌必须是如家
            "term": {
              "brand""如家"
            }
          },
          "weight"2 // 算分权重为2
        }
      ],
      "boost_mode": "sum" // 加权模式,求和
    }
  }
}

5.16 聚合搜索

**聚合(aggregations**可以让我们极其方便的实现对数据的统计、分析、运算实现。这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效果。

聚合常见的有三类:

  • **桶(Bucket)**聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • **度量(Metric)**聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • **管道(pipeline)**聚合:其它聚合的结果为基础做聚合

**注意:**参加聚合的字段必须是keyword、日期、数值、布尔类型

Bucket聚合

我们要统计所有数据中的酒店品牌有几种,其实就是按照品牌对数据分组。此时可以根据酒店品牌的名称做聚合,也就是Bucket聚合。

限定条件的带排序的聚合

json
GET /hotel/_search
{
    "query": {
        "range": {
            "price": {
                "lte"200 // 只对200元以下的文档聚合
            }
        }
  }, 
  "size"0
  "size"0,  // 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": { // 定义聚合
    "brandAgg": { //给聚合起个名字
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择term
        "field""brand"// 参与聚合的字段
        "order": {
            "_count""asc" // 按照_count升序排列
        },
        "size"20 // 希望获取的聚合结果数量
      }
    }
  }
}

Metric聚合

对酒店按照品牌分组,形成了一个个桶。现在我们需要对桶内的酒店做运算,获取每个品牌的用户评分的min、max、avg等值。

这就要用到Metric聚合了,例如stat聚合:就可以获取min、max、avg等结果。

.是在brandAgg的聚合内部嵌套的子聚合。因为我们需要在每个桶分别计算。

json
GET /hotel/_search
{
  "size"0
  "aggs": {
    "brandAgg": { // 可以排序
      "terms": { 
        "field""brand"
        "size"20
      },
      "aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": { // 聚合名称
          "stats": { // 聚合类型,这里stats可以计算min、max、avg等
            "field""score" // 聚合字段,这里是score
          }
        }
      }
    }
  }
}

java代码实现聚合

java
@Override
public Map<String, List<String>> filters(RequestParams params) {
    try {
        // 1.准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2.准备DSL
        // 2.1.query
        buildBasicQuery(params, request);
        // 2.2.设置size
        request.source().size(0);
        // 2.3.聚合
        buildAggregation(request);
        // 3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析结果
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1.根据品牌名称,获取品牌结果
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        result.put("品牌", brandList);
        // 4.2.根据品牌名称,获取品牌结果
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        result.put("城市", cityList);
        // 4.3.根据品牌名称,获取品牌结果
        List<String> starList = getAggByName(aggregations, "starAgg");
        result.put("星级", starList);

        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
                                 .terms("brandAgg")
                                 .field("brand")
                                 .size(100)
                                );
    request.source().aggregation(AggregationBuilders
                                 .terms("cityAgg")
                                 .field("city")
                                 .size(100)
                                );
    request.source().aggregation(AggregationBuilders
                                 .terms("starAgg")
                                 .field("starName")
                                 .size(100)
                                );
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
    // 4.1.根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get(aggName);
    // 4.2.获取buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    // 4.3.遍历
    List<String> brandList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        // 4.4.获取key
        String key = bucket.getKeyAsString();
        brandList.add(key);
    }
    return brandList;
}

六、分词器

6.1 内置分词器

StandardStandard 标准分词,按单词切分,并且会转化成小写
SimpleSimple分词器,按照非单词切分,并且做小写处理
WhitespaceWhitespace是按照空格切分。
StopStop分词器,是去除Stop Word语气助词,如the、an等。
KeywordKeyword分词器,意思是传入就是关键词,不做分词处理。

测试分词器

在添加文档时会进行分词,索引中存放的就是一个一个的词(term),当你去搜索时就是拿关键字去匹配词,最终 找到词关联的文档。

json
// post 发送:localhost:9200/_analyze 
{"text":"测试分词器,后边是测试内容:spring cloud实战"}

6.2 IK分词器

使用IK分词器可以实现对中文分词的效果。

  • 常规安装

    下载IK分词器:(Github地址:https://github.com/medcl/elasticsearch-analysis-ik)

    解压,并将解压的文件拷贝到ES安装目录的plugins下的ik目录下

  • 通过docker安装

    shell
    #如果使用docker运行
    docker cp /tmp/elasticsearch-analysis-ik-6.5.4.zip
    elasticsearch:/usr/share/elasticsearch/plugins/
    #进入容器
    docker exec -it elasticsearch /bin/bash
    mkdir /usr/share/elasticsearch/plugins/ik
    cd /usr/share/elasticsearch/plugins/ik
    unzip elasticsearch-analysis-ik-6.5.4.zip
    
    # 进入容器内部 在线下载方式
    #docker exec -it elasticsearch /bin/bash
    # 在线下载并安装
    #./bin/elasticsearch-plugin  install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip
    
    #重启容器即可
    docker restart elasticsearch
  • 测试

    json
    //post localhost:9200/_analyze 
    
    {"text":"测试分词器,后边是测试内容:spring cloud实战","analyzer":"ik_max_word" } 
    
    {"text":"中华人民共和国人民大会堂","analyzer":"ik_smart" }

两种分词模式

IK中文分词器由两种分词模式

对于ik分词器建议是索引时使用ik_max_word将搜索内容进行细粒度分词,搜索时使用ik_smart提高搜索精确性

ik_max_word会将文本做最细粒度的拆分,比如会将“中华人民共和国人民大会堂”拆分为“中华人民共和国、中华人民、中华、 华人、人民共和国、人民、共和国、大会堂、大会、会堂等词语。
ik_smart会做最粗粒度的拆分,比如会将“中华人民共和国人民大会堂”拆分为中华人民共和国、人民大会堂。

自定义词库

iK分词器自带一个main.dic的文件,此文件为词库文件。

新建一个my.dic文件(注意文件格式为utf-8(不要选择utf-8 BOM)) ,可以自定义词汇。

需要在在IKAnalyzer.cfg.xml配置文件内容添加:

xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
        <comment>IK Analyzer 扩展配置</comment>
        <!--用户可以在这里配置自己的扩展字典 *** 添加扩展词典-->
        <entry key="ext_dict">my.dic</entry>
</properties>

停用词典

新建停用的词典,在配置文件中配置

xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
        <comment>IK Analyzer 扩展配置</comment>
         <!--用户可以在这里配置自己的扩展停止词字典  *** 添加停用词词典-->
        <entry key="ext_stopwords">stopword.dic</entry>
</properties>

修改配置后需要重启。

6.3 拼音分词器(实现拼音自动补全)

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin。安装方式与IK分词器一致。

默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器。

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

文档分词时会依次由这三部分来处理文档:

声明自定义分词器的语法如下:

json
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { // 自定义分词器
        "my_analyzer": {  // 分词器名称
          "tokenizer""ik_max_word",
          "filter""py"
        }
      },
      "filter": { // 自定义tokenizer filter
        "py": { // 过滤器名称
          "type""pinyin"// 过滤器类型,这里是pinyin
		  "keep_full_pinyin"false,
          "keep_joined_full_pinyin"true,
          "keep_original"true,
          "limit_first_letter_length"16,
          "remove_duplicated_term"true,
          "none_chinese_pinyin_tokenize"false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type""text",
        "analyzer""my_analyzer",
        "search_analyzer""ik_smart"
      }
    }
  }
}

自动补全查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型。
  • 字段的内容一般是用来补全的多个词条形成的数组。
json
// 自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      "text""s"// 关键字
      "completion": {
        "field""title"// 补全查询的字段
        "skip_duplicates"true// 跳过重复的
        "size"10 // 获取前10条结果
      }
    }
  }
}
java
@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1.准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2.准备DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
            .prefix(prefix)
            .skipDuplicates(true)
            .size(10)
        ));
        // 3.发起请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析结果
        Suggest suggest = response.getSuggest();
        // 4.1.根据补全查询名称,获取补全结果
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        // 4.2.获取options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3.遍历
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            list.add(text);
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

七、数据同步

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

常见的数据同步方案有三种:

  • 同步调用

    在业务变更数据时,同时修改elaelasticsearch中的数据。优点:实现简单,粗暴;缺点:业务耦合度高

  • 异步通知

    业务变更数据时,发送消息到mq,由一个服务监听mq更新elaelasticsearch中的数据。优点:低耦合,实现难度一般;缺点:依赖mq的可靠性

  • 监听binlog

    一个服务基于canal监听binlog变化,实时更新elasticsearch中的内容。优点:完全解除服务间耦合;缺点:开启binlog增加数据库负担、实现复杂度高

八、安装

安装要求

新版本要求至少jdk1.8以上。

8.1 通过docker安装

shell
#拉取镜像
docker pull elasticsearch:6.5.4
#创建容器
docker create --name elasticsearch 
# 内存大小
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
# 非集群模式
    -e "discovery.type=single-node" \
    # 绑定数据目录
-v es-data:/usr/share/elasticsearch/data \
#挂载逻辑卷,绑定es的日志目录
#-v es-logs:/usr/share/elasticsearch/logs
# 绑定插件目录
    -v es-plugins:/usr/share/elasticsearch/plugins \
    # 授予逻辑卷访问全新啊
    --privileged \
    # 加入网络
    --network es-net \
    # 端口映射
    -p 9200:9200 \
    -p 9300:9300 \
# 监听地址
-e"network.host=172.16.55.185" elasticsearch:6.5.4
#启动
docker start elasticsearch
#查看日志
docker logs elasticsearch

8.2 常规方式安装

下载地址

sh
#创建elsearch用户,Elasticsearch不支持root用户运行 
useradd elsearch 
#解压安装包 
tar -xvf elasticsearch-6.5.4.tar.gz -C /itcast/es/

#修改配置文件 
vim conf/elasticsearch.yml 
network.host: 0.0.0.0 #设置ip地址,任意网络均可访问 

#说明:在Elasticsearch中如果,network.host不是localhost或者127.0.0.1的话,就会认为是生产环境, 会对环境的要求比较高,我们的测试环境不一定能够满足,一般情况下需要修改2处配置,如下: 
#1:修改jvm启动参数 
vim conf/jvm.options 
-Xms128m #根据自己机器情况修改 
-Xmx128m 
#2:一个进程在VMAs(虚拟内存区域)创建内存映射最大数量 
vim /etc/sysctl.conf 
vm.max_map_count=655360

sysctl -p #配置生效 
#启动ES服务 
su - elsearch 
cd bin 
./elasticsearch 或 ./elasticsearch -d #后台启动

8.3 集群管理

ES通常以集群方式工作,这样做不仅能够提高 ES的搜索能力还可以处理大数据搜索的能力,同时也增加了系统的容错能力及高可用,ES可以实现PB级数据的搜索。

数据备份可以保证高可用,但是每个分片备份一份,所需要的节点数量就会翻一倍,成本实在是太高了!

为了在高可用和成本间寻求平衡,我们可以这样做:

  • 首先对数据分片,存储到不同节点
  • 然后对每个分片进行备份,放到对方节点,完成互相备份

下图是ES集群结构的示意图:

image-20201109230950809

相关概念描述
结点ES集群由多个服务器组成,每个服务器即为一个Node结点(该服务只部署了一个ES进程)。
分片当我们的文档量很大时,由于内存和硬盘的限制,同时也为了提高ES的处理能力、容错能力及高可用能力,
我们将索引分成若干分片,每个分片可以放在不同的服务器,这样就实现了多个服务器共同对外提供索引及搜索服务。
一个搜索请求过来,会分别从各各分片去查询,最后将查询到的数据合并返回给用户。
副本为了提高ES的高可用同时也为了提高搜索的吞吐量,我们将分片复制一份或多份存储在其它的服务器,这样即使当前的服务器挂掉了,
拥有副本的服务器照常可以提供服务。
主结点一个集群中会有一个或多个主结点,主结点的作用是集群管理,比如增加节点,移除节点等,主结点挂掉后ES会重新选一个主结点。
结点转发每个结点都知道其它结点的信息,我们可以对任意一个结点发起请求,接收请求的结点会转发给其它结点查询数据。

搭建集群

下边的例子实现创建一个2结点的集群,并且索引的分片我们设置2片,每片一个副本。

节点类型

master节点配置文件中node.master属性为true(默认为true),就有资格被选为master节点。
master节点用于控制整个集群的操作。比如创建或删除索引,管理其它非master节点,分片分配等。
data节点配置文件中node.data属性为true(默认为true),就有资格被设置成data节点。
data节点主要用于执行数据相关的操作。比如文档的CRUD。
客户端节点配置文件中node.master属性和node.data属性均为false。
该节点不能作为master节点,也不能作为data节点。
可以作为客户端节点,用于响应用户的请求,把请求转发到其他节点
部落节点当一个节点配置tribe.*的时候,它是一个特殊的客户端,它可以连接多个集群,在所有连接的集群上执行搜索和其他操作。
  • master节点:对CPU要求高,但是内存要求第
  • data节点:对CPU和内存要求都高
  • coordinating节点:对网络带宽、CPU要求高

节点配置

node.master:是否允许为主结点
node.data:允许存储数据作为数据结点
node.ingest:是否允许成为协调节点,

节点配置组合

master=true,data=true:即是主结点又是数据结点
master=false,data=true:仅是数据结点
master=true,data=false:仅是主结点,不存储数据
master=false,data=false:即不是主结点也不是数据结点,此时可设置ingest为true表示它是一个客户端。

节点1配置

yml
cluster.name: xuecheng
node.name: xc_node_1
network.host: 0.0.0.0
http.port: 9200
transport.tcp.port: 9300
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["0.0.0.0:9300", "0.0.0.0:9301"]
discovery.zen.minimum_master_nodes: 1
node.ingest: true
node.max_local_storage_nodes: 2
path.data: D:\ElasticSearch\elasticsearch‐6.2.1‐1\data
path.logs: D:\ElasticSearch\elasticsearch‐6.2.1‐1\logs
http.cors.enabled: true
http.cors.allow‐origin: /.*/

节点2配置

yml
cluster.name: xuecheng
node.name: xc_node_2
network.host: 0.0.0.0
http.port: 9201
transport.tcp.port: 9301
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["0.0.0.0:9300", "0.0.0.0:9301"]
discovery.zen.minimum_master_nodes: 1
node.ingest: true
node.max_local_storage_nodes: 2
path.data: D:\ElasticSearch\elasticsearch‐6.2.1‐2\data
path.logs: D:\ElasticSearch\elasticsearch‐6.2.1‐2\logs
http.cors.enabled: true
http.cors.allow‐origin: /.*/

集群的健康

通过访问 GET /_cluster/health 来查看Elasticsearch 的集群健康情况。

json
Get http://localhost:9200/cluster/health
{
	"cluster_name": "xuecheng",
    // green:所有的主分片和副本分片都正常运行。 yellow:所有的主分片都正常运行,但有些副本分片运行不正常。red:存在主分片运行不正常。
	"status": "green",
	"timed_out": false,
	"number_of_nodes": 2,
	"number_of_data_nodes": 2,
	"active_primary_shards": 2,
	"active_shards": 4,
	"relocating_shards": 0,
	"initializing_shards": 0,
	"unassigned_shards": 0,
	"delayed_unassigned_shards": 0,
	"number_of_pending_tasks": 0,
	"number_of_in_flight_fetch": 0,
    "task_max_waiting_in_queue_millis": 0,
	"active_shards_percent_as_number": 100
}

使用docker搭建

shell
mkdir /haoke/es-cluster
cd /haoke/es-cluster
mkdir node01
mkdir node02
#复制安装目录下的elasticsearch.yml、jvm.options文件,做如下修改
#node01的配置:
cluster.name: es-itcast-cluster
node.name: node01
node.master: true
node.data: true
network.host: 172.16.55.185
http.port: 9200
discovery.zen.ping.unicast.hosts: ["172.16.55.185"]
discovery.zen.minimum_master_nodes: 1
http.cors.enabled: true
http.cors.allow-origin: "*"
#node02的配置:
cluster.name: es-itcast-cluster
node.name: node02
node.master: false
node.data: true
network.host: 172.16.55.185
http.port: 9201
discovery.zen.ping.unicast.hosts: ["172.16.55.185"]
discovery.zen.minimum_master_nodes: 1
http.cors.enabled: true
http.cors.allow-origin: "*"
#创建容器
docker create --name es-node01 --net host -v /haoke/escluster/
node01/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
-v /haoke/es-cluster/node01/jvm.options:/usr/share/elasticsearch/config/jvm.options
-v /haoke/es-cluster/node01/data:/usr/share/elasticsearch/data elasticsearch:6.5.4
docker create --name es-node02 --net host -v /haoke/escluster/
node02/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
-v /haoke/es-cluster/node02/jvm.options:/usr/share/elasticsearch/config/jvm.options
-v /haoke/es-cluster/node02/data:/usr/share/elasticsearch/data elasticsearch:6.5.4
#启动容器
docker start es-node01 && docker logs -f es-node01
docker start es-node02 && docker logs -f es-node02
#提示:启动时会报文件无权限操作的错误,需要对node01和node02进行chmod 777 的操作

分片和副本

为了将数据添加到Elasticsearch,我们需要索引(index)——一个存储关联数据的地方。实际上,索引只是一个用来指向一个或多个分片(shards)的“逻辑命名空间(logical namespace)”.

  • 一个分片(shard)是一个最小级别“工作单元(worker unit)”,它只是保存了索引中所有数据的一部分。
  • 我们需要知道是分片就是一个Lucene实例,并且它本身就是一个完整的搜索引擎。应用程序不会和它直接通信。
  • 分片可以是主分片(primary shard)或者是复制分片(replica shard)。
  • 索引中的每个文档属于一个单独的主分片,所以主分片的数量决定了索引最多能存储多少数据。
  • 复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请求,比如搜索或者从别的shard取回文档。
  • 当索引创建完成的时候,主分片的数量就固定了,但是复制分片的数量可以随时调整。

故障转移

为了测试故障转移,需要再向集群中添加一个节点,并且将所有节点的node.master设置为true。

docker create --name es-node03 --net host -v /haoke/escluster/
node03/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v
/haoke/es-cluster/node03/jvm.options:/usr/share/elasticsearch/config/jvm.options -v
/haoke/es-cluster/node03/data:/usr/share/elasticsearch/data elasticsearch:6.5.4
docker stop es-node01 es-node02
docker start es-node01 es-node02 es-node03

当主节点关闭后,集群会重新选举一个master节点。而后之前的主节点启动将加不到集群之中。者就是脑裂问题

解决方案

  • 思路:不能让节点很容易的变成master,必须有多个节点认可后才可以。
  • 设置minimum_master_nodes的大小为2
    • 官方推荐:(N/2)+1,N为集群中节点数

分布式文档

1. 路由

当我们想一个集群保存文档时,,在ELasticsearch中,会采用计算的方式来确定存储到哪个节点,计算公式如下:

shard = hash(routing) % number_of_primary_shards

routing值是一个任意字符串,它默认是_id但也可以自定义。
这个routing字符串通过哈希函数生成一个数字,然后除以主切片的数量得到一个余数(remainder),余数的范围永远是0到number_of_primary_shards - 1,这个数字就是特定文档所在的分片。
2. 文档的写操作

新建、索引和删除请求都是写(write)操作,它们必须在主分片上成功完成才能复制到相关的复制分片上。

在主分片和复制分片上成功新建、索引或删除一个文档必要的顺序步骤:

  1. 客户端给Node 1 发送新建、索引或删除请求。
  2. 节点使用文档的_id 确定文档属于分片0 。它转发请求到Node 3 ,分片0 位于这个节点上。
  3. Node 3 在主分片上执行请求,如果成功,它转发请求到相应的位于Node 1 和Node 2 的复制节点上。当所有的复制节点报告成功, Node 3 报告成功到请求的节点,请求的节点再报告给客户端。
3. 搜索文档

文档能够从主分片或任意一个复制分片被检索。

在主分片或复制分片上检索一个文档必要的顺序步骤:

  1. 客户端给Node 1 发送get请求。
  2. 节点使用文档的_id 确定文档属于分片0 。分片0 对应的复制分片在三个节点上都有。此时,它转发请求到Node 2 。
  3. Node 2 返回文档(document)给Node 1 然后返回给客户端。

对于读请求,为了平衡负载,请求节点会为每个请求选择不同的分片——它会循环所有分片副本。

可能的情况是,一个被索引的文档已经存在于主分片上却还没来得及同步到复制分片上。这时复制分片会报告文档未找到,主分片会成功返回文档。一旦索引请求成功返回给用户,文档则在主分片和复制分片都是可用的。

4. 全文搜索

对于全文搜索而言,文档可能分散在各个节点上,搜索文档方式为:

将全文搜索分为2个阶段,搜索(query)+取回(fetch)。

搜索(query)

查询阶段包含以下三步:

  1. 客户端发送一个search(搜索) 请求给Node 3 , Node 3 创建了一个长度为from+size 的空优先级队
  2. Node 3 转发这个搜索请求到索引中每个分片的原本或副本。每个分片在本地执行这个查询并且结果将结果到一个大小为from+size 的有序本地优先队列里去。
  3. 每个分片返回document的ID和它优先队列里的所有document的排序值给协调节点Node 3 。Node 3 把这些值合并到自己的优先队列里产生全局排序结果。

取回(fetch)

分发阶段由以下步骤构成:

  1. 协调节点辨别出哪个document需要取回,并且向相关分片发出GET 请求。
  2. 每个分片加载document并且根据需要丰富(enrich)它们,然后再将document返回协调节点。
  3. 一旦所有的document都被取回,协调节点会将结果返回给客户端。

8.4 docker-compose方式搭建集群

yaml
version: '2.2'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data02:/usr/share/elasticsearch/data
    networks:
      - elastic
  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

sh
vi /etc/sysctl.conf

添加下面的内容:

sh
vm.max_map_count=262144

然后执行命令,让配置生效:

sh
sysctl -p

通过docker-compose启动集群:

sh
docker-compose up -d

8.5 集群监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。

利用cerebro还可以创建索引库:

填写索引库信息:

点击右下角的create按钮:

回到首页即可查看索引库分片效果:

8.6 head插件安装

head插件是ES的一个可视化管理插件,用来监视ES的状态,并通过head客户端和ES服务进行交互,比如创建映射、创建索引等,head的项目地址在https://github.com/mobz/elasticsearch-head

  1. 安装Node.js

  2. 下载head并运行git clone git://github.com/mobz/elasticsearch-head.git cd elasticsearch-head npm install npm run start open HTTP://本地主机:9100 /

    sh
    # 通过docker安装
    #拉取镜像 
    docker pull mobz/elasticsearch-head:5 
    #创建容器 
    docker create --name elasticsearch-head -p 9100:9100 mobz/elasticsearch-head:5 
    #启动容器 
    docker start elasticsearch-head

    需要开启elasticsearch允许跨域

    yaml
    #confifig/elasticsearch.yml 后面增加以下参数: 
    #开启cors跨域访问支持,默认为false 
    http.cors.enabled: true 
    #跨域访问允许的域名地址,(允许所有域名)以上使用正则 
    http.cors.allow-origin: /.*/