BaseElasticService.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package com.aijia.kmt.es.service.base;
  2. import com.aijia.core.utils.StringUtil;
  3. import com.aijia.kmt.es.entiy.ElasticEntity;
  4. import com.aijia.kmt.es.utils.ElasticUtil;
  5. import com.aijia.kmt.po.base.IdWorker;
  6. import com.alibaba.fastjson.JSON;
  7. import com.alibaba.fastjson.TypeReference;
  8. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  9. import org.elasticsearch.action.bulk.BulkRequest;
  10. import org.elasticsearch.action.delete.DeleteRequest;
  11. import org.elasticsearch.action.index.IndexRequest;
  12. import org.elasticsearch.action.search.SearchRequest;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.action.support.IndicesOptions;
  15. import org.elasticsearch.action.support.WriteRequest;
  16. import org.elasticsearch.client.RequestOptions;
  17. import org.elasticsearch.client.RestHighLevelClient;
  18. import org.elasticsearch.client.core.CountRequest;
  19. import org.elasticsearch.client.core.CountResponse;
  20. import org.elasticsearch.client.indices.CreateIndexRequest;
  21. import org.elasticsearch.client.indices.CreateIndexResponse;
  22. import org.elasticsearch.client.indices.GetIndexRequest;
  23. import org.elasticsearch.common.settings.Settings;
  24. import org.elasticsearch.common.xcontent.XContentType;
  25. import org.elasticsearch.index.query.IdsQueryBuilder;
  26. import org.elasticsearch.index.query.QueryBuilder;
  27. import org.elasticsearch.index.query.QueryBuilders;
  28. import org.elasticsearch.index.reindex.DeleteByQueryRequest;
  29. import org.elasticsearch.index.reindex.UpdateByQueryRequest;
  30. import org.elasticsearch.script.Script;
  31. import org.elasticsearch.search.SearchHit;
  32. import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
  33. import org.elasticsearch.search.builder.SearchSourceBuilder;
  34. import org.slf4j.Logger;
  35. import org.slf4j.LoggerFactory;
  36. import org.springframework.beans.factory.annotation.Autowired;
  37. import org.springframework.data.domain.*;
  38. import org.springframework.stereotype.Component;
  39. import java.io.IOException;
  40. import java.util.*;
  41. /**
  42. * Elasticsearch 的公共Base类,已实现增删改查,子类中可直接继承使用
  43. * @author Admin
  44. */
  45. @Component
  46. public class BaseElasticService {
  47. protected Logger log = LoggerFactory.getLogger(this.getClass());
  48. @Autowired
  49. RestHighLevelClient restHighLevelClient;
  50. /**
  51. * @See
  52. * @param idxName 索引名称
  53. * @param idxSQL 索引描述
  54. * @return void
  55. * @throws
  56. * @since
  57. */
  58. public void createIndex(String idxName, String idxSQL){
  59. try {
  60. if (!this.indexExist(idxName)) {
  61. log.error(" idxName={} 已经存在,idxSql={}",idxName,idxSQL);
  62. return;
  63. }
  64. CreateIndexRequest request = new CreateIndexRequest(idxName);
  65. buildSetting(request);
  66. request.mapping(idxSQL, XContentType.JSON);
  67. // request.settings() 手工指定Setting
  68. CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
  69. if (!res.isAcknowledged()) {
  70. throw new RuntimeException("初始化失败");
  71. }
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. System.exit(0);
  75. }
  76. }
  77. /** 判断某个index是否存在
  78. * @See
  79. * @param idxName index名
  80. * @return boolean
  81. * @throws
  82. * @since
  83. */
  84. public boolean indexExist(String idxName) throws Exception {
  85. GetIndexRequest request = new GetIndexRequest(idxName);
  86. request.local(false);
  87. request.humanReadable(true);
  88. request.includeDefaults(false);
  89. request.indicesOptions(IndicesOptions.lenientExpandOpen());
  90. return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  91. }
  92. /** 判断某个index是否存在
  93. * @See
  94. * @param idxName index名
  95. * @return boolean
  96. * @throws
  97. * @since
  98. */
  99. public boolean isExistsIndex(String idxName) throws Exception {
  100. return restHighLevelClient.indices().exists(new GetIndexRequest(idxName),RequestOptions.DEFAULT);
  101. }
  102. /** 设置分片
  103. * @See
  104. * @param request
  105. * @return void
  106. * @throws
  107. * @since
  108. */
  109. public void buildSetting(CreateIndexRequest request){
  110. request.settings(Settings.builder().put("index.number_of_shards",3)
  111. .put("index.number_of_replicas",2));
  112. }
  113. /**
  114. * @See
  115. * @param idxName index
  116. * @param entity 对象
  117. * @param openRetry
  118. * @return void
  119. * @throws
  120. * @since
  121. */
  122. public void insertOrUpdateOne(String idxName, ElasticEntity entity, boolean openRetry) {
  123. IndexRequest request = new IndexRequest(idxName);
  124. log.debug("Data : id={},entity={}",entity.getId(),JSON.toJSONString(entity.getData()));
  125. request.id(entity.getId());
  126. // request.source(entity.getData(), XContentType.JSON);
  127. request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
  128. boolean result = false;
  129. int retryCount = 0;
  130. try {
  131. do {
  132. try {
  133. restHighLevelClient.index(request, RequestOptions.DEFAULT);
  134. result = true;
  135. } catch (IOException e) {
  136. if (!openRetry || retryCount >= BaseSearchService.MAX_RETRIES) {
  137. throw e;
  138. }
  139. retryCount ++;
  140. }
  141. } while (openRetry && !result && retryCount <= BaseSearchService.MAX_RETRIES);
  142. } catch (Exception e) {
  143. throw new RuntimeException(e);
  144. }
  145. }
  146. /**
  147. * @See
  148. * @param idxName index
  149. * @param entity 对象
  150. * @param openRetry
  151. * @return void
  152. * @throws
  153. * @since
  154. */
  155. public void insertOrUpdateOneImmediate(String idxName, ElasticEntity entity, boolean openRetry) {
  156. IndexRequest request = new IndexRequest(idxName);
  157. log.debug("Data : id={},entity={}",entity.getId(),JSON.toJSONString(entity.getData()));
  158. request.id(entity.getId());
  159. request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
  160. request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  161. boolean result = false;
  162. int retryCount = 0;
  163. try {
  164. do {
  165. try {
  166. restHighLevelClient.index(request, RequestOptions.DEFAULT);
  167. result = true;
  168. } catch (IOException e) {
  169. if (!openRetry || retryCount >= BaseSearchService.MAX_RETRIES) {
  170. throw e;
  171. }
  172. retryCount ++;
  173. }
  174. } while (openRetry && !result && retryCount <= BaseSearchService.MAX_RETRIES);
  175. } catch (Exception e) {
  176. throw new RuntimeException(e);
  177. }
  178. }
  179. public <T> void insertBatchES(String idxName, List<T> list) throws Exception {
  180. BulkRequest request = new BulkRequest();
  181. for (int i = 0,lengh=list.size(); i < lengh; i++) {
  182. Map<String, Object> map= JSON.parseObject(JSON.toJSONString(list.get(i)), new TypeReference<Map<String, Object>>() {});
  183. String id = IdWorker.generateId();
  184. if(map.containsKey("id")){
  185. id = StringUtil.isNotBlank(map.get("id").toString()) ? map.get("id").toString():id;
  186. }
  187. IndexRequest indexRequest = new IndexRequest(idxName).id(id);
  188. indexRequest.source(map);
  189. request.add(indexRequest);
  190. }
  191. /*
  192. for (int i = 0,lengh=list.size(); i < lengh; i++) {
  193. request.add(new IndexRequest(idxName).id(worker.nextId()).source(JSON.toJSONString(list.get(i)), XContentType.JSON));
  194. }*/
  195. try {
  196. restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  197. } catch (Exception e) {
  198. throw new RuntimeException(e);
  199. }
  200. }
  201. /**
  202. * 更新es
  203. * @param indexName
  204. * @param queryBuilder
  205. * @param script
  206. */
  207. public void updteByQuery (String indexName, QueryBuilder queryBuilder, Script script) {
  208. UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
  209. request.setQuery(queryBuilder);
  210. request.setScript(script);
  211. request.setBatchSize(10000);
  212. request.setRefresh(true);
  213. try {
  214. restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
  215. } catch (Exception e) {
  216. throw new RuntimeException(e);
  217. }
  218. }
  219. /**
  220. * @See
  221. * @param idxName index
  222. * @param entity 对象
  223. * @return void
  224. * @throws
  225. * @since
  226. */
  227. public void deleteOne(String idxName, ElasticEntity entity) {
  228. DeleteRequest request = new DeleteRequest(idxName);
  229. request.id(entity.getId());
  230. try {
  231. restHighLevelClient.delete(request,RequestOptions.DEFAULT);
  232. } catch (Exception e) {
  233. throw new RuntimeException(e);
  234. }
  235. }
  236. /** 批量插入数据
  237. * @See
  238. * @param idxName index
  239. * @param list 带插入列表
  240. * @return void
  241. * @throws
  242. * @since
  243. */
  244. public void insertBatch(String idxName, List<ElasticEntity> list) {
  245. BulkRequest request = new BulkRequest();
  246. list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
  247. .source(JSON.toJSONString(item.getData()), XContentType.JSON)));
  248. try {
  249. restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  250. } catch (Exception e) {
  251. throw new RuntimeException(e);
  252. }
  253. }
  254. /** 批量插入数据
  255. * @See
  256. * @param idxName index
  257. * @param list 带插入列表
  258. * @return void
  259. * @throws
  260. * @since
  261. */
  262. public void insertBatchTrueObj(String idxName, List<ElasticEntity> list) {
  263. BulkRequest request = new BulkRequest();
  264. list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
  265. .source(item.getData(), XContentType.JSON)));
  266. try {
  267. restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  268. } catch (Exception e) {
  269. throw new RuntimeException(e);
  270. }
  271. }
  272. /** 批量删除
  273. * @See
  274. * @param idxName index
  275. * @param idList 待删除列表
  276. * @return void
  277. * @throws
  278. * @since
  279. */
  280. public <T> void deleteBatch(String idxName, Collection<T> idList) {
  281. BulkRequest request = new BulkRequest();
  282. idList.forEach(item -> request.add(new DeleteRequest(idxName, item.toString())));
  283. try {
  284. restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  285. } catch (Exception e) {
  286. throw new RuntimeException(e);
  287. }
  288. }
  289. /**
  290. * @See
  291. * @param idxName index
  292. * @param builder 查询参数
  293. * @param c 结果类对象
  294. * @return java.util.List<T>
  295. * @throws
  296. * @since
  297. */
  298. public <T> List<T> search(String idxName, SearchSourceBuilder builder, Class<T> c) {
  299. long startTime = System.currentTimeMillis();
  300. SearchRequest request = new SearchRequest(idxName);
  301. request.source(builder);
  302. try {
  303. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  304. long endTime = System.currentTimeMillis();
  305. SearchHit[] hits = response.getHits().getHits();
  306. List<T> res = new ArrayList<>(hits.length);
  307. for (SearchHit hit : hits) {
  308. res.add(JSON.parseObject(hit.getSourceAsString(), c));
  309. }
  310. long endTimeend = System.currentTimeMillis();
  311. return res;
  312. } catch (Exception e) {
  313. throw new RuntimeException(e);
  314. }
  315. }
  316. /**
  317. * @See
  318. * @param idxName index
  319. * @param builder 查询参数
  320. * @param c 结果类对象
  321. * @return map
  322. * @throws
  323. * @since
  324. */
  325. public <T> Page<T> searchPage(String idxName, SearchSourceBuilder builder, Class<T> c, Pageable pageable) {
  326. long startTime = System.currentTimeMillis();
  327. Page<T> page=new PageImpl<T>(new ArrayList<T>());
  328. if(pageable==null) {
  329. pageable= PageRequest.of(1, 500);//默认为第一页,一共500条。
  330. }
  331. try {
  332. long total=count(idxName, builder);
  333. if(total>0) {
  334. SearchRequest request = new SearchRequest(idxName);
  335. builder.from(pageable.getPageNumber());
  336. builder.size(pageable.getPageSize());
  337. request.source(builder);
  338. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  339. long endTime = System.currentTimeMillis();
  340. log.error("es返回之后="+(endTime - startTime));
  341. SearchHit[] hits = response.getHits().getHits();
  342. List<T> data = new ArrayList<>(hits.length);
  343. for (SearchHit hit : hits) {
  344. data.add(JSON.parseObject(hit.getSourceAsString(), c));
  345. }
  346. page=new PageImpl<T>(data,pageable,total);
  347. long endTimeend = System.currentTimeMillis();
  348. log.error("组装时间="+(endTimeend - endTime));
  349. log.error("组装返回返回之后="+(endTimeend - startTime));
  350. }
  351. return page;
  352. } catch (Exception e) {
  353. throw new RuntimeException(e);
  354. }
  355. }
  356. /**
  357. * @See
  358. * @param idxName index
  359. * @param builder 查询参数
  360. * @param c 结果类对象
  361. * @return map
  362. * @throws
  363. * @since
  364. */
  365. public <T> Map<String, Object> searchDataAndCount(String idxName, SearchSourceBuilder builder, Class<T> c) {
  366. SearchRequest request = new SearchRequest(idxName);
  367. request.source(builder);
  368. try {
  369. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  370. SearchHit[] hits = response.getHits().getHits();
  371. List<T> data = new ArrayList<>(hits.length);
  372. Map<String, Object> res = new HashMap<>();
  373. for (SearchHit hit : hits) {
  374. data.add(JSON.parseObject(hit.getSourceAsString(), c));
  375. }
  376. res.put("data", data);
  377. res.put("total", response.getHits().getTotalHits().value);
  378. return res;
  379. } catch (Exception e) {
  380. throw new RuntimeException(e);
  381. }
  382. }
  383. /**
  384. * @See
  385. * @param idxName index
  386. * @param builder 查询参数
  387. * @param c 结果类对象
  388. * @return map
  389. * @throws
  390. * @since
  391. */
  392. public <T> Map<String, Object> searchDataAndCount(String idxName, SearchSourceBuilder builder, Class<T> c, String aggName) {
  393. SearchRequest request = new SearchRequest(idxName);
  394. request.source(builder);
  395. try {
  396. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  397. SearchHit[] hits = response.getHits().getHits();
  398. List<T> data = new ArrayList<>(hits.length);
  399. Map<String, Object> res = new HashMap<>();
  400. for (SearchHit hit : hits) {
  401. data.add(JSON.parseObject(hit.getSourceAsString(), c));
  402. }
  403. res.put("data", data);
  404. res.put("total", ((ParsedCardinality)response.getAggregations().get(aggName)).getValue());
  405. return res;
  406. } catch (Exception e) {
  407. throw new RuntimeException(e);
  408. }
  409. }
  410. /**
  411. * @See
  412. * @param idxName index
  413. * @param builder 查询参数
  414. * @return map
  415. * @throws
  416. * @since
  417. */
  418. public SearchResponse searchResponse(String idxName, SearchSourceBuilder builder) {
  419. SearchRequest request = new SearchRequest(idxName);
  420. request.source(builder);
  421. try {
  422. return restHighLevelClient.search(request, RequestOptions.DEFAULT);
  423. } catch (Exception e) {
  424. throw new RuntimeException(e);
  425. }
  426. }
  427. /**
  428. * @See
  429. * @param idxName index
  430. * @param builder 查询参数
  431. * @return Long
  432. * @throws
  433. * @since
  434. */
  435. public Long count(String idxName, SearchSourceBuilder builder) {
  436. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  437. sourceBuilder.query(builder.query());
  438. CountRequest countRequest = new CountRequest(idxName);
  439. countRequest.source(sourceBuilder);
  440. try {
  441. CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
  442. return countResponse.getCount();
  443. } catch (Exception e) {
  444. throw new RuntimeException(e);
  445. }
  446. }
  447. /** 删除index
  448. * @See
  449. * @param idxName
  450. * @return void
  451. * @throws
  452. * @since
  453. */
  454. public void deleteIndex(String idxName) {
  455. try {
  456. if (!this.indexExist(idxName)) {
  457. log.error(" idxName={} 已经存在",idxName);
  458. return;
  459. }
  460. restHighLevelClient.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
  461. } catch (Exception e) {
  462. throw new RuntimeException(e);
  463. }
  464. }
  465. /**
  466. * @See
  467. * @param idxName
  468. * @param builder
  469. * @return void
  470. * @throws
  471. * @since
  472. */
  473. public void deleteByQuery(String idxName, QueryBuilder builder) {
  474. DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
  475. request.setQuery(builder);
  476. //设置批量操作数量,最大为10000
  477. request.setBatchSize(10000);
  478. request.setConflicts("proceed");
  479. try {
  480. restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
  481. } catch (Exception e) {
  482. throw new RuntimeException(e);
  483. }
  484. }
  485. /**
  486. * @See
  487. * @param idxName
  488. * @return void
  489. * @throws
  490. * @since
  491. */
  492. public <T> T getById(String idxName, String id, Class<T> c) {
  493. try {
  494. IdsQueryBuilder idsQueryBuilder=QueryBuilders.idsQuery();
  495. idsQueryBuilder.addIds(id);
  496. QueryBuilder queryBuilder = QueryBuilders.boolQuery()
  497. .must(idsQueryBuilder);
  498. SearchSourceBuilder builder = ElasticUtil.initSearchSourceBuilder(queryBuilder);
  499. SearchRequest request = new SearchRequest(idxName);
  500. request.source(builder);
  501. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  502. SearchHit[] hits = response.getHits().getHits();
  503. if(hits.length>0) {
  504. return JSON.parseObject(hits[0].getSourceAsString(), c);
  505. }
  506. return null;
  507. } catch (Exception e) {
  508. throw new RuntimeException(e);
  509. }
  510. }
  511. protected boolean bulk(BulkRequest bulkRequest) {
  512. try {
  513. restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  514. } catch (Exception e) {
  515. log.error("批量插入或更新练习册题目失败");
  516. return false;
  517. }
  518. return true;
  519. }
  520. }