实时可视化大数据项目04 -- 后端代码
程序员文章站
2022-10-03 16:50:00
1、SpringBootApplicationimport org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;/** * @author ayub */@SpringBootApplicationpublic class BigdataEsApplication { public static void mai...
1、启动类SpringBootApplication
SpringApplication.run,很简单主类反射一下就好了。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author ayub
*/
@SpringBootApplication
public class BigdataEsApplication {
public static void main(String[] args) {
SpringApplication.run(BigdataEsApplication.class, args);
}
}
2、web.EsQueryController
Controller层,主要是进行暴露给前端接口的地方
import com.qhins.bigdata.es.dao.*;
import com.qhins.bigdata.es.service.EsQueryESService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
@RestController
public class EsQueryController {
@Autowired
EsQueryESService esService;
/**
* 根据特定索引ID查询数据
* @param crt_tm id
* @return pojo
*/
@RequestMapping("/queryVipPremiun")
public QhpmsVipColl queryVipPremiun(String crt_tm){
return esService.buildQuery(crt_tm);
}
/**进行查询索引,must条件,以及notmust条件的group By判断
*
* @param indexName 索引名字
* @param rangeQueryTime 数据的时间
* @param termsQueryName 查询的字段
* @param termsQueryList 多个条件字段
* @param from 下范围
* @param to 上范围
* @return 对象
*/
@RequestMapping(value = "/queryIndexNotMustRangAggs",method = RequestMethod.GET)
public List<AggsKeyWord> queryIndexNotMustRangAggs(String indexName, String rangeQueryTime, String termsQueryName,
@RequestParam(value="termsQueryList",required=false)List<String> termsQueryList ,
String from, String to){
return esService.queryIndexNotMustRangAggs(indexName,rangeQueryTime,termsQueryName,termsQueryList,from,to);
}
@RequestMapping(value = "/queryRangAggs",method = RequestMethod.GET)
public List<AggsKeyWord> queryRangAggs(String indexName,String groupName,@RequestParam(value="groupList",required=false)List<String> groupList ,
String codeName,@RequestParam(value="codeList",required=false)List<String> codeList ,
String registerTime,String registerFrom,String registerTo,
String endTime,String endFrom,String endTo, String aggName,int aggSize){
return esService.queryRangAggs( indexName, groupName, groupList ,
codeName, codeList ,
registerTime, registerFrom, registerTo,
endTime, endFrom, endTo,
aggName,aggSize);
}
@RequestMapping("/queryUserNow")
public List<PmsUser> queryUserNow(String indexName, String groupName, String[] groupList , String registerTime,
String registerFrom, String registerTo, String[] includes, String lookupResult){
return esService.queryUserNow(indexName,groupName,groupList , registerTime,registerFrom,registerTo, includes,lookupResult);
}
@RequestMapping("/dateQueryTotal")
public NumberTotal dateQueryTotal(String indexName, String[] groupName, String registerTime, String registerFrom, String registerTo, String vipEndTime) {
NumberTotal numberTotal = new NumberTotal();
Long vipTotal = esService.dateQueryTotal(indexName, registerTime, registerFrom, registerTo);
Long groupTotal = esService.dateQueryTotal(indexName, groupName, registerTime, registerFrom, registerTo);
Long endTotal = esService.payQueryTotal(indexName, registerTime, registerFrom, registerTo, vipEndTime);
numberTotal.setVipTotal(vipTotal);
numberTotal.setGroupTotal(groupTotal);
numberTotal.setEndTotal(endTotal);
return numberTotal;
}
@RequestMapping("/carStateQuery")
public List<CarStateGroup> carStateQuery(String indexName, String group1, String group2){
//分组1
ArrayList<CarStateGroup> carStateGroups = new ArrayList<>();
List<AggsKeyWord> keyWordList = esService.carStateQuery(indexName, group1);
CarStateGroup carStateNumber = getCarStateNumber(group1, keyWordList);
//分组2
List<AggsKeyWord> keyWordList1 = esService.carStateQuery(indexName, group2);
CarStateGroup carStateNumber1 = getCarStateNumber(group2, keyWordList1);
carStateGroups.add(carStateNumber);
carStateGroups.add(carStateNumber1);
return carStateGroups;
}
@RequestMapping("/getVipAndPremium")
public VipAndPremium getVipAndPremium(){
return esService.getVipAndPremium();
}
/***拆分合并数据
*
* @param group1 用户组
* @param keyWordList 分组数据
* @return 整合数据
*/
private CarStateGroup getCarStateNumber(String group1, List<AggsKeyWord> keyWordList) {
CarStateGroup carStateGroup = new CarStateGroup();
long numTotal=0L;
for(AggsKeyWord aggs:keyWordList){
if ("激活".equals(aggs.getName())){
carStateGroup.setStateCount(aggs.getValue());//激活人数
}
numTotal+=aggs.getValue();
}
carStateGroup.setName(group1);//分组
carStateGroup.setNumCount(numTotal);//总数
if (carStateGroup.getStateCount()==null){
carStateGroup.setStateCount(0L);
}
if (carStateGroup.getNumCount()==null){
carStateGroup.setNumCount(0L);
}
double scale = carStateGroup.getStateCount() * 1.0 / numTotal * 100;
carStateGroup.setLoginScale((double) Math.round(scale * 10) / 10);
return carStateGroup;
}
}
3、service.EsQueryESService
业务层,使用ES高级API调用ES,
import com.qhins.bigdata.es.dao.*;
import com.qhins.bigdata.es.support.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;
@Service
public class EsQueryESService {
@Resource(name = "highLevelClient")
RestHighLevelClient client;
private static final Logger logger = LoggerFactory.getLogger(EsQueryESService.class);
/** 根据ID 查询数据 会员和保费情况
* @Description 查询当年保费和VIP
* @Author say
* @Date 2020/6/30
* @Param 无
* @Return List当天
*/
public QhpmsVipColl buildQuery(String crt_tm){
SearchRequest request = new SearchRequest("qhpms_vip_coll");
SearchSourceBuilder builder = new SearchSourceBuilder();
// GET qhpms_vip_coll/_search
// {"query": {"bool": {"must": [{"term": {"_id": {"value": "2020-06-30"} }}] }}}
//定义一个termQuery
if (StringUtils.isEmpty(crt_tm)){
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
crt_tm = df.format(new Date());
}
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("_id", crt_tm);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(termQueryBuilder);
builder.query(boolQueryBuilder).from(0).size(1);
//设置源字段过滤,第一个参数结果集包括哪些字段,第二字段结果集不包括哪些字段
String[] includes = {"C_YEAR_PREMIUM","C_VIP_TOTAL","C_CRT_TM"};
builder.fetchSource(includes,new String[]{"type"});
request.source(builder);
logger.info("buildQueryDSL:"+builder.toString());
SearchHit[] searchHits = getSearchHits(request);
for (SearchHit searchHit : searchHits) {
//源文档内容
String sourceAsString = searchHit.getSourceAsString();
logger.info("sourceString:"+sourceAsString);
return JSON.parseObject(sourceAsString, QhpmsVipColl.class);
}
return new QhpmsVipColl(0.0,"",0.0);
}
/***类似于SQL select a,count(a) from A where a not in ('b','c') and c>1 and c<2 group by a;
* 索引根据索引terms查询,rang agg分组 职群分布
*
* @param indexName 索引
* @param rangeQueryTime 时间字段
* @param termsQueryName terms字段
* @param termsQueryList terms数据
* @param from 时间底区间
* @param to 时间上区间
* @return 数据对象List(key,value)
*/
public List<AggsKeyWord> queryIndexNotMustRangAggs(String indexName, String rangeQueryTime, String termsQueryName, List<String> termsQueryList , String from, String to){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//字段
//定义一个termsQuery
//List<String> list = Stream.of("车险转换", "车险批量转换", "默认", "互联网业务部双十一抽奖会员引流").collect(Collectors.toList());
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(termsQueryName, termsQueryList);
//定义一个ranggeQuery
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(rangeQueryTime)
.from(from)
.to(to);
//分组聚合terms:别名,分组字段分组字段
AggregationBuilder groupByType = AggregationBuilders
.terms("all_group_worker")
.field(termsQueryName).size(7);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//mustNot
boolQueryBuilder.mustNot(termsQueryBuilder);
//must
boolQueryBuilder.must(rangeQuery);
//构建查询
//设置源字段过滤,第一个参数结果集包括哪些字段,第二字段结果集不包括哪些字段
//String[] includes = {"sum_other_doc_count","buckets"};
builder.query(boolQueryBuilder).aggregation(groupByType).size(0);
request.source(builder);
logger.info("queryIndexNotMustRangAggsDSL:"+builder.toString());
return getAggsBucket(request);
}
/** 查询分组agg详情 优秀店长
* @Description
* @Author say
* @Date 2020/7/2
* @Param
* @Return
*/
public List<AggsKeyWord> queryRangAggs(String indexName,String groupName,List<String> groupList ,
String codeName,List<String> codeList ,
String registerTime,String registerFrom,String registerTo,
String endTime,String endFrom,String endTo,
String aggName,int aggSize){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//terms
TermsQueryBuilder codeQuery = QueryBuilders.termsQuery(codeName, codeList);
TermsQueryBuilder groupQuery = QueryBuilders.termsQuery(groupName, groupList);
//range
RangeQueryBuilder registerRangQuery = QueryBuilders.rangeQuery(registerTime).from(registerFrom).to(registerTo);
RangeQueryBuilder endRangQuery = QueryBuilders.rangeQuery(endTime).from(endFrom).to(endTo);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.mustNot(codeQuery).mustNot(groupQuery).must(registerRangQuery).must(endRangQuery);
//分组聚合terms:别名,分组字段分组字段
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("all_group").field(aggName).size(aggSize);
//执行,不需要源数据
builder.query(boolQueryBuilder).aggregation(aggregationBuilder).size(0);
request.source(builder);
logger.info("queryRangAggsDSL:"+builder.toString());
return getAggsBucket(request);
}
/**实时查询 最新会员明细
*
* @param indexName 索引
* @param groupName must_not字段
* @param groupList must_not_List
* @param registerTime 注册时间字段
* @param registerFrom 时间from
* @param registerTo 时间to
* @param includes source字段
*/
public List<PmsUser> queryUserNow(String indexName, String groupName, String[] groupList ,
String registerTime, String registerFrom, String registerTo,
String[] includes, String lookupResult){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//terms
TermsQueryBuilder groupQuery = QueryBuilders.termsQuery(groupName, groupList);
//range
RangeQueryBuilder registerRangQuery = QueryBuilders.rangeQuery(registerTime).from(registerFrom).to(registerTo);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(registerRangQuery).mustNot(groupQuery);
//查询,执行
builder.query(boolQueryBuilder).fetchSource(includes,null).sort(registerTime, SortOrder.DESC).size(10);
request.source(builder);
logger.info("queryUserNowDSL:"+builder.toString());
SearchHit[] searchHits = getSearchHits(request);
ArrayList<PmsUser> userList = new ArrayList<>();
for (SearchHit searchHit : searchHits) {
//源文档内容
String sourceAsString = searchHit.getSourceAsString();
//获取字段
logger.info("sourceString:"+sourceAsString);
PmsUser pmsUser = JSONObject.parseObject(JSONObject.parseObject(sourceAsString).getString(lookupResult),PmsUser.class);
userList.add(pmsUser);
}
return userList;
}
/**时间范围查询求个数 会员情况
*
* @param indexName 索引
* @param registerTime 注册时间
* @param registerFrom 上
* @param registerTo 下
* @return 总数
*/
public Long dateQueryTotal(String indexName,String registerTime,String registerFrom,String registerTo){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//range
RangeQueryBuilder registerRangQuery = QueryBuilders.rangeQuery(registerTime).from(registerFrom).to(registerTo);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(registerRangQuery);
//查询,执行
builder.query(boolQueryBuilder).size(0);
request.source(builder);
logger.info("dateQueryTotalDSL:"+builder.toString());
return getSearchTotal(request);
}
/**支付时间范围查询求个数
*
* @param indexName 索引
* @param payTime 支付时间
* @param payFrom 上
* @param payTo 下
* @return 总数
*/
public Long payQueryTotal(String indexName,String payTime,String payFrom,String payTo,String vipEndTime){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//range
RangeQueryBuilder payRangQuery = QueryBuilders.rangeQuery(payTime).from(payFrom).to(payTo);
RangeQueryBuilder endRangQuery = QueryBuilders.rangeQuery(vipEndTime).from(payFrom);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(payRangQuery).must(endRangQuery);
//查询,执行
builder.query(boolQueryBuilder).size(0);
request.source(builder);
logger.info("payQueryTotalDSL:"+builder.toString());
return getSearchTotal(request);
}
/**时间范围字段查询求个数
*
* @param indexName 索引
* @param groupName 分组
* @param registerTime 时间
* @param registerFrom 上
* @param registerTo 下
* @return 个数
*/
public Long dateQueryTotal(String indexName,String[] groupName,String registerTime,String registerFrom,String registerTo){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
//range
RangeQueryBuilder registerRangQuery = QueryBuilders.rangeQuery(registerTime).from(registerFrom).to(registerTo);
TermsQueryBuilder termsQuery = QueryBuilders.termsQuery("lookupResult.group_worker.keyword", groupName);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(registerRangQuery).must(termsQuery);
//查询,执行
builder.query(boolQueryBuilder).size(0);
request.source(builder);
logger.info("dateQueryTotalGDSL:"+builder.toString());
return getSearchTotal(request);
}
/***车险转换激活人数 只是分组
*
* @param indexName 索引名字
* @param name 分组字段
* @return 分组字段
*/
public List<AggsKeyWord> carStateQuery(String indexName, String name){
SearchRequest request = new SearchRequest(indexName);//"xls_member_sum"
SearchSourceBuilder builder = new SearchSourceBuilder();
TermsQueryBuilder termsQuery = QueryBuilders.termsQuery("lookupResult.group_worker.keyword", name);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().must(termsQuery);
//分组聚合terms:别名,分组字段分组字段
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("group").field("lookupResult.activation_state.keyword").size(5);
//执行,不需要源数据
builder.query(boolQueryBuilder).aggregation(aggregationBuilder).size(0);
request.source(builder);
logger.info("carStateQueryDSL:"+builder.toString());
return getAggsBucket(request);
}
/***当前时间获取当月,当月保费,上个月保费,环比,会籍期,权益余额,时间达成率
*
* @return 当月,当月保费,上个月保费,环比,会籍期,权益余额,时间达成率的对象
*/
public VipAndPremium getVipAndPremium(){
String index="qhpms_vip_coll";
String[] includes = {"C_MONTH_PREMIUM","C_VALID_VIP","C_VIP_MONEYTOT","C_TIME_PROGRESS","C_PREMIUM_TASK"};
String[] monthP = {"C_MONTH_PREMIUM"};
//获取当前日期
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
//当前日期
String currentDate = dateFormat.format( new Date() );
//上个月
String upDate = DateUtil.addMonth(currentDate, -1);
String upYearMonth = DateUtil.getYearMonth(upDate);
//上上个月
String up2Date = DateUtil.addMonth(upDate, -1);
String up2YearMonth = DateUtil.getYearMonth(up2Date);
//当前
SearchRequest request = getSearchRequestId(index,currentDate,includes);
//上个月最后一天
SearchRequest upMonthPrequest = getSearchRequestId(index, upYearMonth, monthP);
//上个月最后一天
SearchRequest up2MonthPrequest = getSearchRequestId(index, up2YearMonth, monthP);
//查询
SearchHit[] searchHits = getSearchHits(request);
SearchHit[] searchHits1 = getSearchHits(upMonthPrequest);
SearchHit[] searchHits2 = getSearchHits(up2MonthPrequest);
VipAndPremium vipAndPremium=new VipAndPremium();
for (SearchHit searchHit : searchHits) {
//源文档内容
String sourceAsString = searchHit.getSourceAsString();
vipAndPremium = JSON.parseObject(sourceAsString, VipAndPremium.class);
}
Double dUpMonthP=0d;
Double dUp2MonthP=0d;
//上个月
for (SearchHit searchHit1 : searchHits1) {
//源文档内容
String sourceAsString = searchHit1.getSourceAsString();
MonthPremium monthPremium = JSON.parseObject(sourceAsString, MonthPremium.class);
dUpMonthP = monthPremium.getC_MONTH_PREMIUM();
}
//上上个月
for (SearchHit searchHit2 : searchHits2) {
//源文档内容
String sourceAsString = searchHit2.getSourceAsString();
MonthPremium monthPremium = JSON.parseObject(sourceAsString, MonthPremium.class);
dUp2MonthP = monthPremium.getC_MONTH_PREMIUM();
}
if (dUp2MonthP==0d){
vipAndPremium.setUP_PRM_SCALE(0d);
} else {
double scale = (dUpMonthP - dUp2MonthP)*1.0 / dUp2MonthP*100;
BigDecimal dScale = new BigDecimal(scale).setScale(2, RoundingMode.UP);
vipAndPremium.setUP_PRM_SCALE(dScale.doubleValue());
}
vipAndPremium.setUP_MONTH_PREMIUM(dUpMonthP);
return vipAndPremium;
}
/***根据ID查询索引的字段
*
* @param index 索引
* @param id id
* @param includes 包含字段
* @return DSL
*/
private SearchRequest getSearchRequestId(String index,String id,String[] includes) {
SearchRequest request = new SearchRequest(index);
SearchSourceBuilder builder = new SearchSourceBuilder();
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("_id", id);
//定义一个boolQuery
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().must(termQueryBuilder);
//设置源字段过滤,第一个参数结果集包括哪些字段,第二字段结果集不包括哪些字段
builder.query(boolQueryBuilder).from(0).size(1).fetchSource(includes,null);
request.source(builder);
logger.info("getIndexIdDSL:"+builder.toString());
return request;
}
//同步返回hits
private SearchHit[] getSearchHits(SearchRequest request) {
//执行搜索,向ES发起http请求
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
// 搜索结果,得到文档。
return response.getHits().getHits();
}
//同步返回totalhit
private Long getSearchTotal(SearchRequest request) {
//执行搜索,向ES发起http请求
SearchResponse response = null;
try {
response = client.search(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
// 搜索结果,得到文档。
return response.getHits().getTotalHits().value;
}
/** 搜索请求,解析聚合key-value
* @Description 搜索解析key-value
* @Author say
* @Date 2020/7/2
* @Param agg请求和搜索条件
* @Return List对象
*/
private List<AggsKeyWord> getAggsBucket(SearchRequest request) {
//执行搜索,向ES发起http请求
SearchResponse response = null;
List<AggsKeyWord> keyWordList = new ArrayList<>();
//聚合
try {
response = client.search(request, RequestOptions.DEFAULT);
Aggregations aggs = response.getAggregations();
for (Aggregation agg:aggs){
Terms stringTerms = (Terms)agg;
List<? extends Terms.Bucket> buckets = stringTerms.getBuckets();
for (Terms.Bucket bucket : buckets){
String key = bucket.getKeyAsString();
long bucketDocCount = bucket.getDocCount();
AggsKeyWord aggsKeyWord = new AggsKeyWord();
aggsKeyWord.setName(key);
aggsKeyWord.setValue(bucketDocCount);
keyWordList.add(aggsKeyWord);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return keyWordList;
}
}
4、dao.AggsKeyWord
用于解析ES的json的桶数据
import lombok.Data;
/**
* 解析桶数据
*/
@Data
public class AggsKeyWord {
private String name;
private Long value;
}
5、dao.CarStateGroup
import lombok.Data;
/**
* 车险转换激活率
*/
@Data
public class CarStateGroup {
private String name;//职群名字
private Long numCount;//总数
private Long stateCount;//激活总数
private double loginScale;//激活率
}
6、dao.MonthPremium
import lombok.Data;
/**
* 解析数字《当月保费》
*/
@Data
public class MonthPremium {
private Double C_MONTH_PREMIUM;
}
7、dao.NumberTotal
import lombok.Data;
//会员信息
@Data
public class NumberTotal {
private Long vipTotal;
private Long groupTotal;
private Long endTotal;
}
8、dao.PmsUser
import lombok.Data;
/**
* 优秀店长
*/
@Data
public class PmsUser {
private String name;
private String groupWorker;
private String memberLevel;
private String depName3;
}
本文地址:https://blog.csdn.net/AyubLIbra/article/details/107504020
上一篇: JAVA中JVM类加载器详细介绍
下一篇: 关于稀疏数组的学习和实现