| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- package com.aijia.kmt.es.service.base;
- import com.aijia.core.utils.StringUtil;
- import com.aijia.kmt.es.entiy.ElasticEntity;
- import com.aijia.kmt.es.utils.ElasticUtil;
- import com.aijia.kmt.po.base.IdWorker;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.TypeReference;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.support.IndicesOptions;
- import org.elasticsearch.action.support.WriteRequest;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.core.CountRequest;
- import org.elasticsearch.client.core.CountResponse;
- import org.elasticsearch.client.indices.CreateIndexRequest;
- import org.elasticsearch.client.indices.CreateIndexResponse;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.IdsQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.reindex.DeleteByQueryRequest;
- import org.elasticsearch.index.reindex.UpdateByQueryRequest;
- import org.elasticsearch.script.Script;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.domain.*;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- import java.util.*;
- /**
- * Elasticsearch 的公共Base类,已实现增删改查,子类中可直接继承使用
- * @author Admin
- */
- @Component
- public class BaseElasticService {
-
- protected Logger log = LoggerFactory.getLogger(this.getClass());
- @Autowired
- RestHighLevelClient restHighLevelClient;
- /**
- * @See
- * @param idxName 索引名称
- * @param idxSQL 索引描述
- * @return void
- * @throws
- * @since
- */
- public void createIndex(String idxName, String idxSQL){
- try {
- if (!this.indexExist(idxName)) {
- log.error(" idxName={} 已经存在,idxSql={}",idxName,idxSQL);
- return;
- }
- CreateIndexRequest request = new CreateIndexRequest(idxName);
- buildSetting(request);
- request.mapping(idxSQL, XContentType.JSON);
- // request.settings() 手工指定Setting
- CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
- if (!res.isAcknowledged()) {
- throw new RuntimeException("初始化失败");
- }
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(0);
- }
- }
- /** 判断某个index是否存在
- * @See
- * @param idxName index名
- * @return boolean
- * @throws
- * @since
- */
- public boolean indexExist(String idxName) throws Exception {
- GetIndexRequest request = new GetIndexRequest(idxName);
- request.local(false);
- request.humanReadable(true);
- request.includeDefaults(false);
- request.indicesOptions(IndicesOptions.lenientExpandOpen());
- return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- }
- /** 判断某个index是否存在
- * @See
- * @param idxName index名
- * @return boolean
- * @throws
- * @since
- */
- public boolean isExistsIndex(String idxName) throws Exception {
- return restHighLevelClient.indices().exists(new GetIndexRequest(idxName),RequestOptions.DEFAULT);
- }
- /** 设置分片
- * @See
- * @param request
- * @return void
- * @throws
- * @since
- */
- public void buildSetting(CreateIndexRequest request){
- request.settings(Settings.builder().put("index.number_of_shards",3)
- .put("index.number_of_replicas",2));
- }
- /**
- * @See
- * @param idxName index
- * @param entity 对象
- * @param openRetry
- * @return void
- * @throws
- * @since
- */
- public void insertOrUpdateOne(String idxName, ElasticEntity entity, boolean openRetry) {
- IndexRequest request = new IndexRequest(idxName);
- log.debug("Data : id={},entity={}",entity.getId(),JSON.toJSONString(entity.getData()));
- request.id(entity.getId());
- // request.source(entity.getData(), XContentType.JSON);
- request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
- boolean result = false;
- int retryCount = 0;
- try {
- do {
- try {
- restHighLevelClient.index(request, RequestOptions.DEFAULT);
- result = true;
- } catch (IOException e) {
- if (!openRetry || retryCount >= BaseSearchService.MAX_RETRIES) {
- throw e;
- }
- retryCount ++;
- }
- } while (openRetry && !result && retryCount <= BaseSearchService.MAX_RETRIES);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param entity 对象
- * @param openRetry
- * @return void
- * @throws
- * @since
- */
- public void insertOrUpdateOneImmediate(String idxName, ElasticEntity entity, boolean openRetry) {
- IndexRequest request = new IndexRequest(idxName);
- log.debug("Data : id={},entity={}",entity.getId(),JSON.toJSONString(entity.getData()));
- request.id(entity.getId());
- request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- boolean result = false;
- int retryCount = 0;
- try {
- do {
- try {
- restHighLevelClient.index(request, RequestOptions.DEFAULT);
- result = true;
- } catch (IOException e) {
- if (!openRetry || retryCount >= BaseSearchService.MAX_RETRIES) {
- throw e;
- }
- retryCount ++;
- }
- } while (openRetry && !result && retryCount <= BaseSearchService.MAX_RETRIES);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- public <T> void insertBatchES(String idxName, List<T> list) throws Exception {
- BulkRequest request = new BulkRequest();
- for (int i = 0,lengh=list.size(); i < lengh; i++) {
- Map<String, Object> map= JSON.parseObject(JSON.toJSONString(list.get(i)), new TypeReference<Map<String, Object>>() {});
- String id = IdWorker.generateId();
- if(map.containsKey("id")){
- id = StringUtil.isNotBlank(map.get("id").toString()) ? map.get("id").toString():id;
- }
- IndexRequest indexRequest = new IndexRequest(idxName).id(id);
- indexRequest.source(map);
- request.add(indexRequest);
- }
- /*
- for (int i = 0,lengh=list.size(); i < lengh; i++) {
- request.add(new IndexRequest(idxName).id(worker.nextId()).source(JSON.toJSONString(list.get(i)), XContentType.JSON));
- }*/
- try {
- restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * 更新es
- * @param indexName
- * @param queryBuilder
- * @param script
- */
- public void updteByQuery (String indexName, QueryBuilder queryBuilder, Script script) {
- UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
- request.setQuery(queryBuilder);
- request.setScript(script);
- request.setBatchSize(10000);
- request.setRefresh(true);
- try {
- restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param entity 对象
- * @return void
- * @throws
- * @since
- */
- public void deleteOne(String idxName, ElasticEntity entity) {
- DeleteRequest request = new DeleteRequest(idxName);
- request.id(entity.getId());
- try {
- restHighLevelClient.delete(request,RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /** 批量插入数据
- * @See
- * @param idxName index
- * @param list 带插入列表
- * @return void
- * @throws
- * @since
- */
- public void insertBatch(String idxName, List<ElasticEntity> list) {
- BulkRequest request = new BulkRequest();
- list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
- .source(JSON.toJSONString(item.getData()), XContentType.JSON)));
- try {
- restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /** 批量插入数据
- * @See
- * @param idxName index
- * @param list 带插入列表
- * @return void
- * @throws
- * @since
- */
- public void insertBatchTrueObj(String idxName, List<ElasticEntity> list) {
- BulkRequest request = new BulkRequest();
- list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
- .source(item.getData(), XContentType.JSON)));
- try {
- restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /** 批量删除
- * @See
- * @param idxName index
- * @param idList 待删除列表
- * @return void
- * @throws
- * @since
- */
- public <T> void deleteBatch(String idxName, Collection<T> idList) {
- BulkRequest request = new BulkRequest();
- idList.forEach(item -> request.add(new DeleteRequest(idxName, item.toString())));
- try {
- restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @param c 结果类对象
- * @return java.util.List<T>
- * @throws
- * @since
- */
- public <T> List<T> search(String idxName, SearchSourceBuilder builder, Class<T> c) {
- long startTime = System.currentTimeMillis();
- SearchRequest request = new SearchRequest(idxName);
- request.source(builder);
- try {
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- long endTime = System.currentTimeMillis();
- SearchHit[] hits = response.getHits().getHits();
- List<T> res = new ArrayList<>(hits.length);
- for (SearchHit hit : hits) {
- res.add(JSON.parseObject(hit.getSourceAsString(), c));
- }
- long endTimeend = System.currentTimeMillis();
- return res;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @param c 结果类对象
- * @return map
- * @throws
- * @since
- */
- public <T> Page<T> searchPage(String idxName, SearchSourceBuilder builder, Class<T> c, Pageable pageable) {
- long startTime = System.currentTimeMillis();
- Page<T> page=new PageImpl<T>(new ArrayList<T>());
- if(pageable==null) {
- pageable= PageRequest.of(1, 500);//默认为第一页,一共500条。
- }
- try {
- long total=count(idxName, builder);
- if(total>0) {
- SearchRequest request = new SearchRequest(idxName);
- builder.from(pageable.getPageNumber());
- builder.size(pageable.getPageSize());
- request.source(builder);
-
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- long endTime = System.currentTimeMillis();
- log.error("es返回之后="+(endTime - startTime));
- SearchHit[] hits = response.getHits().getHits();
- List<T> data = new ArrayList<>(hits.length);
- for (SearchHit hit : hits) {
- data.add(JSON.parseObject(hit.getSourceAsString(), c));
- }
- page=new PageImpl<T>(data,pageable,total);
- long endTimeend = System.currentTimeMillis();
- log.error("组装时间="+(endTimeend - endTime));
- log.error("组装返回返回之后="+(endTimeend - startTime));
- }
- return page;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @param c 结果类对象
- * @return map
- * @throws
- * @since
- */
- public <T> Map<String, Object> searchDataAndCount(String idxName, SearchSourceBuilder builder, Class<T> c) {
- SearchRequest request = new SearchRequest(idxName);
- request.source(builder);
- try {
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- SearchHit[] hits = response.getHits().getHits();
- List<T> data = new ArrayList<>(hits.length);
- Map<String, Object> res = new HashMap<>();
- for (SearchHit hit : hits) {
- data.add(JSON.parseObject(hit.getSourceAsString(), c));
- }
- res.put("data", data);
- res.put("total", response.getHits().getTotalHits().value);
- return res;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @param c 结果类对象
- * @return map
- * @throws
- * @since
- */
- public <T> Map<String, Object> searchDataAndCount(String idxName, SearchSourceBuilder builder, Class<T> c, String aggName) {
- SearchRequest request = new SearchRequest(idxName);
- request.source(builder);
- try {
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- SearchHit[] hits = response.getHits().getHits();
- List<T> data = new ArrayList<>(hits.length);
- Map<String, Object> res = new HashMap<>();
- for (SearchHit hit : hits) {
- data.add(JSON.parseObject(hit.getSourceAsString(), c));
- }
- res.put("data", data);
- res.put("total", ((ParsedCardinality)response.getAggregations().get(aggName)).getValue());
- return res;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @return map
- * @throws
- * @since
- */
- public SearchResponse searchResponse(String idxName, SearchSourceBuilder builder) {
- SearchRequest request = new SearchRequest(idxName);
- request.source(builder);
- try {
- return restHighLevelClient.search(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName index
- * @param builder 查询参数
- * @return Long
- * @throws
- * @since
- */
- public Long count(String idxName, SearchSourceBuilder builder) {
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(builder.query());
-
- CountRequest countRequest = new CountRequest(idxName);
- countRequest.source(sourceBuilder);
- try {
- CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
- return countResponse.getCount();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /** 删除index
- * @See
- * @param idxName
- * @return void
- * @throws
- * @since
- */
- public void deleteIndex(String idxName) {
- try {
- if (!this.indexExist(idxName)) {
- log.error(" idxName={} 已经存在",idxName);
- return;
- }
- restHighLevelClient.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName
- * @param builder
- * @return void
- * @throws
- * @since
- */
- public void deleteByQuery(String idxName, QueryBuilder builder) {
- DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
- request.setQuery(builder);
- //设置批量操作数量,最大为10000
- request.setBatchSize(10000);
- request.setConflicts("proceed");
- try {
- restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- /**
- * @See
- * @param idxName
- * @return void
- * @throws
- * @since
- */
- public <T> T getById(String idxName, String id, Class<T> c) {
- try {
- IdsQueryBuilder idsQueryBuilder=QueryBuilders.idsQuery();
- idsQueryBuilder.addIds(id);
- QueryBuilder queryBuilder = QueryBuilders.boolQuery()
- .must(idsQueryBuilder);
- SearchSourceBuilder builder = ElasticUtil.initSearchSourceBuilder(queryBuilder);
- SearchRequest request = new SearchRequest(idxName);
- request.source(builder);
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- SearchHit[] hits = response.getHits().getHits();
- if(hits.length>0) {
- return JSON.parseObject(hits[0].getSourceAsString(), c);
- }
- return null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- protected boolean bulk(BulkRequest bulkRequest) {
- try {
- restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- } catch (Exception e) {
- log.error("批量插入或更新练习册题目失败");
- return false;
- }
- return true;
- }
- }
|