• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

ElasticSearch ( 六 ) 和SpringBoot整合

武飞扬头像
春哥的魔法书
帮助1

6.与springboot整合

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/

6.1.pom.xml 引入依赖

        <!-- Spring Boot starter for Spring Data Elasticsearch; no <version> is given here. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

如果当前 Spring Boot 默认依赖的 ES 客户端版本与 ES 服务器版本不一致,则需要显式引入与服务器版本一致的客户端依赖:

        <!--
          Pins the Elasticsearch high-level REST client to 7.15.0 and excludes its
          transitive org.elasticsearch:elasticsearch artifact.
          NOTE(review): with the exclusion in place, the core elasticsearch jar has to come
          from another dependency; confirm that its version matches the ES server.
        -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.15.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

6.2.配置文件

指明 ES 服务器的地址(IP 和端口)

spring.elasticsearch.rest.uris=192.168.46.123:9200

6.3.es操作对象

    // Elasticsearch high-level REST client, injected by Spring via @Autowired.
    @Autowired
    private RestHighLevelClient client;

6.4.操作

6.4.1.实体类

package com.yuan.estest.entity;

import lombok.Data;

/**
 * Story document used by the sample index, update and bulk operations.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; the field names are the
 * property names seen by the JSON serializer.
 */
@Data
public class StoryEntity {

     // Title of the story.
     private String storyTitle;
     // Author of the story.
     private String storyAuthor;
     // Story content in English.
     private String storyContentEn;
     // Story content in Chinese.
     private String storyContentCn;
     // NOTE(review): "Conut" looks like a typo of "Count". It is left unchanged because the
     // generated setter setStoryConut(...) is called by the controller and the name is also
     // the serialized JSON key.
     private int storyConut;
}
package com.yuan.estest.entity;

import lombok.Data;

/**
 * Account document that search hits from the "bank" index are deserialized into.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class AccountEntity {

    // NOTE(review): snake_case presumably mirrors the JSON key of the indexed documents;
    // confirm against the index mapping before renaming.
    private Integer account_number;
    private Integer balance;
    private String firstname;
    private String lastname;
    private Integer age;
    private String gender;
    private String address;
    private String employer;
    private String email;
    private String city;
    private String state;

}
学新通

6.4.2.Controller



import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.yuan.estest.entity.AccountEntity;
import com.yuan.estest.entity.StoryEntity;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@RestController
@Slf4j
public class Test {

    /** Elasticsearch high-level REST client, injected by Spring. */
    @Autowired
    private RestHighLevelClient client;

    /** Name of the index that the story documents are written to and read from. */
    private static final String INDEX_NAME = "testdata";

    /***************DATA**********************/

    /**
     * Builds the fixed set of sample stories used by the bulk-indexing demo.
     *
     * @return a new mutable list holding three sample stories, in insertion order
     */
    public List<StoryEntity> getDatasForList(){
        List<StoryEntity> stories = new ArrayList<>();
        stories.add(newStory("笑看人生", "开心", "happy lift", "一生无苦", 10));
        stories.add(newStory("华英雄", "黄玉郞", "hua hero", "一个叫华英雄的英雄", 1079867));
        stories.add(newStory("英雄一生都欢乐", "笑英雄", "hero is happy", "英雄一生都开心", 1267340));
        return stories;
    }

    /** Creates a story populated with the given field values. */
    private static StoryEntity newStory(String title, String author, String contentEn,
                                        String contentCn, int count) {
        StoryEntity story = new StoryEntity();
        story.setStoryTitle(title);
        story.setStoryAuthor(author);
        story.setStoryContentEn(contentEn);
        story.setStoryContentCn(contentCn);
        story.setStoryConut(count);
        return story;
    }



    /**
     * 新增一条
     */
    @RequestMapping("/indexData")
    public void indexData() throws IOException {



        //信息
        StoryEntity storyEntity = new StoryEntity();
        storyEntity.setStoryTitle("英雄之歌");
        storyEntity.setStoryAuthor("流浪英雄");
        storyEntity.setStoryContentEn("hero's song");
        storyEntity.setStoryContentCn("一首英雄的赞歌");
        storyEntity.setStoryConut(1267340);
        // 将对象转为json
        String data = JSONObject.toJSONString(storyEntity);

        IndexRequest indexRequest = new IndexRequest(INDEX_NAME);
        indexRequest.id("7");
        // 保存, 指明类型
        indexRequest.source(data, XContentType.JSON);
        IndexRequest timeout = indexRequest.timeout("50s");
        System.out.println("timeout = "   timeout);
        // 执行
        IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
        // 获取响应数据
        log.info("创建状态:{}", response.status());
    }



    /**
     * Fetches the document with id "7" from the index and prints its source to stdout.
     *
     * @throws IOException if the get call to Elasticsearch fails
     */
    public void getDocument() throws IOException {
        GetResponse found = client.get(new GetRequest(INDEX_NAME, "7"), RequestOptions.DEFAULT);
        String sourceJson = found.getSourceAsString();
        System.out.println(sourceJson);
    }



    /**
     * 批量新增
     */
    @RequestMapping("/syncData")
    public String syncData() {

        //批量数据
        List<StoryEntity> list = getDatasForList();

        // 1.创建Request
        BulkRequest request = new BulkRequest();

        //下面尽量控制一下一次bulk的数量,如果数据过大,条数过多可能出现同步不完全的情况
        for (StoryEntity storyEntity : list) {

            String jsonString = JSONObject.toJSONString(storyEntity);
            IndexRequest indexRequest = new IndexRequest(INDEX_NAME);
            indexRequest.source(jsonString, XContentType.JSON);
            // 保存, 指明类型
            request.add(indexRequest);
        }
        try {
            BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
            System.out.println("response = "   response);

            if (response.hasFailures()) {
                exceptionRetry(request, response);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "over";
    }

    /**
     * Retries, once, the items of a bulk request that failed.
     *
     * <p>Each failed item is re-created as a new index request from the source of the
     * original request at the same position. The retry is sent after a one-second pause,
     * and items that fail again are logged by {@link #exceptionLog}.
     *
     * @param request  the bulk request that was originally sent
     * @param response the bulk response reporting which items failed
     */
    private void exceptionRetry(BulkRequest request, BulkResponse response) {
        List<DocWriteRequest<?>> list = request.requests();
        BulkRequest requestRetry = new BulkRequest();
        // Keep the size of a single bulk request bounded: very large or very numerous
        // documents may not be synchronized completely.
        for (BulkItemResponse bir : response) {
            if (bir.isFailed()) {
                // The item id is used as the position of the request inside the bulk request
                int docIndex = bir.getItemId();
                IndexRequest ir = (IndexRequest) list.get(docIndex);
                requestRetry.add(new IndexRequest(INDEX_NAME).source(ir.sourceAsMap(), XContentType.JSON));
            }
        }
        try {
            // Back off for one second before retrying
            Thread.sleep(1000);
            BulkResponse responseRetry = client.bulk(requestRetry, RequestOptions.DEFAULT);
            // Record whatever still fails after the retry
            exceptionLog(requestRetry, responseRetry);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            log.error("ES同步重试出错!", e);
        } catch (Exception e) {
            log.error("ES同步重试出错!", e);
        }
    }

    /**
     * Logs every item of a retried bulk request that failed again.
     *
     * @param request  the retried bulk request
     * @param response the response to that retry
     */
    private void exceptionLog(BulkRequest request, BulkResponse response) {
        List<DocWriteRequest<?>> retried = request.requests();
        for (BulkItemResponse item : response) {
            if (!item.isFailed()) {
                continue;
            }
            IndexRequest failedRequest = (IndexRequest) retried.get(item.getItemId());
            // Log the failure reason together with the document that could not be written
            String reason = item.getFailureMessage();
            String failedData = failedRequest.sourceAsMap().toString();
            log.error("同步重试失败reason=[{}],data=[{}]", reason, failedData);
        }
    }





    /**************************************修改*******************************************/
    /**
     * Updates the document with id "7" with a hard-coded story, sent as a partial document.
     *
     * <p>Example of such a document as shown in a search result:
     * <pre>
     *         "_id" : "7",
     *         "_score" : 1.0,
     *         "_source" : {
     *           "storyTitle" : "英雄故事",
     *           "storyAuthor" : "不是真龙",
     *           "storyContentEn" : "Hero story",
     *           "storyContentCn" : "英雄故事",
     *           "storyConut" : 1234
     *         }
     * </pre>
     *
     * <p>An {@link IOException} from the update call is logged and not rethrown.
     *
     * @return the literal "over", whether or not the update call succeeded
     */
    @RequestMapping("/updateById")
    public String updateById() {

        StoryEntity storyEntity = new StoryEntity();
        storyEntity.setStoryTitle("英雄故事");
        storyEntity.setStoryAuthor("不是真龙也不是英雄");
        storyEntity.setStoryContentEn("Hero story");
        storyEntity.setStoryContentCn("英雄故事");
        storyEntity.setStoryConut(1234);

        // Guard the call so the entity is only serialized when debug logging is on
        if (log.isDebugEnabled()) {
            log.debug("es开始更新数据:{}", JSON.toJSONString(storyEntity));
        }

        try {
            String data = JSONObject.toJSONString(storyEntity);
            // Build the update request carrying the JSON as the partial document
            UpdateRequest request = new UpdateRequest(INDEX_NAME, "7").doc(data, XContentType.JSON);
            UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
            log.info("更新状态:{}", response.getResult());
        } catch (IOException e) {
            log.error("更新写入异常:{}", e.getMessage(), e);
        }
        log.debug("es更新数据完成");
        return "over";
    }

    /**
     * Deletes the document with id "7" from the index and logs the response status.
     *
     * @throws IOException if the delete call to Elasticsearch fails
     */
    public void deleteDocument() throws IOException {
        // Build and execute the delete request in one step
        DeleteResponse deleted = client.delete(new DeleteRequest(INDEX_NAME, "7"), RequestOptions.DEFAULT);
        log.info("删除状态:{}", deleted.status());
    }


//------------------------------------------------------------------------

    /**
     * 获取文档信息
     */
    @RequestMapping("/query")
    public void query() throws IOException {
        // 1.准备Request
        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
        // 2.准备DSL
        searchRequest.source().query(QueryBuilders.matchAllQuery());
        // 3.发送请求
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("searchResponse = "   searchResponse);

    }



    @RequestMapping("/queryBySearch")
    public void queryBySearch() throws IOException {

        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("bank");

        // 设置 查询条件
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));

        TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(10);
        sourceBuilder.aggregation(ageAgg);

        AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
        sourceBuilder.aggregation(balanceAvg);

        System.out.println("1:==>sourceBuilder.toString() = "   sourceBuilder);
        searchRequest.source(sourceBuilder);

        // 分析结果
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("2:==>searchResponse = "   searchResponse);


        // 查询结果
        SearchHits searchHits = searchResponse.getHits();
        System.out.println("hits.getTotalHits() = "   searchHits.getTotalHits());

        SearchHit[] hitss = searchHits.getHits();
        for (SearchHit hit : hitss) {
            String sourceAsString = hit.getSourceAsString();
            System.out.println("sourceAsString = "   sourceAsString);
            AccountEntity accountEntity = JSONObject.parseObject(sourceAsString, new TypeReference<AccountEntity>() {});
            System.out.println("accountEntity = "   accountEntity);
        }

        // 聚合信息
        Aggregations aggregations = searchResponse.getAggregations();

        Terms ageAggData = aggregations.get("ageAgg");
        for (Terms.Bucket bucket : ageAggData.getBuckets()) {
            System.out.println("bucket.getKeyAsString() = "   bucket.getKeyAsString());
        }
        Avg balanceAvgData = aggregations.get("balanceAvg");
        System.out.println("balanceAvgData.getValue() = "   balanceAvgData.getValue());
    }


    /**
     * Prints the host of every node known to the low-level client, then the client itself.
     *
     * @throws IOException declared by the signature; nothing in the body throws it directly
     */
    @RequestMapping("/testES")
    public void testES() throws IOException {
        for (Node node : client.getLowLevelClient().getNodes()) {
            System.out.println(node.getHost());
        }
        System.out.println(client);

    }
}


学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiecbkc
系列文章
更多 icon
同类精品
更多 icon
继续加载