1.导入es相关jar包
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>4.2.11</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
2.增加es配置
elasticsearch:
hosts: es-service.xiot-v2.svc.cluster.local:port
storage-strategy: NONE #分库规则 按日分 DAY, 按月分 MONTH, 按年分 YEAR, 不分 NONE
shard: 1 #分片数量
3.读取es相关配置
@Configuration
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.hosts}")
private String HOST;
@Override
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(HOST)
.build();
return RestClients.create(clientConfiguration).rest();
}
}
4.创建es实体类,与es mapping设计一致
@Data
@Document(indexName = "data",createIndex = false)
public class ESData {
@Id
@Field(type = FieldType.Text)
@JSONField(name = "id")
private String id;
/**
* 数据时间点: 相对于 UTC-0 1970-1-1 0 点的毫秒数
*/
@Field(name="ts",type = FieldType.Long)
@JSONField(name = "ts")
private Long ts;
//设备说明
@Field(name= "signature.type",type = FieldType.Keyword)
@JSONField(name = "signature.type")
private String signatureType;
}
5.创建es结构
1.新建index:data
put http://ip:port/data
2.新建mapping
put http://ip:port/data/_mapping
{
"properties": {
"ts": {
"type": "long"
},
"name": {
"type": "keyword"
},
"test": {
"type": "nested",
"properties": {
"voltage": {
"type": "nested",
"properties": {
"avg": {
"type": "keyword"
},
"min": {
"type": "keyword"
},
"max": {
"type": "keyword"
}
}
}
}
}
6.创建类继承 ElasticsearchRepository 实现通过api保存实体类ESData到es
public interface ESRepository extends ElasticsearchRepository<ESData, String> {
}
7.保存实体类ESData到es
//1.注入
@Autowired
private ESRepository esRepository;
public void insertData(){
//2.初始化实体类,插入数据
ESData data = new ESData();
....
//3.通过实体类保存es
esRepository.save(data);
}
8.es查询 多字段匹配查询,分组查询,分组后聚合文章来源:https://uudwc.com/A/6zjxk
@Service
public class EsSearchService {
@Autowired
private RestHighLevelClient client;
/**
先查询 后分组 再统计
*/
public List<HashMap> searchBills(LocalDateTime begin, LocalDateTime end,String boxName){
//设置查询es的index
SearchRequest searchRequest = new SearchRequest("data");
//2.创建 SearchSourceBuilder条件构造。
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//若时间未传查询过去七日数据
Long beginTime = null;
Long endTime = null;
if(begin==null){
beginTime =
LocalDateTime.now().plusDays(-7).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
}else{
beginTime = begin.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
}
if(end==null){
endTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli() ;
}else{
endTime = end.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
}
//相当于and查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.rangeQuery("ts").from(beginTime).to(endTime));
if(StringUtils.isNotEmpty(boxName)){
boolQuery.must(QueryBuilders.termQuery("boxName",boxName));
}
searchSourceBuilder.query(boolQuery);
//去重
CollapseBuilder collapseBuilder = new CollapseBuilder("ts");
searchSourceBuilder.collapse(collapseBuilder);
//多字段分组
TermsAggregationBuilder aggregation = AggregationBuilders.terms("aggs_group")
.script(new Script("doc['f1'] +'#'+doc['f2']+'#'+doc['f3']+'#'+doc['f4']+'#'+doc['f5']"))
.size(Integer.MAX_VALUE) .order(BucketOrder.aggregation("_key", true));
//sum聚合 aggregation.subAggregation(AggregationBuilders.sum("f6").field("sum1"))
.subAggregation(AggregationBuilders.sum("f7").field("sum2"));
searchSourceBuilder.aggregation(aggregation);
//3.将 SearchSourceBuilder 添加到 SearchRequest中
searchRequest.source(searchSourceBuilder);
//4.执行查询
SearchResponse searchResponse = null;
try {
searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
//5.解析查询结果
System.out.println("花费的时长:" + searchResponse.getTook());
List<HashMap> result=new ArrayList<>();
if(RestStatus.OK.equals(searchResponse.status())) {
// 获取聚合结果
Aggregations aggregations = searchResponse.getAggregations();
Terms byAggsAggregation = aggregations.get("aggs_group");
for(Terms.Bucket buck : byAggsAggregation.getBuckets()) {
HashMap map=new HashMap();
String[] arr= buck.getKeyAsString().split("#");
//取子聚合
ParsedSum sum1 = buck.getAggregations().get("sum1");
ParsedSum sum2 = buck.getAggregations().get("sum2");
map.put("f1",arr[0].replace("[","").replace("]",""));
map.put("sum1",sum1.getValue());
map.put("sum2",sum2.getValue());
result.add(map);
}
}
return result;
}
}
文章来源地址https://uudwc.com/A/6zjxk