Skip to content

Instantly share code, notes, and snippets.

@Chairo
Last active October 3, 2019 12:12
Show Gist options
  • Save Chairo/1b0d06d45e5155b134b7b43abb3d84f6 to your computer and use it in GitHub Desktop.
Save Chairo/1b0d06d45e5155b134b7b43abb3d84f6 to your computer and use it in GitHub Desktop.
Elastic Spring Boot Starter
package com.au92.common.elastic;
/**
* @author: p_x_c
*/
public class ElasticConstant {
/**
* 配置文件中多cluster分割符号
*/
public static final String CLUSTER_SPLIT = ",";
/**
* 配置文件中es url分割符号
*/
public static final String SCHEME_SPLIT = ":";
/**
* 创建Index的请求体
*/
public static final String CREATE_INDEX = "{\n" +
" \"properties\": {\n" +
" \"id\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"userId\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"search_analyzer\": \"ik_smart\"\n" +
" },\n" +
" \"publishTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"url\": {\n" +
" \"type\": \"text\",\n" +
" \"index\": true,\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"search_analyzer\": \"ik_smart\"\n" +
" }\n" +
" }\n" +
"}";
/**
* ES shards数量
*/
public static final Integer SHARDS = 3;
/**
* replicas 数量
*/
public static final Integer REPLICAS = 2;
/**
* 批量操作一次最大数量
*/
public static final Integer MAX = 9999;
/**
* 超时设置
*/
public static final int CONNECT_TIME_OUT = 1000;
public static final int SOCKET_TIME_OUT = 30000;
public static final int CONNECTION_REQUEST_TIME_OUT = 500;
/**
* 连接数设置
*/
public static final int MAX_CONNECT_NUM = 100;
public static final int MAX_CONNECT_PER_ROUTE = 100;
}
package com.au92.common.elastic;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE;
/**
* @author: p_x_c
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(ElasticSearchProperties.class)
public class ElasticSearchAutoConfiguration implements DisposableBean {
private RestHighLevelClient client;
@Resource
private ElasticSearchProperties properties;
@Override
public void destroy() throws Exception {
log.info("销毁ES连接");
if (client != null) {
client.close();
}
}
/**
* 创建client
*
* @return
* @throws IOException
*/
@Bean
@ConditionalOnMissingBean(RestHighLevelClient.class)
public RestHighLevelClient client() throws IOException {
if (!Objects.equals(null, client)) {
log.warn("client对象没有正确关闭");
client.close();
}
RestClientBuilder builder = RestClient.builder(setting());
setConnectTimeOutConfig(builder);
setMutiConnectConfig(builder);
client = new RestHighLevelClient(builder);
return client;
}
/**
* Elastic Service
*
* @return
*/
@Bean
@Scope(value = SCOPE_PROTOTYPE)
@ConditionalOnMissingBean(IElasticSearchService.class)
public IElasticSearchService service() {
return new ElasticSearchServiceImpl();
}
/**
* 解析配置文件
*
* @return
*/
private HttpHost[] setting() {
return Arrays.stream(properties.getClusterNodes().split(ElasticConstant.CLUSTER_SPLIT))
.map(x -> {
String[] addressPortPairs = x.split(ElasticConstant.SCHEME_SPLIT);
String schemeName = addressPortPairs[0].toLowerCase();
String hostname = addressPortPairs[1].substring(2);
Integer port = Integer.valueOf(addressPortPairs[2]);
return new HttpHost(hostname, port, schemeName);
})
.collect(Collectors.toList())
.toArray(new HttpHost[0]);
}
/**
* 超时设置
*
* @param builder
*/
private void setConnectTimeOutConfig(RestClientBuilder builder) {
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(ElasticConstant.CONNECT_TIME_OUT)
.setSocketTimeout(ElasticConstant.SOCKET_TIME_OUT)
.setConnectionRequestTimeout(ElasticConstant.CONNECTION_REQUEST_TIME_OUT);
return requestConfigBuilder;
});
}
/**
* 并发连接数设置
*
* @param builder
*/
private void setMutiConnectConfig(RestClientBuilder builder) {
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(ElasticConstant.MAX_CONNECT_NUM)
.setMaxConnPerRoute(ElasticConstant.MAX_CONNECT_PER_ROUTE);
return httpClientBuilder;
});
}
}
package com.au92.common.elastic;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author: p_x_c
*/
@Data
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchProperties {
/**
* es集群地址,多个用英文逗号分开
*/
private String clusterNodes = "http://127.0.0.1:9200";
/**
* es 用户名
*/
private String userName = StringUtils.EMPTY;
/**
* es 用户密码
*/
private String password = StringUtils.EMPTY;
/**
* 默认es索引名称
*/
private String index = "default";
}
package com.au92.common.elastic;
import com.alibaba.fastjson.JSON;
import com.au92.common.elastic.page.Page;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author: p_x_c
*/
@Slf4j
public class ElasticSearchServiceImpl implements IElasticSearchService {
@Resource
private RestHighLevelClient client;
@Resource
private ElasticSearchProperties properties;
/**
* 已存在的索引列表
*/
private volatile static List<String> existIndexs = Lists.newArrayList();
/**
* 查询的索引名称
*/
private ThreadLocal<String> index = new ThreadLocal<>();
/**
* 索引Mapping
*/
private String mapping;
@Override
public IElasticSearchService of(String index) {
this.index.set(index);
return this;
}
@Override
public IndexResponse save(EsEntity entity) throws IOException {
indexCheck();
IndexRequest request = new IndexRequest(index.get())
.id(entity.getId())
.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
IndexResponse result = client.index(request, RequestOptions.DEFAULT);
clear();
return result;
}
@Override
public BulkResponse batchSave(List<EsEntity> entities) throws IOException {
indexCheck();
BulkRequest request = new BulkRequest();
entities.forEach(x -> {
IndexRequest req = new IndexRequest(index.get())
.id(x.getId())
.source(JSON.toJSONString(x.getData()), XContentType.JSON);
request.add(req);
});
BulkResponse result = client.bulk(request, RequestOptions.DEFAULT);
clear();
return result;
}
@Override
public <T> BulkResponse batchDelete(Collection<T> ids) throws IOException {
indexCheck();
BulkRequest request = new BulkRequest();
ids.forEach(x -> request.add(new DeleteRequest(index.get(), x.toString())));
BulkResponse result = client.bulk(request, RequestOptions.DEFAULT);
clear();
return result;
}
@Override
public <T> T one(Class<T> cls, Object id) throws IOException {
List<T> res = search(cls, QueryBuilders.termQuery("id", id));
if (res.isEmpty()) {
return null;
}
return res.get(0);
}
@Override
public <T> List<T> mutil(Class<T> cls, Object... ids) throws IOException {
return search(cls, QueryBuilders.termsQuery("id", ids));
}
@Override
public <T> List<T> search(Class<T> cls, QueryBuilder query) throws IOException {
return search(cls, query, new Page()).getData();
}
@Override
public <T> Page<T> search(Class<T> cls, QueryBuilder query, Page page) throws IOException {
indexCheck();
query = Optional.ofNullable(query).orElse(new BoolQueryBuilder());
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(query)
.from((page.getPageNumber() - 1) * page.getPageSize())
.size(page.getPageSize() + 1)
.sort(page.getSort(), page.getSortOrder());
SearchRequest request = new SearchRequest(index.get()).source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits all = response.getHits();
SearchHit[] hits = all.getHits();
LinkedList<T> res = Arrays.stream(hits)
.map(x -> JSON.parseObject(x.getSourceAsString(), cls))
.collect(Collectors.toCollection(Lists::newLinkedList));
Boolean isHasNext = res.size() > page.getPageSize();
if (isHasNext) {
res.removeLast();
}
clear();
return new Page<T>()
.setData(res)
.setPageNumber(page.getPageNumber())
.setPageSize(page.getPageSize())
.setTotal(all.getTotalHits().value)
.setHasNext(isHasNext);
}
@Override
public AcknowledgedResponse surePhysicalRemoveIndex() throws IOException {
indexCheck();
AcknowledgedResponse result = client.indices().delete(new DeleteIndexRequest(index.get()), RequestOptions.DEFAULT);
existIndexs.remove(index.get());
clear();
return result;
}
@Override
public BulkByScrollResponse deleteByQuery(QueryBuilder builder) throws IOException {
indexCheck();
DeleteByQueryRequest request = new DeleteByQueryRequest(index.get())
.setQuery(builder)
.setBatchSize(ElasticConstant.MAX);
request.setConflicts("proceed");
BulkByScrollResponse result = client.deleteByQuery(request, RequestOptions.DEFAULT);
clear();
return result;
}
@Override
public IElasticSearchService mapping(String mapping) {
this.mapping = mapping;
return this;
}
/**
* 创建索引
*
* @throws IOException
*/
private void create() throws IOException {
String map = Strings.isEmpty(mapping) ? ElasticConstant.CREATE_INDEX : mapping;
CreateIndexRequest request = new CreateIndexRequest(index.get())
.settings(Settings.builder()
.put("index.number_of_shards", ElasticConstant.SHARDS)
.put("index.number_of_replicas", ElasticConstant.REPLICAS))
.mapping(map, XContentType.JSON);
CreateIndexResponse res = client.indices().create(request, RequestOptions.DEFAULT);
if (!res.isAcknowledged()) {
throw new RuntimeException("索引初始化失败");
}
existIndexs.add(index.get());
}
/**
* 从配置文件中加载默认索引Index名称
*
* @return 索引名称
*/
private String defaultIndex() {
return properties.getIndex();
}
/**
* 请求ES判断是否存在Index
*
* @return 是 OR 否
* @throws IOException
*/
private Boolean juageIndexExist() throws IOException {
GetIndexRequest request = new GetIndexRequest(index.get());
request.local(false);
request.humanReadable(true);
request.includeDefaults(false);
return client.indices().exists(request, RequestOptions.DEFAULT);
}
/**
* index检查,不存在则创建
*
* @throws Exception
*/
private void indexCheck() throws IOException {
if (StringUtils.isBlank(index.get())) {
String temp = defaultIndex();
log.warn("没有设置索引,使用默认索引 [" + temp + "]");
index.set(temp);
}
if (existIndexs.contains(index.get())) {
return;
}
if (juageIndexExist()) {
existIndexs.add(index.get());
return;
}
// 如果索引不存在,创建新的索引
create();
}
/**
* 执行完后清理ThreadLocal避免OOM
*/
private void clear() {
this.index.remove();
}
}
package com.au92.common.elastic;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
* @author: p_x_c
*/
@Data
@NoArgsConstructor
@Accessors(chain = true)
public final class EsEntity<T> {
/**
* 文档ID
*/
private String id;
/**
* 文档内容
*/
private T data;
}
package com.au92.common.elastic;
import com.au92.common.elastic.page.Page;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/**
* @author: p_x_c
*/
public interface IElasticSearchService {
/**
* 设置ES索引名称
*
* @param index 索引名称
* @return
*/
IElasticSearchService of(String index);
/**
* 保存、更新单个文档
*
* @param entity 文档内容
* @throws IOException
*/
IndexResponse save(EsEntity entity) throws IOException;
/**
* 查询单条文档
*
* @param id 文档ID
* @return
* @throws IOException
*/
<T> T one(Class<T> cls, Object id) throws IOException;
/**
* 根据ID查询多个文档
*
* @param cls 文档class
* @param ids 文档ID
* @return
* @throws IOException
*/
<T> List<T> mutil(Class<T> cls, Object... ids) throws IOException;
/**
* 批量保存、更新文档
*
* @param entities 要保存的文档
* @return
* @throws IOException
*/
BulkResponse batchSave(List<EsEntity> entities) throws IOException;
/**
* 批量删除文档
*
* @param ids 要删除的ID列表
* @return
* @throws IOException
*/
<T> BulkResponse batchDelete(Collection<T> ids) throws IOException;
/**
* 根据条件删除文档
*
* @param query 要删除的查询条件
* @return
* @throws IOException
*/
BulkByScrollResponse deleteByQuery(QueryBuilder query) throws IOException;
/**
* 从ES中查询
*
* @param cls 返回的文档class
* @param query 查询条件
* @return
* @throws IOException
*/
<T> List<T> search(Class<T> cls, QueryBuilder query) throws IOException;
/**
* 分页查询
*
* @param cls 返回的文档class
* @param query 查询条件
* @param page 分页
* @return
* @throws IOException
*/
<T> Page<T> search(Class<T> cls, QueryBuilder query, Page page) throws IOException;
/**
* <span style='color:red;font-weight:900:'>删除索引</span>
* <p>
* <span style='color:red;font-weight:900:'>请谨慎使用!!!删除后不可恢复!!!</span>
*
* @return
* @throws IOException
*/
AcknowledgedResponse surePhysicalRemoveIndex() throws IOException;
/**
* 设置索引mapping,设置一次即可
*
* @param mapping 索引mapping
* @return
*/
IElasticSearchService mapping(String mapping);
}
package com.au92.common.elastic.page;
import lombok.Data;
import lombok.experimental.Accessors;
import org.elasticsearch.search.sort.SortOrder;
import java.util.List;
/**
* @author: p_x_c
*/
@Data
@Accessors(chain = true)
public class Page<T> {
/**
* 数据总数
*/
private Long total = 0L;
/**
* 页码从1开始
*/
private Integer pageNumber = 1;
/**
* 一页数据条数
*/
private Integer pageSize = 20;
/**
* 排序方式
*/
private SortOrder sortOrder = SortOrder.ASC;
/**
* 排序字段
*/
private String sort = "id";
/**
* 返回数据
*/
private List<T> data;
/**
* 是否有下一页
*/
private Boolean hasNext = false;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment