欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)

程序员文章站 2022-03-15 17:31:44
...

让人迷茫的原因只有一个:你本该拼搏的年纪,却想得太多,做得太少。

–> 返回专栏总目录 <–
代码下载地址:https://github.com/f641385712/netflix-learning

前言

上篇文章 介绍了Hystrix的“主流”:在滑动窗口内统计流、健康流。既然Hystrix的指标数据收集是基于事件驱动,那么自然可以多一些监听流,那么本文将做个收尾,对Hystrix内置的累计统计流、分发流、最大并发流…等等分别做介绍,让小伙伴们能对这种模式有个更深的理解,后面介绍的Hystrix各维度的监控都基于它们扩展出来的哦。

Hystrix已经内置了对事件监听时各种流的实现,大多数数情况下无需自己来扩展实现的,当然若你着实要和第三方监控平台深度集成,那么你也可以自定义收集方式。


正文

累计统计流 BucketedCumulativeCounterStream

它和BucketedRollingCounterStream的区别是:它在减桶的过程中,持续/无限累积计数

public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {

    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);


    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket) // 这是最大的区别,使用scan 一直扫描
                .skip(numBuckets)
                .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
                .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
                .share() 
                .onBackpressureDrop(); // 背压:多余的直接弃掉
    }

	// 实现父类方法
    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
}

最大的区别就是对bucketedStream的处理上,滑动窗口使用的是window + flatMap,而本处使用的是scan,代表着持续/无限累积计数。它有如下实现类:

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)


CumulativeCommandEventCounterStream

CumulativeThreadPoolEventCounterStream

CumulativeCollapserEventCounterStream

以上实现类源码此处均不展示,是因为完全和BucketedRollingCounterStream体系的实现一模一样,请参照上篇文章即可。唯一的不同在父类:一个只统计指定窗口,一个持续不断的累计统计。


浅谈metrics指标释意

监控是大型分布式系统的必备系统,它们的数据均来自一些指标信息。收集指标信息的库有很多,其中比较出名的有metrics-core,它可以把收集到的信息提供给Meter、Histogram、Gauge...等度量工具使用,从而可以画出如下美图:

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)

当然本文并不讲述metrics-core如何用,而是以一段指标值为例,稍加解释:

{
    "version":"3.0.0",
    "timers":{
        "count":0,
        "max":0,
        "mean":0,
        "min":0,
        "p50":0,
        "p75":0,
        "p95":0,
        "p98":0,
        "p99":0,
        "p999":0,
        "stddev":0,
        "m15_rate":0,
        "m1_rate":0,
        "m5_rate":0,
        "mean_rate":0,
        "duration_units":"seconds",
        "rate_units":"calls/second"
    }
}

这个指标还是比较详细的,里面会有mean、p75、p99… 这个其实是很关键的数据指标。这些指标主要用于给你设置超时时间提供极有力的参考,如果你每每设置超时时间参考的是RT值是mean平均值,那你和瞎蒙没啥区别。

另外到底参考那个值,要看你的系统的整体量级,以及需要满足几个9,比如要满足三个9,那么超时时间是需要谨慎的。


分位数p50、p95、p999代表什么意思?

count/max/min/mean这些都不用解释,其它主要关心:

  • p50:也叫中位数(注意中位数不是平均数)。它表示把数据总数分为上下两等分,中间的那个数值
  • 分位数:分位数是将总体的全部数据按从小到大顺序排列后,处于各等分位置的变量值。
    • p95 = 10ms:代表95%的响应时间不大于10ms
    • p99、p999:含义同上

p表示:percent 百分比。

  • m15_rate:15分钟内。请求数/每秒的比率
  • m1_rate:1分钟内…
  • mean_rate: 平均每秒请求数(平均QPS,意义不大)
  • rate_units:calls/second” 比率单位,这里表示每秒钟请求数

对于监控指标来说,一般来说平均值几乎没有意义,而分位数一般是重点关注的值。


分布流 RollingDistributionStream

在指定时间窗口内分布流。说到分布,所以和统计、画图有关。。。

public class RollingDistributionStream<Event extends HystrixEvent> { 
	... 
	// 订阅者可以订阅消费消息,得到各种分位数,都存在CachedValuesHistogram里呢
    public Observable<CachedValuesHistogram> observe() {
        return rollingDistributionStream;
    }
}

虽然它不是抽象类,但它也没标明具体监听哪种事件,使用什么数据流HystrixEventStream。总之最终它会监听一个消息流(比如HystrixCommandStartStream它吧),然后通过RxJava的window操作符对一段时间内的数值进行运算操作,生成统计值放在Histogram对象中,然后重新发射

它内部集成使用度量工具org.HdrHistogram.Histogram来统计分析指标数据,并且给出非常详细的分位数数据(最高达四个9 -> p9999)。它还有如下子类:

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)


RollingCommandUserLatencyDistributionStream

LatencyDistribution:延迟发布。延迟的值来自于:调用方线程提交请求和响应可见之间的时间间隔executionResult.getUserThreadLatency()


RollingCommandLatencyDistributionStream

延迟发布。和上的区别是延迟时间来自于:executionResult.getExecutionLatency()表示:time spent in run() method

它们俩监听的均是HystrixCommandCompletionStream数据流~


RollingCollapserBatchSizeDistributionStream

监听了HystrixCollapserEventStream消息流,并且监听窗口期内ADDED_TO_BATCH消息类型次数,通过Histogram计算后再发射出去。


最大并发流 RollingConcurrencyStream

它用于对最大并发进行统计:对一段时间内的执行并发量取最大值,如Command/ThreadPool的最大并发数。

// 竟然泛型都木有,干净利落
public abstract class RollingConcurrencyStream {
}

它监听的是HystrixCommandExecutionStarted事件,它会发送并发数过来,从而便可获得event.getCurrentConcurrency()对比每个桶(一个桶代表1s),最后取出最大值Math.max(a, b)


RollingCommandMaxConcurrencyStream

RollingThreadPoolMaxConcurrencyStream

因为监听HystrixCommandExecutionStarted事件的有两种事件流:command的和ThreadPool的,所以必须用两个类来表示。它俩除了关心的事件不一样,其它都一样~


配置流 HystrixConfigurationStream

这个类对当前的Hystrix配置进行采样,并将其作为流公开。

public class HystrixConfigurationStream {

	...
	private final Observable<HystrixConfiguration> allConfigurationStream;
	...
    public Observable<HystrixConfiguration> observe() {
        return allConfigurationStream;
    }
    
    // 当然还可以当读监控某一类配置
    public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
        return allConfigurationStream.map(getOnlyCommandConfig);
    }
    public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
        return allConfigurationStream.map(getOnlyThreadPoolConfig);
    }
    public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
        return allConfigurationStream.map(getOnlyCollapserConfig);
    }
    ...
}

可使用hystrix.stream.config.intervalInMilliseconds = 5000来配置多长时间采样一次,默认5000ms也就是5秒采样一次。另外com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用该流来获取配置信息的。


功能流 HystrixUtilizationStream

Utilization:使用、利用(使用率、利用率)。这个类对当前Hystrix资源的利用情况进行采样,并将其公开为流。

public class HystrixUtilizationStream {

	// HystrixUtilization就是最终的数据结构格式,下面给使用示例
	private final Observable<HystrixUtilization> allUtilizationStream;
	...
    public Observable<HystrixUtilization> observe() {
        return allUtilizationStream;
    }

    public Observable<Map<HystrixCommandKey, HystrixCommandUtilization>> observeCommandUtilization() {
        return allUtilizationStream.map(getOnlyCommandUtilization);
    }
    public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> observeThreadPoolUtilization() {
        return allUtilizationStream.map(getOnlyThreadPoolUtilization);
    }
}

可使用hystrix.stream.utilization.intervalInMilliseconds = 500来配置多长时间采样一次,默认500ms采样一次。另外com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet就是用该流来获取资源利用信息的。


使用示例

public class CommandHelloWorld extends HystrixCommand<String> {
    private final String name;

    // 指定一个HystrixCommandGroupKey,这样熔断策略会按照此组执行
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        if(name == null){
            throw new NullPointerException();
        }
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // super.getFallback():No fallback available.
        return "this is fallback msg";
    }
}
private static final String toJsonString(Object obj) {
    ObjectMapper mapper = new ObjectMapper();
    try {
        return mapper.writeValueAsString(obj);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}


@Test
public void fun1() throws InterruptedException {
    // 查看command、线程池的使用情况
    HystrixUtilizationStream utilizationStream = HystrixUtilizationStream.getInstance();
    // utilizationStream.observeThreadPoolUtilization()
    utilizationStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 查看配置情况
    HystrixConfigurationStream configStream = HystrixConfigurationStream.getInstance();
    configStream.observe().subscribe(d -> {
        System.out.println(d);
    });

    // 累计统计流
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
    HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
    CumulativeCommandEventCounterStream counterStream = CumulativeCommandEventCounterStream.getInstance(commandKey, properties);
    counterStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 最大并发流
    RollingCommandMaxConcurrencyStream concurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(commandKey, properties);
    concurrencyStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 发送事件(发送多次)
    CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.execute();

    helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.queue();

    // 走fallabck
    helloWorld = new CommandHelloWorld(null);
    helloWorld.queue();


    // 因为配置5秒钟才能打印一次
    TimeUnit.SECONDS.sleep(5);

}

运行程序,控制台输出:

资源利用情况:

{
    "commandUtilizationMap":{
        "CommandHelloWorld":{
            "concurrentCommandCount":0
        }
    },
    "threadPoolUtilizationMap":{
        "MyAppGroup":{
            "currentActiveCount":0,
            "currentCorePoolSize":10,
            "currentPoolSize":3,
            "currentQueueSize":0
        }
    }
}

这是功能流的数据,最明显的是启动了三次任务,线程池大小目前是3。另外,因为配置流中的对应无法很好的用JSON序列化,这里我只能采用笨拙的截图的方式展示喽(下面配置不生效哦,若配置了信号量,那么ThreadPoolConfig这一栏就为null了):

# hystrix.command.default.execution.isolation.strategy = SEMAPHORE
# hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests = 2

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)
[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)
累计统计流的数据如下:

[0,2,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0]

对数字解释如下:

  • 2:index=1,对应事件为HystrixEventType.SUCCESS
  • 1:index=2,对应事件为HystrixEventType.FAILURE
  • 1:index=9,对应事件为HystrixEventType.FALLBACK_SUCCESS

如果你愿意,你还可以自行模拟出TIMEOUT超时、FALLBACK_FAILURE回滚失败等等情况,建议亲可试试,以加深理解。

最大并发流输出的数据是1,因为很明显1秒内最多才一个请求嘛~

HealthCountsStream健康信息汇总:

{"errorCount":1,"errorPercentage":33,"totalRequests":3}

一共3个请求,失败了一个,所以错误率是33%

说明:因为HealthCountsStream它默认是500ms照一次快照,所以此处它会打印10次(共5s嘛)


总结

到此,关于Netflix Hystrix指标收集,以及转换为Stream流式的实现已经全部讲述完成了。最后用一张大佬手绘图对此作出总结:

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)

[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)

声明

原创不易,码字不易,多谢你的点赞、收藏、关注。把本文分享到你的朋友圈是被允许的,但拒绝抄袭。你也可【左边扫码/或加wx:fsx641385712】邀请你加入我的 Java高工、架构师 系列群大家庭学习和交流。
[享学Netflix] 二十五、Netflix Hystrix累计统计流、分发流、最大并发流、配置流、功能流(附代码示例)

相关标签: 享学Netflix