• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

华为云Elasticsearch(FusionInsight HD)连接和开发教程03-通过HighLevel RestClient操作ES

武飞扬头像
sunjian286
帮助2

典型场景说明

通过典型场景,我们可以快速学习和掌握Elasticsearch的开发过程,并且对关键的接口函数有所了解。

场景说明

假定用户开发一个应用程序,用于搜索所有图书信息,并要求提供关键字搜索关键字相关的图书,并打分按分排序,其中搜索的功能就可以用Elasticsearch来实现,搜索流程如下:

  1. 客户端连接集群
  2. 查询集群健康状态
  3. 检查指定索引是否存在
  4. 创建指定分片数目的索引
  5. 写入索引数据
  6. 批量写入数据
  7. 查询索引信息
  8. 删除索引
  9. 删除索引中的文档
  10. 刷新索引
  11. 多线程样例

样例代码

High Level Rest Client样例代码

客户端连接集群

功能简介

获取客户端,通过设置IP和端口连接到特定Elasticsearch集群,是使用Elasticsearch提供的API之前的必要工作。

完成Elasticsearch业务操作后,需要调用“RestHighLevelClient.close()”关闭所申请的资源。

  1.  
    public static void main(String[] args) {
  2.  
    RestHighLevelClient highLevelClient = null;
  3.  
    HwRestClient hwRestClient = new HwRestClient();
  4.  
    try {
  5.  
    highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
  6.  
    /.../
  7.  
    } finally {
  8.  
    try {
  9.  
    if (highLevelClient != null) {
  10.  
    highLevelClient.close();
  11.  
    }
  12.  
    } catch (IOException e) {
  13.  
    LOG.error("Failed to close RestHighLevelClient.", e);
  14.  
    }
  15.  
    }
  16.  
    }
学新通

HwRestClient 默认从代码运行路径的conf目录下读取配置文件:esParam.properties、krb5.conf 和 user.keytab;

也支持自定义指定,如代码运行在Windows环境,配置文件均在D盘根目录,通过 HwRestClient hwRestClient = new HwRestClient("D:\\"); 来指定配置文件的路径。

查询集群状态信息

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/cluster”目录下的QueryClusterInfo.java,作用是查询Elasticsearch集群的相关信息。

  1.  
    /**
  2.  
    * Get cluster information
  3.  
    */
  4.  
    private static void queryClusterInfo(RestHighLevelClient highLevelClient) {
  5.  
     
  6.  
    try {
  7.  
    MainResponse response = highLevelClient.info(RequestOptions.DEFAULT);
  8.  
    ClusterName clusterName = response.getClusterName();
  9.  
    LOG.info("ClusterName:[{}], clusterUuid:[{}], nodeName:[{}], version:[{}].", new Object[] { clusterName.value(),
  10.  
    response.getClusterUuid(), response.getNodeName(), response.getVersion().toString() });
  11.  
    } catch (Exception e) {
  12.  
    LOG.error("QueryClusterInfo is failed,exception occurred.", e);
  13.  
    }
  14.  
    }

写入索引数据

支持通过指定不同格式的数据源向指定的索引中写入数据,例如Json格式的字符串、Map格式的数据源、XContentBuilder格式的数据源等。

通过Json格式字符串写入数据

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index”目录下的IndexByJson.java,作用是通过Json格式的数据源向指定索引中写入数据。

变量jsonString即为插入的数据内容,用户可以自定义数据内容。

  1.  
    /**
  2.  
    * Create or update index by json
  3.  
    */
  4.  
    private static void indexByJson(RestHighLevelClient highLevelClient, String index, String type, String id) {
  5.  
    try {
  6.  
    IndexRequest indexRequest = new IndexRequest(index, type, id);
  7.  
    String jsonString = "{" "\"user\":\"kimchy1\"," "\"age\":\"100\"," "\"postDate\":\"2020-01-01\","
  8.  
    "\"message\":\"trying out Elasticsearch\"," "\"reason\":\"daily update\","
  9.  
    "\"innerObject1\":\"Object1\"," "\"innerObject2\":\"Object2\","
  10.  
    "\"innerObject3\":\"Object3\"," "\"uid\":\"11\"" "}";
  11.  
    indexRequest.source(jsonString, XContentType.JSON);
  12.  
    IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  13.  
     
  14.  
    LOG.info("IndexByJson response is {}.", indexResponse.toString());
  15.  
    } catch (Exception e) {
  16.  
    LOG.error("IndexByJson is failed,exception occurred.", e);
  17.  
    }
  18.  
    }
学新通

通过Map格式写入数据

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index”目录下的IndexByMap.java,作用是通过Map格式的数据源向指定索引中写入数据。

变量dataMap 即为插入的数据内容,用户可以自定义数据内容。

  1.  
    /**
  2.  
    * Create or update index by map
  3.  
    */
  4.  
    private static void indexByMap(RestHighLevelClient highLevelClient, String index, String type, String id) {
  5.  
    try {
  6.  
    Map<String, Object> dataMap = new HashMap<>();
  7.  
    dataMap.put("user", "kimchy2");
  8.  
    dataMap.put("age", "200");
  9.  
    dataMap.put("postDate", new Date());
  10.  
    dataMap.put("message", "trying out Elasticsearch");
  11.  
    dataMap.put("reason", "daily update");
  12.  
    dataMap.put("innerObject1", "Object1");
  13.  
    dataMap.put("innerObject2", "Object2");
  14.  
    dataMap.put("innerObject3", "Object3");
  15.  
    dataMap.put("uid", "22");
  16.  
    IndexRequest indexRequest = new IndexRequest(index, type, id).source(dataMap);
  17.  
    IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  18.  
     
  19.  
    LOG.info("IndexByMap response is {}.", indexResponse.toString());
  20.  
    } catch (Exception e) {
  21.  
    LOG.error("IndexByMap is failed,exception occurred.", e);
  22.  
    }
  23.  
    }
学新通

通过XContentBuilder格式写入数据

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index”目录下的IndexByXContentBuilder.java,作用是通过XContentBuilder格式的数据源向指定索引中写入数据。

变量builder 即为插入的数据内容,用户可以自定义数据内容。

  1.  
    /**
  2.  
    * Create or update index by XContentBuilder
  3.  
    */
  4.  
    private static void indexByXContentBuilder(RestHighLevelClient highLevelClient, String index, String type,
  5.  
    String id) {
  6.  
    try {
  7.  
    XContentBuilder builder = XContentFactory.jsonBuilder();
  8.  
    builder.startObject();
  9.  
    {
  10.  
    builder.field("user", "kimchy3");
  11.  
    builder.field("age", "300");
  12.  
    builder.field("postDate", "2020-01-01");
  13.  
    builder.field("message", "trying out Elasticsearch");
  14.  
    builder.field("reason", "daily update");
  15.  
    builder.field("innerObject1", "Object1");
  16.  
    builder.field("innerObject2", "Object2");
  17.  
    builder.field("innerObject3", "Object3");
  18.  
    builder.field("uid", "33");
  19.  
     
  20.  
    }
  21.  
    builder.endObject();
  22.  
    IndexRequest indexRequest = new IndexRequest(index, type, id).source(builder);
  23.  
    IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  24.  
     
  25.  
    LOG.info("IndexByXContentBuilder response is {}", indexResponse.toString());
  26.  
    } catch (Exception e) {
  27.  
    LOG.error("IndexByXContentBuilder is failed,exception occurred.", e);
  28.  
    }
  29.  
    }
学新通

批量操作

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/bulk”目录下的Bulk.java,作用是执行批量操作,例如建立索引,更新索引或者删除索引。

  1.  
    /**
  2.  
    * Bulk request can be used to execute multiple index,update or delete
  3.  
    * operations using a single request.
  4.  
    */
  5.  
    private static void bulk(RestHighLevelClient highLevelClient, String index, String type) {
  6.  
     
  7.  
    try {
  8.  
    Map<String, Object> jsonMap = new HashMap<>();
  9.  
    for (int i = 1; i <= 100; i ) {
  10.  
    BulkRequest request = new BulkRequest();
  11.  
    for (int j = 1; j <= 1000; j ) {
  12.  
    jsonMap.clear();
  13.  
    jsonMap.put("user", "Linda");
  14.  
    jsonMap.put("age", ThreadLocalRandom.current().nextInt(18, 100));
  15.  
    jsonMap.put("postDate", "2020-01-01");
  16.  
    jsonMap.put("height", (float) ThreadLocalRandom.current().nextInt(140, 220));
  17.  
    jsonMap.put("weight", (float) ThreadLocalRandom.current().nextInt(70, 200));
  18.  
    request.add(new IndexRequest(index, type).source(jsonMap));
  19.  
    }
  20.  
    BulkResponse bulkResponse = highLevelClient.bulk(request, RequestOptions.DEFAULT);
  21.  
     
  22.  
    if (RestStatus.OK.equals((bulkResponse.status()))) {
  23.  
    LOG.info("Bulk is successful");
  24.  
    } else {
  25.  
    LOG.info("Bulk is failed");
  26.  
    }
  27.  
    }
  28.  
    } catch (Exception e) {
  29.  
    LOG.error("Bulk is failed,exception occurred.", e);
  30.  
    }
  31.  
    }
学新通

更新索引信息

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/update”目录下的Update.java,作用是更新索引数据。

  1.  
    /**
  2.  
    * Update index
  3.  
    */
  4.  
    private static void update(RestHighLevelClient highLevelClient, String index, String type, String id) {
  5.  
    try {
  6.  
    XContentBuilder builder = XContentFactory.jsonBuilder();
  7.  
    builder.startObject();
  8.  
    { // update information
  9.  
    builder.field("postDate", new Date());
  10.  
    builder.field("reason", "update again");
  11.  
    }
  12.  
    builder.endObject();
  13.  
    UpdateRequest request = new UpdateRequest(index, type, id).doc(builder);
  14.  
    UpdateResponse updateResponse = highLevelClient.update(request, RequestOptions.DEFAULT);
  15.  
     
  16.  
    LOG.info("Update response is {}.", updateResponse.toString());
  17.  
    } catch (Exception e) {
  18.  
    LOG.error("Update is failed,exception occurred.", e);
  19.  
    }
  20.  
    }
学新通

查询索引信息

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/search”目录下的GetIndex.java,作用是查询指定index、type、id下的文档信息。

  1.  
    /**
  2.  
    * Get index information
  3.  
    */
  4.  
    private static void getIndex(RestHighLevelClient highLevelClient, String index, String type, String id) {
  5.  
    try {
  6.  
    GetRequest getRequest = new GetRequest(index, type, id);
  7.  
    String[] includes = new String[] { "message", "test*" };
  8.  
    String[] excludes = Strings.EMPTY_ARRAY;
  9.  
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
  10.  
    getRequest.fetchSourceContext(fetchSourceContext);
  11.  
    getRequest.storedFields("message");
  12.  
    GetResponse getResponse = highLevelClient.get(getRequest, RequestOptions.DEFAULT);
  13.  
     
  14.  
    LOG.info("GetIndex response is {}", getResponse.toString());
  15.  
    } catch (Exception e) {
  16.  
    LOG.error("GetIndex is failed,exception occurred.", e);
  17.  
    }
  18.  
    }
学新通

搜索文档信息

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/search”目录下的Search.java,作用是搜索指定索引下的相关文档信息。可以自定义指定排序、过滤、高亮、返回数据范围、超时时间等。

  1.  
    /**
  2.  
    * Search some information in index
  3.  
    */
  4.  
    private static void search(RestHighLevelClient highLevelClient,String index) {
  5.  
    try {
  6.  
    SearchRequest searchRequest = new SearchRequest(index);
  7.  
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  8.  
    searchSourceBuilder.query(QueryBuilders.termQuery("user", "kimchy1"));
  9.  
     
  10.  
     
  11.  
    //Specifying Sorting
  12.  
    searchSourceBuilder.sort(new FieldSortBuilder("_doc").order(SortOrder.ASC));
  13.  
     
  14.  
    // Source filter
  15.  
    String[] includeFields = new String[] { "message", "user", "innerObject*" };
  16.  
    String[] excludeFields = new String[] { "postDate" };
  17.  
    searchSourceBuilder.fetchSource(includeFields, excludeFields);// Control which fields get included or
  18.  
    // excluded
  19.  
    // Request Highlighting
  20.  
    HighlightBuilder highlightBuilder = new HighlightBuilder();
  21.  
    HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user"); // Create a field highlighter for
  22.  
    // the user field
  23.  
    highlightBuilder.field(highlightUser);
  24.  
    searchSourceBuilder.highlighter(highlightBuilder);
  25.  
    searchSourceBuilder.from(0);
  26.  
    searchSourceBuilder.size(2);
  27.  
    searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
  28.  
    searchRequest.source(searchSourceBuilder);
  29.  
    SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  30.  
     
  31.  
    LOG.info("Search response is {}.", searchResponse.toString());
  32.  
    } catch (Exception e) {
  33.  
    LOG.error("Search is failed,exception occurred.", e);
  34.  
    }
  35.  
    }
学新通

游标搜索文档信息

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/searchscroll”目录下的SearchScroll.java,作用是通过游标进行搜索指定索引下的相关文档信息。方法返回一个scrollId,可通过scrollId关闭该游标。关闭方法可查看关闭游标

  1.  
    /**
  2.  
    * Send a search scroll request
  3.  
    */
  4.  
    private static String searchScroll(RestHighLevelClient highLevelClient,String index) {
  5.  
    String scrollId = null;
  6.  
    try {
  7.  
    final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
  8.  
    SearchRequest searchRequest = new SearchRequest(index);
  9.  
    searchRequest.scroll(scroll);
  10.  
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  11.  
    searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
  12.  
    searchRequest.source(searchSourceBuilder);
  13.  
     
  14.  
    SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  15.  
    scrollId = searchResponse.getScrollId();
  16.  
    SearchHit[] searchHits = searchResponse.getHits().getHits();
  17.  
    LOG.info("SearchHits is {}", searchResponse.toString());
  18.  
     
  19.  
    while (searchHits != null && searchHits.length > 0) {
  20.  
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
  21.  
    scrollRequest.scroll(scroll);
  22.  
    searchResponse = highLevelClient.searchScroll(scrollRequest, RequestOptions.DEFAULT);
  23.  
    scrollId = searchResponse.getScrollId();
  24.  
    searchHits = searchResponse.getHits().getHits();
  25.  
    LOG.info("SearchHits is {}", searchResponse.toString());
  26.  
    }
  27.  
    } catch (Exception e) {
  28.  
    LOG.error("SearchScroll is failed,exception occured.", e);
  29.  
    return null;
  30.  
    }
  31.  
    return scrollId;
  32.  
    }
学新通

关闭游标

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/searchscroll”目录下的SearchScroll.java中,包含方法clearScroll(highLevelClient,scroollId),作用是关闭由scroollId指定的游标。

  1.  
    /**
  2.  
    * Clear a search scroll
  3.  
    */
  4.  
    private static void clearScroll(RestHighLevelClient highLevelClient,String scrollId) {
  5.  
    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
  6.  
    clearScrollRequest.addScrollId(scrollId);
  7.  
    ClearScrollResponse clearScrollResponse;
  8.  
    try {
  9.  
    clearScrollResponse = highLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
  10.  
    if (clearScrollResponse.isSucceeded()) {
  11.  
    LOG.info("ClearScroll is successful.");
  12.  
    } else {
  13.  
    LOG.error("ClearScroll is failed.");
  14.  
    }
  15.  
    } catch (IOException e) {
  16.  
    LOG.error("ClearScroll is failed,exception occured.", e);
  17.  
    }
  18.  
    }
学新通

删除索引

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/delete”目录下的DeleteIndex.java,作用是删除指定索引。

  1.  
    /**
  2.  
    * Delete the index
  3.  
    */
  4.  
    private static void deleteIndex(RestHighLevelClient highLevelClient, String index) {
  5.  
    try {
  6.  
    DeleteIndexRequest request = new DeleteIndexRequest(index);
  7.  
    DeleteIndexResponse delateResponse = highLevelClient.indices().deleteIndex(request, RequestOptions.DEFAULT);
  8.  
     
  9.  
    if (delateResponse.isAcknowledged()) {
  10.  
    LOG.info("Delete index is successful");
  11.  
    } else {
  12.  
    LOG.info("Delete index is failed");
  13.  
    }
  14.  
    } catch (Exception e) {
  15.  
    LOG.error("Delete index : {} is failed, exception occurred.", index, e);
  16.  
    }
  17.  
    }
学新通

多线程请求

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/multithread”目录下的MultithreadRequest.java,作用是

创建线程类sendRequestThread,重写run方法,实现bulk请求,批量写入数据。

  1.  
    /**
  2.  
    * The thread that send a bulk request
  3.  
    */
  4.  
    public static class sendRequestThread implements Runnable {
  5.  
     
  6.  
    private RestHighLevelClient highLevelClientTh;
  7.  
    @Override
  8.  
    public void run() {
  9.  
    LOG.info("Thread begin.");
  10.  
     
  11.  
     
  12.  
    HwRestClient hwRestClient = new HwRestClient();
  13.  
    try {
  14.  
    highLevelClientTh = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
  15.  
    LOG.info("Thread name: " Thread.currentThread().getName());
  16.  
    bulk(highLevelClientTh,"huawei1","type1");
  17.  
    } catch (Exception e) {
  18.  
    LOG.error("SendRequestThread had exception.", e);
  19.  
    } finally {
  20.  
    if (highLevelClientTh != null) {
  21.  
    try {
  22.  
    highLevelClientTh.close();
  23.  
    LOG.info("Close the highLevelClient successful in thread : " Thread.currentThread().getName());
  24.  
    } catch (IOException e) {
  25.  
    LOG.error("Close the highLevelClient failed.", e);
  26.  
    }
  27.  
    }
  28.  
    }
  29.  
    }
  30.  
    }
学新通

在main方法中,创建了线程池fixedThreadPool,通过指定线程池大小threadPoolSize和任务数jobNumber,实现多线程发送bulk请求,并发地批量写入数据。

  1.  
    public static void main(String[] args) throws Exception {
  2.  
    LOG.info("Start to do bulk request !");
  3.  
     
  4.  
    ExecutorService fixedThreadPool;
  5.  
    int threadPoolSize = 2;
  6.  
    int jobNumber = 5;
  7.  
    if (HighLevelUtils.getConfig()) {
  8.  
    try {
  9.  
    fixedThreadPool = Executors.newFixedThreadPool(threadPoolSize);
  10.  
    for (int i = 0; i < jobNumber; i ) {
  11.  
    fixedThreadPool.execute(new sendRequestThread());
  12.  
    }
  13.  
    fixedThreadPool.shutdown();
  14.  
    } catch (Exception e) {
  15.  
    LOG.error("SendRequestThread is failed,exception occured.", e);
  16.  
    }
  17.  
    } else {
  18.  
    LOG.error("Failed to get configuration!");
  19.  
    }
  20.  
    }
学新通

BulkProcessor批量入库样例

功能简介

位于“elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/bulk”目录下的BulkProcessorSample.java,其作用是指导用户使用BulkProcessor来完成批量入库。

BulkProcessor初始化:

  1.  
    private static BulkProcessor getBulkProcessor(RestHighLevelClient highLevelClient) {
  2.  
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  3.  
    @Override
  4.  
    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
  5.  
    int numberOfActions = bulkRequest.numberOfActions();
  6.  
    LOG.info("Executing bulk {} with {} requests.", executionId, numberOfActions);
  7.  
    }
  8.  
     
  9.  
    @Override
  10.  
    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
  11.  
    if (bulkResponse.hasFailures()) {
  12.  
    LOG.warn("Bulk {} executed with failures.", executionId);
  13.  
    } else {
  14.  
    LOG.info("Bulk {} completed in {} milliseconds.", executionId, bulkResponse.getTook().getMillis());
  15.  
    }
  16.  
    }
  17.  
     
  18.  
    @Override
  19.  
    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
  20.  
    LOG.error("Failed to execute bulk.", throwable);
  21.  
    }
  22.  
    };
  23.  
     
  24.  
    BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
  25.  
    (request, bulkListener) -> highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
  26.  
     
  27.  
    BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
  28.  
    .setBulkActions(onceBulkMaxNum)
  29.  
    .setBulkSize(new ByteSizeValue(onecBulkMaxSize, ByteSizeUnit.MB))
  30.  
    .setConcurrentRequests(concurrentRequestsNum)
  31.  
    .setFlushInterval(TimeValue.timeValueSeconds(flushTime))
  32.  
    .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), maxRetry))
  33.  
    .build();
  34.  
     
  35.  
    LOG.info("Init bulkProcess successfully.");
  36.  
     
  37.  
    return bulkProcessor;
  38.  
    }
学新通

BulkProcessor入库样例:

  1.  
    private void singleThreadBulk() {
  2.  
    //单线程
  3.  
    int bulkTime = 0;
  4.  
    while (bulkTime < totalNumberForThread) {
  5.  
    Map<String, Object> dataMap = new HashMap<>();
  6.  
    dataMap.put("date", "2019/12/9");
  7.  
    dataMap.put("text", "the test text");
  8.  
    dataMap.put("title", "the title");
  9.  
    bulkProcessor.add(new IndexRequest(indexName, indexType).source(dataMap));
  10.  
    }
  11.  
    LOG.info("This thead bulks successfully, the thread name is {}.", Thread.currentThread().getName());
  12.  
    }

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhibfigi
系列文章
更多 icon
同类精品
更多 icon
继续加载