Last active
October 3, 2019 12:12
-
-
Save Chairo/1b0d06d45e5155b134b7b43abb3d84f6 to your computer and use it in GitHub Desktop.
Elastic Spring Boot Starter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
/**
 * Shared constants used by the Elasticsearch starter.
 *
 * <p>This class only holds constants; it is final and cannot be instantiated.
 *
 * @author p_x_c
 */
public final class ElasticConstant {
    /**
     * Separator between multiple cluster node entries in the configuration value.
     */
    public static final String CLUSTER_SPLIT = ",";
    /**
     * Separator used to split a single node URL into its scheme, host and port parts.
     */
    public static final String SCHEME_SPLIT = ":";
    /**
     * Default mapping body sent when an index is created and no custom mapping was supplied.
     */
    public static final String CREATE_INDEX = "{\n" +
            " \"properties\": {\n" +
            " \"id\": {\n" +
            " \"type\": \"integer\"\n" +
            " },\n" +
            " \"userId\": {\n" +
            " \"type\": \"integer\"\n" +
            " },\n" +
            " \"name\": {\n" +
            " \"type\": \"text\",\n" +
            " \"analyzer\": \"ik_max_word\",\n" +
            " \"search_analyzer\": \"ik_smart\"\n" +
            " },\n" +
            " \"publishTime\": {\n" +
            " \"type\": \"date\"\n" +
            " },\n" +
            " \"url\": {\n" +
            " \"type\": \"text\",\n" +
            " \"index\": true,\n" +
            " \"analyzer\": \"ik_max_word\",\n" +
            " \"search_analyzer\": \"ik_smart\"\n" +
            " }\n" +
            " }\n" +
            "}";
    /**
     * Number of primary shards for newly created indices.
     */
    public static final Integer SHARDS = 3;
    /**
     * Number of replicas for newly created indices.
     */
    public static final Integer REPLICAS = 2;
    /**
     * Maximum number of documents handled by one batch operation.
     */
    public static final Integer MAX = 9999;
    /**
     * Timeout settings applied to the HTTP request configuration
     * (presumably milliseconds, as is usual for the underlying HTTP client).
     */
    public static final int CONNECT_TIME_OUT = 1000;
    public static final int SOCKET_TIME_OUT = 30000;
    public static final int CONNECTION_REQUEST_TIME_OUT = 500;
    /**
     * Connection pool limits: total connections and connections per route.
     */
    public static final int MAX_CONNECT_NUM = 100;
    public static final int MAX_CONNECT_PER_ROUTE = 100;

    /**
     * Not instantiable: this class is a pure constant holder.
     */
    private ElasticConstant() {
        throw new AssertionError("ElasticConstant must not be instantiated");
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.http.HttpHost; | |
import org.elasticsearch.client.RestClient; | |
import org.elasticsearch.client.RestClientBuilder; | |
import org.elasticsearch.client.RestHighLevelClient; | |
import org.springframework.beans.factory.DisposableBean; | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; | |
import org.springframework.boot.context.properties.EnableConfigurationProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Scope; | |
import javax.annotation.Resource; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import java.util.Objects; | |
import java.util.stream.Collectors; | |
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_PROTOTYPE; | |
/** | |
* @author: p_x_c | |
*/ | |
@Slf4j | |
@Configuration | |
@EnableConfigurationProperties(ElasticSearchProperties.class) | |
public class ElasticSearchAutoConfiguration implements DisposableBean { | |
private RestHighLevelClient client; | |
@Resource | |
private ElasticSearchProperties properties; | |
@Override | |
public void destroy() throws Exception { | |
log.info("销毁ES连接"); | |
if (client != null) { | |
client.close(); | |
} | |
} | |
/** | |
* 创建client | |
* | |
* @return | |
* @throws IOException | |
*/ | |
@Bean | |
@ConditionalOnMissingBean(RestHighLevelClient.class) | |
public RestHighLevelClient client() throws IOException { | |
if (!Objects.equals(null, client)) { | |
log.warn("client对象没有正确关闭"); | |
client.close(); | |
} | |
RestClientBuilder builder = RestClient.builder(setting()); | |
setConnectTimeOutConfig(builder); | |
setMutiConnectConfig(builder); | |
client = new RestHighLevelClient(builder); | |
return client; | |
} | |
/** | |
* Elastic Service | |
* | |
* @return | |
*/ | |
@Bean | |
@Scope(value = SCOPE_PROTOTYPE) | |
@ConditionalOnMissingBean(IElasticSearchService.class) | |
public IElasticSearchService service() { | |
return new ElasticSearchServiceImpl(); | |
} | |
/** | |
* 解析配置文件 | |
* | |
* @return | |
*/ | |
private HttpHost[] setting() { | |
return Arrays.stream(properties.getClusterNodes().split(ElasticConstant.CLUSTER_SPLIT)) | |
.map(x -> { | |
String[] addressPortPairs = x.split(ElasticConstant.SCHEME_SPLIT); | |
String schemeName = addressPortPairs[0].toLowerCase(); | |
String hostname = addressPortPairs[1].substring(2); | |
Integer port = Integer.valueOf(addressPortPairs[2]); | |
return new HttpHost(hostname, port, schemeName); | |
}) | |
.collect(Collectors.toList()) | |
.toArray(new HttpHost[0]); | |
} | |
/** | |
* 超时设置 | |
* | |
* @param builder | |
*/ | |
private void setConnectTimeOutConfig(RestClientBuilder builder) { | |
builder.setRequestConfigCallback(requestConfigBuilder -> { | |
requestConfigBuilder.setConnectTimeout(ElasticConstant.CONNECT_TIME_OUT) | |
.setSocketTimeout(ElasticConstant.SOCKET_TIME_OUT) | |
.setConnectionRequestTimeout(ElasticConstant.CONNECTION_REQUEST_TIME_OUT); | |
return requestConfigBuilder; | |
}); | |
} | |
/** | |
* 并发连接数设置 | |
* | |
* @param builder | |
*/ | |
private void setMutiConnectConfig(RestClientBuilder builder) { | |
builder.setHttpClientConfigCallback(httpClientBuilder -> { | |
httpClientBuilder.setMaxConnTotal(ElasticConstant.MAX_CONNECT_NUM) | |
.setMaxConnPerRoute(ElasticConstant.MAX_CONNECT_PER_ROUTE); | |
return httpClientBuilder; | |
}); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
import lombok.Data; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
/** | |
* @author: p_x_c | |
*/ | |
@Data | |
@ConfigurationProperties(prefix = "elasticsearch") | |
public class ElasticSearchProperties { | |
/** | |
* es集群地址,多个用英文逗号分开 | |
*/ | |
private String clusterNodes = "http://127.0.0.1:9200"; | |
/** | |
* es 用户名 | |
*/ | |
private String userName = StringUtils.EMPTY; | |
/** | |
* es 用户密码 | |
*/ | |
private String password = StringUtils.EMPTY; | |
/** | |
* 默认es索引名称 | |
*/ | |
private String index = "default"; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
import com.alibaba.fastjson.JSON; | |
import com.au92.common.elastic.page.Page; | |
import com.google.common.collect.Lists; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.commons.lang3.StringUtils; | |
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; | |
import org.elasticsearch.action.bulk.BulkRequest; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.delete.DeleteRequest; | |
import org.elasticsearch.action.index.IndexRequest; | |
import org.elasticsearch.action.index.IndexResponse; | |
import org.elasticsearch.action.search.SearchRequest; | |
import org.elasticsearch.action.search.SearchResponse; | |
import org.elasticsearch.action.support.master.AcknowledgedResponse; | |
import org.elasticsearch.client.RequestOptions; | |
import org.elasticsearch.client.RestHighLevelClient; | |
import org.elasticsearch.client.indices.CreateIndexRequest; | |
import org.elasticsearch.client.indices.CreateIndexResponse; | |
import org.elasticsearch.client.indices.GetIndexRequest; | |
import org.elasticsearch.common.Strings; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.xcontent.XContentType; | |
import org.elasticsearch.index.query.BoolQueryBuilder; | |
import org.elasticsearch.index.query.QueryBuilder; | |
import org.elasticsearch.index.query.QueryBuilders; | |
import org.elasticsearch.index.reindex.BulkByScrollResponse; | |
import org.elasticsearch.index.reindex.DeleteByQueryRequest; | |
import org.elasticsearch.search.SearchHit; | |
import org.elasticsearch.search.SearchHits; | |
import org.elasticsearch.search.builder.SearchSourceBuilder; | |
import javax.annotation.Resource; | |
import java.io.IOException; | |
import java.util.*; | |
import java.util.stream.Collectors; | |
/** | |
* @author: p_x_c | |
*/ | |
@Slf4j | |
public class ElasticSearchServiceImpl implements IElasticSearchService { | |
@Resource | |
private RestHighLevelClient client; | |
@Resource | |
private ElasticSearchProperties properties; | |
/** | |
* 已存在的索引列表 | |
*/ | |
private volatile static List<String> existIndexs = Lists.newArrayList(); | |
/** | |
* 查询的索引名称 | |
*/ | |
private ThreadLocal<String> index = new ThreadLocal<>(); | |
/** | |
* 索引Mapping | |
*/ | |
private String mapping; | |
@Override | |
public IElasticSearchService of(String index) { | |
this.index.set(index); | |
return this; | |
} | |
@Override | |
public IndexResponse save(EsEntity entity) throws IOException { | |
indexCheck(); | |
IndexRequest request = new IndexRequest(index.get()) | |
.id(entity.getId()) | |
.source(JSON.toJSONString(entity.getData()), XContentType.JSON); | |
IndexResponse result = client.index(request, RequestOptions.DEFAULT); | |
clear(); | |
return result; | |
} | |
@Override | |
public BulkResponse batchSave(List<EsEntity> entities) throws IOException { | |
indexCheck(); | |
BulkRequest request = new BulkRequest(); | |
entities.forEach(x -> { | |
IndexRequest req = new IndexRequest(index.get()) | |
.id(x.getId()) | |
.source(JSON.toJSONString(x.getData()), XContentType.JSON); | |
request.add(req); | |
}); | |
BulkResponse result = client.bulk(request, RequestOptions.DEFAULT); | |
clear(); | |
return result; | |
} | |
@Override | |
public <T> BulkResponse batchDelete(Collection<T> ids) throws IOException { | |
indexCheck(); | |
BulkRequest request = new BulkRequest(); | |
ids.forEach(x -> request.add(new DeleteRequest(index.get(), x.toString()))); | |
BulkResponse result = client.bulk(request, RequestOptions.DEFAULT); | |
clear(); | |
return result; | |
} | |
@Override | |
public <T> T one(Class<T> cls, Object id) throws IOException { | |
List<T> res = search(cls, QueryBuilders.termQuery("id", id)); | |
if (res.isEmpty()) { | |
return null; | |
} | |
return res.get(0); | |
} | |
@Override | |
public <T> List<T> mutil(Class<T> cls, Object... ids) throws IOException { | |
return search(cls, QueryBuilders.termsQuery("id", ids)); | |
} | |
@Override | |
public <T> List<T> search(Class<T> cls, QueryBuilder query) throws IOException { | |
return search(cls, query, new Page()).getData(); | |
} | |
@Override | |
public <T> Page<T> search(Class<T> cls, QueryBuilder query, Page page) throws IOException { | |
indexCheck(); | |
query = Optional.ofNullable(query).orElse(new BoolQueryBuilder()); | |
SearchSourceBuilder builder = new SearchSourceBuilder() | |
.query(query) | |
.from((page.getPageNumber() - 1) * page.getPageSize()) | |
.size(page.getPageSize() + 1) | |
.sort(page.getSort(), page.getSortOrder()); | |
SearchRequest request = new SearchRequest(index.get()).source(builder); | |
SearchResponse response = client.search(request, RequestOptions.DEFAULT); | |
SearchHits all = response.getHits(); | |
SearchHit[] hits = all.getHits(); | |
LinkedList<T> res = Arrays.stream(hits) | |
.map(x -> JSON.parseObject(x.getSourceAsString(), cls)) | |
.collect(Collectors.toCollection(Lists::newLinkedList)); | |
Boolean isHasNext = res.size() > page.getPageSize(); | |
if (isHasNext) { | |
res.removeLast(); | |
} | |
clear(); | |
return new Page<T>() | |
.setData(res) | |
.setPageNumber(page.getPageNumber()) | |
.setPageSize(page.getPageSize()) | |
.setTotal(all.getTotalHits().value) | |
.setHasNext(isHasNext); | |
} | |
@Override | |
public AcknowledgedResponse surePhysicalRemoveIndex() throws IOException { | |
indexCheck(); | |
AcknowledgedResponse result = client.indices().delete(new DeleteIndexRequest(index.get()), RequestOptions.DEFAULT); | |
existIndexs.remove(index.get()); | |
clear(); | |
return result; | |
} | |
@Override | |
public BulkByScrollResponse deleteByQuery(QueryBuilder builder) throws IOException { | |
indexCheck(); | |
DeleteByQueryRequest request = new DeleteByQueryRequest(index.get()) | |
.setQuery(builder) | |
.setBatchSize(ElasticConstant.MAX); | |
request.setConflicts("proceed"); | |
BulkByScrollResponse result = client.deleteByQuery(request, RequestOptions.DEFAULT); | |
clear(); | |
return result; | |
} | |
@Override | |
public IElasticSearchService mapping(String mapping) { | |
this.mapping = mapping; | |
return this; | |
} | |
/** | |
* 创建索引 | |
* | |
* @throws IOException | |
*/ | |
private void create() throws IOException { | |
String map = Strings.isEmpty(mapping) ? ElasticConstant.CREATE_INDEX : mapping; | |
CreateIndexRequest request = new CreateIndexRequest(index.get()) | |
.settings(Settings.builder() | |
.put("index.number_of_shards", ElasticConstant.SHARDS) | |
.put("index.number_of_replicas", ElasticConstant.REPLICAS)) | |
.mapping(map, XContentType.JSON); | |
CreateIndexResponse res = client.indices().create(request, RequestOptions.DEFAULT); | |
if (!res.isAcknowledged()) { | |
throw new RuntimeException("索引初始化失败"); | |
} | |
existIndexs.add(index.get()); | |
} | |
/** | |
* 从配置文件中加载默认索引Index名称 | |
* | |
* @return 索引名称 | |
*/ | |
private String defaultIndex() { | |
return properties.getIndex(); | |
} | |
/** | |
* 请求ES判断是否存在Index | |
* | |
* @return 是 OR 否 | |
* @throws IOException | |
*/ | |
private Boolean juageIndexExist() throws IOException { | |
GetIndexRequest request = new GetIndexRequest(index.get()); | |
request.local(false); | |
request.humanReadable(true); | |
request.includeDefaults(false); | |
return client.indices().exists(request, RequestOptions.DEFAULT); | |
} | |
/** | |
* index检查,不存在则创建 | |
* | |
* @throws Exception | |
*/ | |
private void indexCheck() throws IOException { | |
if (StringUtils.isBlank(index.get())) { | |
String temp = defaultIndex(); | |
log.warn("没有设置索引,使用默认索引 [" + temp + "]"); | |
index.set(temp); | |
} | |
if (existIndexs.contains(index.get())) { | |
return; | |
} | |
if (juageIndexExist()) { | |
existIndexs.add(index.get()); | |
return; | |
} | |
// 如果索引不存在,创建新的索引 | |
create(); | |
} | |
/** | |
* 执行完后清理ThreadLocal避免OOM | |
*/ | |
private void clear() { | |
this.index.remove(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
import lombok.Data; | |
import lombok.NoArgsConstructor; | |
import lombok.experimental.Accessors; | |
/** | |
* @author: p_x_c | |
*/ | |
@Data | |
@NoArgsConstructor | |
@Accessors(chain = true) | |
public final class EsEntity<T> { | |
/** | |
* 文档ID | |
*/ | |
private String id; | |
/** | |
* 文档内容 | |
*/ | |
private T data; | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic; | |
import com.au92.common.elastic.page.Page; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.index.IndexResponse; | |
import org.elasticsearch.action.support.master.AcknowledgedResponse; | |
import org.elasticsearch.index.query.QueryBuilder; | |
import org.elasticsearch.index.reindex.BulkByScrollResponse; | |
import java.io.IOException; | |
import java.util.Collection; | |
import java.util.List; | |
/** | |
* @author: p_x_c | |
*/ | |
public interface IElasticSearchService { | |
/** | |
* 设置ES索引名称 | |
* | |
* @param index 索引名称 | |
* @return | |
*/ | |
IElasticSearchService of(String index); | |
/** | |
* 保存、更新单个文档 | |
* | |
* @param entity 文档内容 | |
* @throws IOException | |
*/ | |
IndexResponse save(EsEntity entity) throws IOException; | |
/** | |
* 查询单条文档 | |
* | |
* @param id 文档ID | |
* @return | |
* @throws IOException | |
*/ | |
<T> T one(Class<T> cls, Object id) throws IOException; | |
/** | |
* 根据ID查询多个文档 | |
* | |
* @param cls 文档class | |
* @param ids 文档ID | |
* @return | |
* @throws IOException | |
*/ | |
<T> List<T> mutil(Class<T> cls, Object... ids) throws IOException; | |
/** | |
* 批量保存、更新文档 | |
* | |
* @param entities 要保存的文档 | |
* @return | |
* @throws IOException | |
*/ | |
BulkResponse batchSave(List<EsEntity> entities) throws IOException; | |
/** | |
* 批量删除文档 | |
* | |
* @param ids 要删除的ID列表 | |
* @return | |
* @throws IOException | |
*/ | |
<T> BulkResponse batchDelete(Collection<T> ids) throws IOException; | |
/** | |
* 根据条件删除文档 | |
* | |
* @param query 要删除的查询条件 | |
* @return | |
* @throws IOException | |
*/ | |
BulkByScrollResponse deleteByQuery(QueryBuilder query) throws IOException; | |
/** | |
* 从ES中查询 | |
* | |
* @param cls 返回的文档class | |
* @param query 查询条件 | |
* @return | |
* @throws IOException | |
*/ | |
<T> List<T> search(Class<T> cls, QueryBuilder query) throws IOException; | |
/** | |
* 分页查询 | |
* | |
* @param cls 返回的文档class | |
* @param query 查询条件 | |
* @param page 分页 | |
* @return | |
* @throws IOException | |
*/ | |
<T> Page<T> search(Class<T> cls, QueryBuilder query, Page page) throws IOException; | |
/** | |
* <span style='color:red;font-weight:900:'>删除索引</span> | |
* <p> | |
* <span style='color:red;font-weight:900:'>请谨慎使用!!!删除后不可恢复!!!</span> | |
* | |
* @return | |
* @throws IOException | |
*/ | |
AcknowledgedResponse surePhysicalRemoveIndex() throws IOException; | |
/** | |
* 设置索引mapping,设置一次即可 | |
* | |
* @param mapping 索引mapping | |
* @return | |
*/ | |
IElasticSearchService mapping(String mapping); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.au92.common.elastic.page; | |
import lombok.Data; | |
import lombok.experimental.Accessors; | |
import org.elasticsearch.search.sort.SortOrder; | |
import java.util.List; | |
/** | |
* @author: p_x_c | |
*/ | |
@Data | |
@Accessors(chain = true) | |
public class Page<T> { | |
/** | |
* 数据总数 | |
*/ | |
private Long total = 0L; | |
/** | |
* 页码从1开始 | |
*/ | |
private Integer pageNumber = 1; | |
/** | |
* 一页数据条数 | |
*/ | |
private Integer pageSize = 20; | |
/** | |
* 排序方式 | |
*/ | |
private SortOrder sortOrder = SortOrder.ASC; | |
/** | |
* 排序字段 | |
*/ | |
private String sort = "id"; | |
/** | |
* 返回数据 | |
*/ | |
private List<T> data; | |
/** | |
* 是否有下一页 | |
*/ | |
private Boolean hasNext = false; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment