[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)
让人迷茫的原因只有一个:你本该拼搏的年纪,却想得太多,做得太少。
–> 返回专栏总目录 <–
代码下载地址:https://github.com/f641385712/netflix-learning
目录
- 前言
- 正文
- 累计统计流 BucketedCumulativeCounterStream
- CumulativeCommandEventCounterStream
- CumulativeThreadPoolEventCounterStream
- CumulativeCollapserEventCounterStream
- 浅谈metrics指标释意
- 分布流 RollingDistributionStream
- RollingCommandUserLatencyDistributionStream
- RollingCommandLatencyDistributionStream
- RollingCollapserBatchSizeDistributionStream
- 最大并发流 RollingConcurrencyStream
- 配置流 HystrixConfigurationStream
- 功能流 HystrixUtilizationStream
- 使用示例
- 总结
前言
上篇文章 介绍了Hystrix
的“主流”:在滑动窗口内统计流、健康流。既然Hystrix
的指标数据收集是基于事件驱动,那么自然可以多一些监听流,那么本文将做个收尾,对Hystrix
内置的累计统计流、分发流、最大并发流…等等分别做介绍,让小伙伴们能对这种模式有个更深的理解,后面介绍的Hystrix各维度的监控都基于它们扩展出来的哦。
Hystrix已经内置了对事件监听时各种流的实现,大多数数情况下无需自己来扩展实现的,当然若你着实要和第三方监控平台深度集成,那么你也可以自定义收集方式。
正文
累计统计流 BucketedCumulativeCounterStream
它和BucketedRollingCounterStream
的区别是:它在减桶的过程中,持续/无限累积计数。
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket) // 这是最大的区别,使用scan 一直扫描
.skip(numBuckets)
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share()
.onBackpressureDrop(); // 背压:多余的直接弃掉
}
// 实现父类方法
@Override
public Observable<Output> observe() {
return sourceStream;
}
}
最大的区别就是对bucketedStream
的处理上,滑动窗口使用的是window + flatMap
,而本处使用的是scan,代表着持续/无限累积计数。它有如下实现类:
CumulativeCommandEventCounterStream
CumulativeThreadPoolEventCounterStream
CumulativeCollapserEventCounterStream
以上实现类源码此处均不展示,是因为完全和BucketedRollingCounterStream
体系的实现一模一样,请参照上篇文章即可。唯一的不同在父类:一个只统计指定窗口,一个持续不断的累计统计。
浅谈metrics指标释意
监控是大型分布式系统的必备系统,它们的数据均来自一些指标信息。收集指标信息的库有很多,其中比较出名的有metrics-core
,它可以把收集到的信息提供给Meter、Histogram、Gauge...
等度量工具使用,从而可以画出如下美图:
当然本文并不讲述metrics-core
如何用,而是以一段指标值为例,稍加解释:
{
"version":"3.0.0",
"timers":{
"count":0,
"max":0,
"mean":0,
"min":0,
"p50":0,
"p75":0,
"p95":0,
"p98":0,
"p99":0,
"p999":0,
"stddev":0,
"m15_rate":0,
"m1_rate":0,
"m5_rate":0,
"mean_rate":0,
"duration_units":"seconds",
"rate_units":"calls/second"
}
}
这个指标还是比较详细的,里面会有mean、p75、p99… 这个其实是很关键的数据指标。这些指标主要用于给你设置超时时间提供极有力的参考,如果你每每设置超时时间参考的是RT值
是mean平均值,那你和瞎蒙没啥区别。
另外到底参考那个值,要看你的系统的整体量级,以及需要满足几个9,比如要满足三个9,那么超时时间是需要谨慎的。
分位数p50、p95、p999代表什么意思?
count/max/min/mean这些都不用解释,其它主要关心:
- p50:也叫中位数(注意中位数不是平均数)。它表示把数据总数分为上下两等分,中间的那个数值
- 分位数:分位数是将总体的全部数据按从小到大顺序排列后,处于各等分位置的变量值。
-
p95 = 10ms
:代表95%的响应时间不大于10ms - p99、p999:含义同上
-
p表示:percent 百分比。
-
m15_rate
:15分钟内。请求数/每秒的比率 -
m1_rate
:1分钟内… -
mean_rate
: 平均每秒请求数(平均QPS,意义不大) -
rate_units
:calls/second” 比率单位,这里表示每秒钟请求数
对于监控指标来说,一般来说平均值几乎没有意义,而分位数一般是重点关注的值。
分布流 RollingDistributionStream
在指定时间窗口内分布流。说到分布,所以和统计、画图有关。。。
public class RollingDistributionStream<Event extends HystrixEvent> {
...
// 订阅者可以订阅消费消息,得到各种分位数,都存在CachedValuesHistogram里呢
public Observable<CachedValuesHistogram> observe() {
return rollingDistributionStream;
}
}
虽然它不是抽象类,但它也没标明具体监听哪种事件,使用什么数据流HystrixEventStream
。总之最终它会监听一个消息流(比如HystrixCommandStartStream
它吧),然后通过RxJava的window操作符对一段时间内的数值进行运算操作,生成统计值放在Histogram对象中,然后重新发射。
它内部集成使用度量工具org.HdrHistogram.Histogram
来统计分析指标数据,并且给出非常详细的分位数数据(最高达四个9 -> p9999)。它还有如下子类:
RollingCommandUserLatencyDistributionStream
LatencyDistribution
:延迟发布。延迟的值来自于:调用方线程提交请求和响应可见之间的时间间隔executionResult.getUserThreadLatency()
。
RollingCommandLatencyDistributionStream
延迟发布。和上的区别是延迟时间来自于:executionResult.getExecutionLatency()
表示:time spent in run() method
它们俩监听的均是HystrixCommandCompletionStream
数据流~
RollingCollapserBatchSizeDistributionStream
监听了HystrixCollapserEventStream
消息流,并且监听窗口期内的ADDED_TO_BATCH
消息类型次数,通过Histogram计算后再发射出去。
最大并发流 RollingConcurrencyStream
它用于对最大并发进行统计:对一段时间内的执行并发量取最大值,如Command/ThreadPool的最大并发数。
// 竟然泛型都木有,干净利落
public abstract class RollingConcurrencyStream {
}
它监听的是HystrixCommandExecutionStarted
事件,它会发送并发数过来,从而便可获得event.getCurrentConcurrency()
,对比每个桶(一个桶代表1s),最后取出最大值Math.max(a, b)
。
RollingCommandMaxConcurrencyStream
RollingThreadPoolMaxConcurrencyStream
因为监听HystrixCommandExecutionStarted
事件的有两种事件流:command的和ThreadPool的,所以必须用两个类来表示。它俩除了关心的事件不一样,其它都一样~
配置流 HystrixConfigurationStream
这个类对当前的Hystrix配置进行采样,并将其作为流公开。
public class HystrixConfigurationStream {
...
private final Observable<HystrixConfiguration> allConfigurationStream;
...
public Observable<HystrixConfiguration> observe() {
return allConfigurationStream;
}
// 当然还可以当读监控某一类配置
public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
return allConfigurationStream.map(getOnlyCommandConfig);
}
public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
return allConfigurationStream.map(getOnlyThreadPoolConfig);
}
public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
return allConfigurationStream.map(getOnlyCollapserConfig);
}
...
}
可使用hystrix.stream.config.intervalInMilliseconds = 5000
来配置多长时间采样一次,默认5000ms也就是5秒采样一次。另外com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet
就是用该流来获取配置信息的。
功能流 HystrixUtilizationStream
Utilization
:使用、利用(使用率、利用率)。这个类对当前Hystrix资源的利用情况进行采样,并将其公开为流。
public class HystrixUtilizationStream {
// HystrixUtilization就是最终的数据结构格式,下面给使用示例
private final Observable<HystrixUtilization> allUtilizationStream;
...
public Observable<HystrixUtilization> observe() {
return allUtilizationStream;
}
public Observable<Map<HystrixCommandKey, HystrixCommandUtilization>> observeCommandUtilization() {
return allUtilizationStream.map(getOnlyCommandUtilization);
}
public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> observeThreadPoolUtilization() {
return allUtilizationStream.map(getOnlyThreadPoolUtilization);
}
}
可使用hystrix.stream.utilization.intervalInMilliseconds = 500
来配置多长时间采样一次,默认500ms采样一次。另外com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet
就是用该流来获取资源利用信息的。
使用示例
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
// 指定一个HystrixCommandGroupKey,这样熔断策略会按照此组执行
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));
this.name = name;
}
@Override
protected String run() {
if(name == null){
throw new NullPointerException();
}
return "Hello " + name + "!";
}
@Override
protected String getFallback() {
// super.getFallback():No fallback available.
return "this is fallback msg";
}
}
private static final String toJsonString(Object obj) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Test
public void fun1() throws InterruptedException {
// 查看command、线程池的使用情况
HystrixUtilizationStream utilizationStream = HystrixUtilizationStream.getInstance();
// utilizationStream.observeThreadPoolUtilization()
utilizationStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 查看配置情况
HystrixConfigurationStream configStream = HystrixConfigurationStream.getInstance();
configStream.observe().subscribe(d -> {
System.out.println(d);
});
// 累计统计流
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
CumulativeCommandEventCounterStream counterStream = CumulativeCommandEventCounterStream.getInstance(commandKey, properties);
counterStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 最大并发流
RollingCommandMaxConcurrencyStream concurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(commandKey, properties);
concurrencyStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 发送事件(发送多次)
CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.execute();
helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.queue();
// 走fallabck
helloWorld = new CommandHelloWorld(null);
helloWorld.queue();
// 因为配置5秒钟才能打印一次
TimeUnit.SECONDS.sleep(5);
}
运行程序,控制台输出:
资源利用情况:
{
"commandUtilizationMap":{
"CommandHelloWorld":{
"concurrentCommandCount":0
}
},
"threadPoolUtilizationMap":{
"MyAppGroup":{
"currentActiveCount":0,
"currentCorePoolSize":10,
"currentPoolSize":3,
"currentQueueSize":0
}
}
}
这是功能流的数据,最明显的是启动了三次任务,线程池大小目前是3。另外,因为配置流中的对应无法很好的用JSON序列化,这里我只能采用笨拙的截图的方式展示喽(下面配置不生效哦,若配置了信号量,那么ThreadPoolConfig这一栏就为null了):
# hystrix.command.default.execution.isolation.strategy = SEMAPHORE
# hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests = 2
累计统计流的数据如下:
[0,2,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0]
对数字解释如下:
- 2:index=1,对应事件为
HystrixEventType.SUCCESS
- 1:index=2,对应事件为
HystrixEventType.FAILURE
- 1:index=9,对应事件为
HystrixEventType.FALLBACK_SUCCESS
如果你愿意,你还可以自行模拟出
TIMEOUT
超时、FALLBACK_FAILURE
回滚失败等等情况,建议亲可试试,以加深理解。
最大并发流输出的数据是1
,因为很明显1秒内最多才一个请求嘛~
HealthCountsStream
健康信息汇总:
{"errorCount":1,"errorPercentage":33,"totalRequests":3}
一共3个请求,失败了一个,所以错误率是33%
。
说明:因为
HealthCountsStream
它默认是500ms照一次快照,所以此处它会打印10次(共5s嘛)
总结
到此,关于Netflix Hystrix
指标收集,以及转换为Stream流式的实现已经全部讲述完成了。最后用一张大佬手绘图对此作出总结:
声明
原创不易,码字不易,多谢你的点赞、收藏、关注。把本文分享到你的朋友圈是被允许的,但拒绝抄袭
。你也可【左边扫码/或加wx:fsx641385712】邀请你加入我的 Java高工、架构师 系列群大家庭学习和交流。
- [享学Netflix] 一、Apache Commons Configuration:你身边的配置管理专家
- [享学Netflix] 二、Apache Commons Configuration事件监听机制及使用ReloadingStrategy实现热更新
- [享学Netflix] 三、Apache Commons Configuration2.x全新的事件-监听机制
- [享学Netflix] 四、Apache Commons Configuration2.x文件定位系统FileLocator和FileHandler
- [享学Netflix] 五、Apache Commons Configuration2.x别样的Builder模式:ConfigurationBuilder
- [享学Netflix] 六、Apache Commons Configuration2.x快速构建工具Parameters和Configurations
- [享学Netflix] 七、Apache Commons Configuration2.x如何实现文件热加载/热更新?
- [享学Netflix] 八、Apache Commons Configuration2.x相较于1.x使用上带来哪些差异?
- [享学Netflix] 九、Netflix Archaius配置管理库:初体验及基础API详解
- [享学Netflix] 十、Netflix Archaius对Commons Configuration核心API Configuration的扩展实现
- [享学Netflix] 十一、Netflix Archaius配置管理器ConfigurationManager和动态属性支持DynamicPropertySupport
- [享学Netflix] 十二、Netflix Archaius动态属性DynamicProperty原理详解(重要)
- [享学Netflix] 十三、Netflix Archaius属性抽象Property和PropertyWrapper详解
- [享学Netflix] 十四、Netflix Archaius如何对多环境、多区域、多云部署提供配置支持?
- [享学Netflix] 十五、Netflix Archaius和Spring Cloud的集成:spring-cloud-starter-netflix-archaius
- [享学Netflix] 十六、Netflix Hystrix断路器:初体验及RxJava简介
- [享学Netflix] 十七、Netflix Hystrix属性抽象以及和Archaius整合实现配置外部化、动态化
- [享学Netflix] 十八、Netflix Hystrix配置之:全局配置和实例配置
- [享学Netflix] 十九、Netflix Hystrix插件机制:SPI接口介绍和HystrixPlugins详解
- [享学Netflix] 二十、Netflix Hystrix跨线程传递数据解决方案:HystrixRequestContext
- [享学Netflix] 二十一、Netflix Hystrix指标数据收集(预热):滑动窗口算法(附代码示例)
- [享学Netflix] 二十二、Netflix Hystrix事件源与事件流:HystrixEvent和HystrixEventStream
- [享学Netflix] 二十三、Netflix Hystrix桶计数器:BucketedCounterStream
- [享学Netflix] 二十四、Netflix Hystrix在滑动窗口内统计:BucketedRollingCounterStream、HealthCountsStream
上一篇: 接口文档示例
下一篇: Prometheus直方图和摘要图