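Huawei Cloud Elasticsearch (FusionInsight HD) Connection and Development Tutorial 03: Operating ES via the High Level REST Client
Typical Scenario Description
Working through a typical scenario is a quick way to learn the Elasticsearch development process and to become familiar with the key interface functions.
Scenario Description
Suppose a user is developing an application that searches all book information. Given a keyword, it must return the books related to that keyword, score them, and sort them by score. The search function can be implemented with Elasticsearch. The workflow is as follows: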
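- Connect the client to the cluster
- Query the cluster health status
- Check whether a specified index exists
- Create an index with a specified number of shards
- Write index data
- Write data in bulk
- Query index information
- Delete an index
- Delete documents from an index
- Refresh an index
- Multithreaded sample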
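Sample Code
High Level REST Client Sample Code
Connecting the Client to the Cluster
Function Overview
Before using the APIs provided by Elasticsearch, you must obtain a client and connect it to a specific Elasticsearch cluster by setting the IP address and port.
After the Elasticsearch operations are complete, call "RestHighLevelClient.close()" to release the resources that were requested.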
    public static void main(String[] args) {
        RestHighLevelClient highLevelClient = null;
        HwRestClient hwRestClient = new HwRestClient();
        try {
            highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
            /* ... */
        } finally {
            // Always release the client, even if the operations above fail.
            try {
                if (highLevelClient != null) {
                    highLevelClient.close();
                }
            } catch (IOException e) {
                LOG.error("Failed to close RestHighLevelClient.", e);
            }
        }
    }
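By default, HwRestClient reads its configuration files (esParam.properties, krb5.conf, and user.keytab) from the conf directory under the path where the code runs.
A custom location is also supported. For example, if the code runs on Windows and the configuration files are all in the root of drive D, specify the path with HwRestClient hwRestClient = new HwRestClient("D:\\");.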
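The lookup rule described above (a default conf directory under the working path, or a caller-supplied directory) can be sketched with the standard library alone. This is only an illustration of the idea, not how HwRestClient is actually implemented: the class ConfDirSketch, its methods, and the property key demo.hosts are all assumptions made up for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of HwRestClient.
class ConfDirSketch {

    // Falls back to "<working dir>/conf" when no custom directory is given.
    static Path resolveConfDir(String customDir) {
        if (customDir == null || customDir.isEmpty()) {
            return Paths.get(System.getProperty("user.dir"), "conf");
        }
        return Paths.get(customDir);
    }

    // Loads esParam.properties from the given directory.
    static Properties load(Path confDir) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(confDir.resolve("esParam.properties"))) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read esParam.properties in " + confDir, e);
        }
        return props;
    }

    // Writes a throwaway file with a made-up key, then reads it back.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("conf");
            Files.write(dir.resolve("esParam.properties"),
                    "demo.hosts=127.0.0.1:9200\n".getBytes(StandardCharsets.UTF_8));
            return load(resolveConfDir(dir.toString())).getProperty("demo.hosts");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Failing fast with a clear message when the file is missing tends to be easier to diagnose than a connection error later on.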
Querying Cluster Status Information
Function Overview
QueryClusterInfo.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/cluster" directory, queries information about the Elasticsearch cluster.
    /**
     * Get cluster information
     */
    private static void queryClusterInfo(RestHighLevelClient highLevelClient) {
        try {
            MainResponse response = highLevelClient.info(RequestOptions.DEFAULT);
            ClusterName clusterName = response.getClusterName();
            LOG.info("ClusterName:[{}], clusterUuid:[{}], nodeName:[{}], version:[{}].",
                new Object[] { clusterName.value(), response.getClusterUuid(), response.getNodeName(),
                    response.getVersion().toString() });
        } catch (Exception e) {
            LOG.error("QueryClusterInfo is failed,exception occurred.", e);
        }
    }
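Writing Index Data
Data can be written to a specified index from data sources in different formats, such as a JSON string, a Map, or an XContentBuilder.
Writing Data from a JSON String
IndexByJson.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index" directory, writes data to a specified index from a JSON data source.
The variable "jsonString" holds the data to be inserted; users can customize its content.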
    /**
     * Create or update index by json
     */
    private static void indexByJson(RestHighLevelClient highLevelClient, String index, String type, String id) {
        try {
            IndexRequest indexRequest = new IndexRequest(index, type, id);
            // Build the JSON document by concatenating its fragments.
            String jsonString = "{" + "\"user\":\"kimchy1\"," + "\"age\":\"100\"," + "\"postDate\":\"2020-01-01\","
                + "\"message\":\"trying out Elasticsearch\"," + "\"reason\":\"daily update\","
                + "\"innerObject1\":\"Object1\"," + "\"innerObject2\":\"Object2\","
                + "\"innerObject3\":\"Object3\"," + "\"uid\":\"11\"" + "}";
            indexRequest.source(jsonString, XContentType.JSON);
            IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);

            LOG.info("IndexByJson response is {}.", indexResponse.toString());
        } catch (Exception e) {
            LOG.error("IndexByJson is failed,exception occurred.", e);
        }
    }
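Writing Data from a Map
IndexByMap.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index" directory, writes data to a specified index from a Map data source.
The variable "dataMap" holds the data to be inserted; users can customize its content.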
    /**
     * Create or update index by map
     */
    private static void indexByMap(RestHighLevelClient highLevelClient, String index, String type, String id) {
        try {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("user", "kimchy2");
            dataMap.put("age", "200");
            dataMap.put("postDate", new Date());
            dataMap.put("message", "trying out Elasticsearch");
            dataMap.put("reason", "daily update");
            dataMap.put("innerObject1", "Object1");
            dataMap.put("innerObject2", "Object2");
            dataMap.put("innerObject3", "Object3");
            dataMap.put("uid", "22");
            IndexRequest indexRequest = new IndexRequest(index, type, id).source(dataMap);
            IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);

            LOG.info("IndexByMap response is {}.", indexResponse.toString());
        } catch (Exception e) {
            LOG.error("IndexByMap is failed,exception occurred.", e);
        }
    }
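Writing Data from an XContentBuilder
IndexByXContentBuilder.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/index" directory, writes data to a specified index from an XContentBuilder data source.
The variable "builder" holds the data to be inserted; users can customize its content.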
    /**
     * Create or update index by XContentBuilder
     */
    private static void indexByXContentBuilder(RestHighLevelClient highLevelClient, String index, String type,
        String id) {
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy3");
                builder.field("age", "300");
                builder.field("postDate", "2020-01-01");
                builder.field("message", "trying out Elasticsearch");
                builder.field("reason", "daily update");
                builder.field("innerObject1", "Object1");
                builder.field("innerObject2", "Object2");
                builder.field("innerObject3", "Object3");
                builder.field("uid", "33");
            }
            builder.endObject();
            IndexRequest indexRequest = new IndexRequest(index, type, id).source(builder);
            IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);

            LOG.info("IndexByXContentBuilder response is {}", indexResponse.toString());
        } catch (Exception e) {
            LOG.error("IndexByXContentBuilder is failed,exception occurred.", e);
        }
    }
Bulk Operations
Function Overview
Bulk.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/bulk" directory, performs bulk operations such as indexing, updating, or deleting documents.
    /**
     * Bulk request can be used to execute multiple index,update or delete
     * operations using a single request.
     */
    private static void bulk(RestHighLevelClient highLevelClient, String index, String type) {
        try {
            Map<String, Object> jsonMap = new HashMap<>();
            // Send 100 bulk requests, each carrying 1000 index operations.
            for (int i = 1; i <= 100; i++) {
                BulkRequest request = new BulkRequest();
                for (int j = 1; j <= 1000; j++) {
                    jsonMap.clear();
                    jsonMap.put("user", "Linda");
                    jsonMap.put("age", ThreadLocalRandom.current().nextInt(18, 100));
                    jsonMap.put("postDate", "2020-01-01");
                    jsonMap.put("height", (float) ThreadLocalRandom.current().nextInt(140, 220));
                    jsonMap.put("weight", (float) ThreadLocalRandom.current().nextInt(70, 200));
                    request.add(new IndexRequest(index, type).source(jsonMap));
                }
                BulkResponse bulkResponse = highLevelClient.bulk(request, RequestOptions.DEFAULT);

                if (RestStatus.OK.equals((bulkResponse.status()))) {
                    LOG.info("Bulk is successful");
                } else {
                    LOG.info("Bulk is failed");
                }
            }
        } catch (Exception e) {
            LOG.error("Bulk is failed,exception occurred.", e);
        }
    }
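The sample above generates its documents inside the loop, so the batch boundaries come for free. When the documents already exist in a collection, the same "fixed number of operations per bulk request" idea amounts to splitting the collection into batches first. The following is a minimal standard-library sketch of that splitting step only; the class BatchSketch and its partition method are names assumed for this illustration and are not part of the sample project or of the Elasticsearch client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; each returned batch would become one bulk request.
class BatchSketch {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = partition(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(batches.size() + " batches: " + batches);
    }
}
```

The last batch may be smaller than the others, so any per-batch logic should not assume a fixed size.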
Updating Index Data
Function Overview
Update.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/update" directory, updates index data.
    /**
     * Update index
     */
    private static void update(RestHighLevelClient highLevelClient, String index, String type, String id) {
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            { // update information
                builder.field("postDate", new Date());
                builder.field("reason", "update again");
            }
            builder.endObject();
            UpdateRequest request = new UpdateRequest(index, type, id).doc(builder);
            UpdateResponse updateResponse = highLevelClient.update(request, RequestOptions.DEFAULT);

            LOG.info("Update response is {}.", updateResponse.toString());
        } catch (Exception e) {
            LOG.error("Update is failed,exception occurred.", e);
        }
    }
Querying Index Information
Function Overview
GetIndex.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/search" directory, queries the document identified by a specified index, type, and id.
    /**
     * Get index information
     */
    private static void getIndex(RestHighLevelClient highLevelClient, String index, String type, String id) {
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            String[] includes = new String[] { "message", "test*" };
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            getRequest.fetchSourceContext(fetchSourceContext);
            getRequest.storedFields("message");
            GetResponse getResponse = highLevelClient.get(getRequest, RequestOptions.DEFAULT);

            LOG.info("GetIndex response is {}", getResponse.toString());
        } catch (Exception e) {
            LOG.error("GetIndex is failed,exception occurred.", e);
        }
    }
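Searching Document Information
Function Overview
Search.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/search" directory, searches for documents in a specified index. Sorting, source filtering, highlighting, the range of returned data, the timeout, and so on can be customized.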
    /**
     * Search some information in index
     */
    private static void search(RestHighLevelClient highLevelClient, String index) {
        try {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.termQuery("user", "kimchy1"));

            // Specifying Sorting
            searchSourceBuilder.sort(new FieldSortBuilder("_doc").order(SortOrder.ASC));

            // Source filter: control which fields get included or excluded
            String[] includeFields = new String[] { "message", "user", "innerObject*" };
            String[] excludeFields = new String[] { "postDate" };
            searchSourceBuilder.fetchSource(includeFields, excludeFields);

            // Request Highlighting: create a field highlighter for the user field
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            searchSourceBuilder.highlighter(highlightBuilder);
            searchSourceBuilder.from(0);
            searchSourceBuilder.size(2);
            searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            LOG.info("Search response is {}.", searchResponse.toString());
        } catch (Exception e) {
            LOG.error("Search is failed,exception occurred.", e);
        }
    }
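Searching Document Information with a Scroll
Function Overview
SearchScroll.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/searchscroll" directory, searches for documents in a specified index by using a scroll (cursor). The method returns a scrollId, which can be used to close the scroll. For how to close it, see "Closing the Scroll".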
    /**
     * Send a search scroll request
     */
    private static String searchScroll(RestHighLevelClient highLevelClient, String index) {
        String scrollId = null;
        try {
            // Keep the scroll context alive for one minute between requests.
            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.scroll(scroll);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
            searchRequest.source(searchSourceBuilder);

            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            scrollId = searchResponse.getScrollId();
            SearchHit[] searchHits = searchResponse.getHits().getHits();
            LOG.info("SearchHits is {}", searchResponse.toString());

            // Keep fetching pages until a page comes back empty.
            while (searchHits != null && searchHits.length > 0) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                searchResponse = highLevelClient.searchScroll(scrollRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
                LOG.info("SearchHits is {}", searchResponse.toString());
            }
        } catch (Exception e) {
            LOG.error("SearchScroll is failed,exception occurred.", e);
            return null;
        }
        return scrollId;
    }
Closing the Scroll
Function Overview
SearchScroll.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/searchscroll" directory, contains the method clearScroll(highLevelClient, scrollId), which closes the scroll identified by scrollId.
    /**
     * Clear a search scroll
     */
    private static void clearScroll(RestHighLevelClient highLevelClient, String scrollId) {
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        ClearScrollResponse clearScrollResponse;
        try {
            clearScrollResponse = highLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            if (clearScrollResponse.isSucceeded()) {
                LOG.info("ClearScroll is successful.");
            } else {
                LOG.error("ClearScroll is failed.");
            }
        } catch (IOException e) {
            LOG.error("ClearScroll is failed,exception occurred.", e);
        }
    }
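Taken together, the two scroll methods follow one pattern: fetch the first page, keep fetching until a page comes back empty, and release the scroll afterwards. The sketch below shows only that control flow using the standard library, with the release step placed in a finally block so it also runs when paging fails. The PageSource interface and InMemorySource class are hypothetical stand-ins invented for this example; they are not Elasticsearch types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class ScrollLoopSketch {

    // Hypothetical stand-in for "search / searchScroll / clearScroll".
    interface PageSource {
        List<String> nextPage(); // returns an empty list once exhausted
        void clear();            // releases the cursor
    }

    // Reads every page and always releases the cursor afterwards.
    static List<String> drain(PageSource source) {
        List<String> all = new ArrayList<>();
        try {
            List<String> page = source.nextPage();
            while (page != null && !page.isEmpty()) {
                all.addAll(page);
                page = source.nextPage();
            }
        } finally {
            source.clear();
        }
        return all;
    }

    // In-memory source used only to exercise the loop.
    static class InMemorySource implements PageSource {
        private final Iterator<List<String>> pages;
        boolean cleared = false;

        InMemorySource(List<List<String>> pages) {
            this.pages = pages.iterator();
        }

        public List<String> nextPage() {
            return pages.hasNext() ? pages.next() : Collections.<String>emptyList();
        }

        public void clear() {
            cleared = true;
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(drain(source) + ", cleared=" + source.cleared);
    }
}
```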
Deleting an Index
Function Overview
DeleteIndex.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/delete" directory, deletes a specified index.
    /**
     * Delete the index
     */
    private static void deleteIndex(RestHighLevelClient highLevelClient, String index) {
        try {
            DeleteIndexRequest request = new DeleteIndexRequest(index);
            DeleteIndexResponse deleteResponse = highLevelClient.indices().deleteIndex(request, RequestOptions.DEFAULT);

            if (deleteResponse.isAcknowledged()) {
                LOG.info("Delete index is successful");
            } else {
                LOG.info("Delete index is failed");
            }
        } catch (Exception e) {
            LOG.error("Delete index : {} is failed, exception occurred.", index, e);
        }
    }
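Multithreaded Requests
Function Overview
MultithreadRequest.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/multithread" directory, creates the thread class sendRequestThread and implements its run method to send a bulk request that writes data in batches.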
    /**
     * The thread that send a bulk request
     */
    public static class sendRequestThread implements Runnable {

        private RestHighLevelClient highLevelClientTh;

        @Override
        public void run() {
            LOG.info("Thread begin.");

            // Each job creates and closes its own client.
            HwRestClient hwRestClient = new HwRestClient();
            try {
                highLevelClientTh = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
                LOG.info("Thread name: " + Thread.currentThread().getName());
                bulk(highLevelClientTh, "huawei1", "type1");
            } catch (Exception e) {
                LOG.error("SendRequestThread had exception.", e);
            } finally {
                if (highLevelClientTh != null) {
                    try {
                        highLevelClientTh.close();
                        LOG.info("Close the highLevelClient successful in thread : " + Thread.currentThread().getName());
                    } catch (IOException e) {
                        LOG.error("Close the highLevelClient failed.", e);
                    }
                }
            }
        }
    }
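The main method creates the thread pool fixedThreadPool. By specifying the pool size threadPoolSize and the number of jobs jobNumber, it sends bulk requests from multiple threads and writes data in batches concurrently.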
    public static void main(String[] args) throws Exception {
        LOG.info("Start to do bulk request !");

        ExecutorService fixedThreadPool;
        int threadPoolSize = 2;
        int jobNumber = 5;
        if (HighLevelUtils.getConfig()) {
            try {
                fixedThreadPool = Executors.newFixedThreadPool(threadPoolSize);
                for (int i = 0; i < jobNumber; i++) {
                    fixedThreadPool.execute(new sendRequestThread());
                }
                // Stop accepting new jobs; jobs already submitted still run to completion.
                fixedThreadPool.shutdown();
            } catch (Exception e) {
                LOG.error("SendRequestThread is failed,exception occurred.", e);
            }
        } else {
            LOG.error("Failed to get configuration!");
        }
    }
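The main method above calls shutdown() and returns without waiting for the jobs. If the caller needs to know when all jobs have finished (for example, to log a summary), one common option is to follow shutdown() with awaitTermination(). The sketch below illustrates this with the standard library only; the class PoolSketch, the counter that stands in for the bulk job, and the one-minute timeout are assumptions made for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; the counter increment stands in for a real bulk job.
class PoolSketch {

    // Runs jobNumber jobs on a fixed pool and returns how many completed.
    static int runJobs(int threadPoolSize, int jobNumber) {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize);
        for (int i = 0; i < jobNumber; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // no new jobs; queued jobs still run
        try {
            // Wait for the queued jobs; the timeout is an arbitrary choice.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed jobs: " + runJobs(2, 5));
    }
}
```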
BulkProcessor Bulk Ingestion Sample
Function Overview
BulkProcessorSample.java, located in the "elasticsearch-rest-client-example/src/com/huawei/fusioninsight/elasticsearch/example/highlevel/bulk" directory, shows how to use BulkProcessor to complete bulk ingestion.
BulkProcessor initialization:
    private static BulkProcessor getBulkProcessor(RestHighLevelClient highLevelClient) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                int numberOfActions = bulkRequest.numberOfActions();
                LOG.info("Executing bulk {} with {} requests.", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                if (bulkResponse.hasFailures()) {
                    LOG.warn("Bulk {} executed with failures.", executionId);
                } else {
                    LOG.info("Bulk {} completed in {} milliseconds.", executionId, bulkResponse.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                LOG.error("Failed to execute bulk.", throwable);
            }
        };

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
            (request, bulkListener) -> highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        // Flush by action count, by request size, or by time interval, whichever is reached first.
        BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
            .setBulkActions(onceBulkMaxNum)
            .setBulkSize(new ByteSizeValue(onecBulkMaxSize, ByteSizeUnit.MB))
            .setConcurrentRequests(concurrentRequestsNum)
            .setFlushInterval(TimeValue.timeValueSeconds(flushTime))
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), maxRetry))
            .build();

        LOG.info("Init bulkProcess successfully.");

        return bulkProcessor;
    }
BulkProcessor ingestion sample:
    private void singleThreadBulk() {
        // Single thread
        int bulkTime = 0;
        // Increment the counter on every pass so that the loop terminates.
        while (bulkTime++ < totalNumberForThread) {
            Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("date", "2019/12/9");
            dataMap.put("text", "the test text");
            dataMap.put("title", "the title");
            bulkProcessor.add(new IndexRequest(indexName, indexType).source(dataMap));
        }
        LOG.info("This thread bulks successfully, the thread name is {}.", Thread.currentThread().getName());
    }
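BulkProcessor buffers the added requests and sends them once a configured threshold is reached, so requests added after the last threshold crossing stay buffered until a later flush. The following standard-library sketch models only the count-based threshold to make that behavior visible; it is not how BulkProcessor is implemented, and the names FlushThresholdSketch, CountingBuffer, and batchSizes are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FlushThresholdSketch {

    // Hypothetical buffer that hands items to a callback once maxActions is reached.
    static class CountingBuffer<T> {
        private final int maxActions;
        private final Consumer<List<T>> flusher;
        private final List<T> pending = new ArrayList<>();

        CountingBuffer(int maxActions, Consumer<List<T>> flusher) {
            this.maxActions = maxActions;
            this.flusher = flusher;
        }

        void add(T item) {
            pending.add(item);
            if (pending.size() >= maxActions) {
                flush();
            }
        }

        // Sends whatever is still buffered; does nothing when the buffer is empty.
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            flusher.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }

    // Returns the size of each flushed batch, including the final explicit flush.
    static List<Integer> batchSizes(int totalItems, int maxActions) {
        List<Integer> sizes = new ArrayList<>();
        CountingBuffer<Integer> buffer =
                new CountingBuffer<>(maxActions, batch -> sizes.add(batch.size()));
        for (int i = 0; i < totalItems; i++) {
            buffer.add(i);
        }
        buffer.flush(); // without this, the trailing partial batch would stay buffered
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(7, 3)); // prints [3, 3, 1]
    }
}
```

In the real client, the flush interval configured on the builder, or closing the processor when ingestion is done, plays the role of the final explicit flush shown here.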