技术交流28群

服务热线

135-6963-3175

微信服务号

Sentinel原理-滑动窗口 更新时间 2019-6-16 浏览2045次

Sentinel原理-滑动窗口

最重要的一个Slot非StatisticSlot莫属,因为他做的事是其他所有的Slot的基础。包括各种限流,熔断的规则,都是基于StatisticSlot统计出来的结果进行规则校验的。

我们先来看一段非常熟悉的代码,就是StatisticSlot中的entry方法:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 触发下一个Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
        // 统计信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代码
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代码
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代码
        throw e;
    }
}

StatisticSlot中就是做了三件事:

1.通过node中的当前的实时统计指标信息进行规则校验

2.如果通过了校验,则重新更新node中的实时指标数据

3.如果被block或出现了异常了,则重新更新node中block的指标或异常指标

可以很清晰的看到,所有的实时指标的统计都是在node中进行的。

DefaultNode和ClusterNode

//node.addPassRequest() //这里的node是一个 DefaultNode 实例

@Override
public void addPassRequest() {
    super.addPassRequest();
    this.clusterNode.addPassRequest();
}

其实都是执行的 StatisticNode 对象的 addPassRequest 方法

//每秒的实时统计信息,使用 ArrayMetric 实现,即基于滑动窗口实现,默认1s 采样 2次。
//即一个统计周期中包含两个滑动窗口。
private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);
//每分钟实时统计信息,同样使用 ArrayMetric 实现,即基于滑动窗口实现。每1分钟,抽样60次。
//即包含60个滑动窗口,每一个窗口的时间间隔为 1s 。
private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);
@Override
public void addPassRequest() {
    rollingCounterInSecond.addPass();
    rollingCounterInMinute.addPass();
}


Metric

从代码中我们可以看到,具体的增加pass指标是通过一个叫 Metric 的接口进行操作的,并且是通过 ArrayMetric 这种实现类,现在我们在进入 ArrayMetric 中看一下。

public class ArrayMetric implements Metric {
    /**
     * 用来存储各个窗口的数据
     */
    private final LeapArray<MetricBucket> data;
    /**
     *
     * @param sampleCount 在一个采集间隔中抽样的个数,默认为 2,
     *                    例如当 intervalInMs = 1000时,抽象两次,则一个采集间隔中会包含两个相等的区间,
     *                    一个区间就是滑动窗口。
     * @param intervalInMs 表示一个采集的时间间隔,例如1秒,1分钟。
     */
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    /**
     *
     * @param sampleCount
     * @param intervalInMs
     * @param enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,
     *                     这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,
     *                     否则为 BucketLeapArray。
     */
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
 .....
}

wrap.value().addPass() 是执行的 wrap 对象所包装的 MetricBucket 对象的 addPass 方法,这里就是最终的增加qps中q的值的地方了。进入 MetricBucket 类中看一下,具体的代码如下:

public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }
    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
    ...
}

再看下枚举类MetricEvent:

public enum MetricEvent {
    /**
     * Normal pass.通过数量
     */
    PASS,
    /**
     * Normal block.阻塞数量
     */
    BLOCK,
    /**
     * 异常数量
     */
    EXCEPTION,
    /**
     * 成功数量
     */
    SUCCESS,
    /**
     * 响应时间
     */
    RT,
    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}

LongAdder:

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;
    /**
     * Version of plus for use in retryUpdate
     */
    @Override
    final long fn(long v, long x) { return v + x; }
    /**
     * Creates a new adder with initial sum of zero.
     */
    public LongAdder() {
    }
    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as;
        long b, v;
        HashCode hc;
        Cell a;
        int n;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            int h = (hc = threadHashCode.get()).code;
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = a.cas(v = a.value, v + x))) { retryUpdate(x, hc, uncontended); }
        }
    }
    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }
    ....
}

所以是通过LongAdder 来保存各种指标的值的。

接下来进行data.currentWindow()分析:

参数为当前时间戳

public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        /**
         * 计算当前时间会落在一个采集间隔 ( LeapArray ) 中哪一个时间窗口中,
         * 即在 LeapArray 中属性 AtomicReferenceArray > array 的下标
         */
        // idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        /**
         * 计算当前时间戳所在的时间窗口的开始时间,即要计算出 WindowWrap 中 windowStart 的值,
         * 其实就是要算出小于当前时间戳,
         * 并且是 windowLengthInMs 的整数倍最大的数字,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )。
         */
        long windowStart = calculateWindowStart(timeMillis);
        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        //死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口。
        while (true) {
            // 从采样数组中根据索引获取缓存的时间窗口
            WindowWrap<T> old = array.get(idx);
            // array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 */
               // 如果没有获取到,则创建一个新的
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                /**
                // 通过CAS将新窗口设置到数组中去
                 * 这里使用了 CAS 机制来更新 LeapArray 数组中的 元素,因为同一时间戳,可能有多个线程都在获取当前时间窗口对象,
                 * 但该时间窗口对象还未创建,这里就是避免创建多个,导致统计数据被覆盖,
                 * 如果用 CAS 更新成功的线程,则返回新建好的 WindowWrap ,CAS 设置不成功的线程继续跑这个流程,然后会进入到代码。
                 */
                if (array.compareAndSet(idx, null, window)) {
                     // 如果能设置成功,则将该窗口返回
                    return window;
                } else {
                    // 否则当前线程让出时间片,等待
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            // 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口
            } else if (windowStart == old.windowStart()) {
                /**
                 * 如果指定索引下的时间窗口对象不为空并判断起始时间相等,则返回。
                 */
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            // 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口
            // 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动
            } else if (windowStart > old.windowStart()) {
                /**
                 * 如果原先存在的窗口开始时间小于当前时间戳计算出来的开始时间,则表示 bucket 已被弃用。
                 * 则需要将开始时间重置到新时间戳对应的开始时间戳
                 */
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            // 这个条件不可能存在
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    /**
     * 首先用当前时间除以一个时间窗口的时间间隔,得出当前时间是多少个时间窗口的倍数,用 n 表示。
     * 然后我们可以看出从一系列时间窗口,从 0 开始,一起向前滚动 n 隔得到当前时间戳代表的时间窗口的位置。
     * 现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标,
     * 而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。
     * @param timeMillis
     * @return
     */
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
       // time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        //上一步的结果可能比窗口个数大或者是窗口倍数,所以要进行求余计算落在了哪个窗口里
        return (int)(timeId % array.length());
    }

1.根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx

2.根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位

3.根据索引idx,在采样窗口数组中取得一个时间窗口old

4.循环判断知道获取到一个当前时间窗口

4.1.如果old为空,则创建一个时间窗口,并将它插入到array的第idx个位置,array上面已经分析过了,是一个 AtomicReferenceArray

4.2.如果当前窗口的开始时间time与old的开始时间相等,那么说明old就是当前时间窗口,直接返回old

4.3.如果当前窗口的开始时间time大于old的开始时间,则说明old窗口已经过时了,将old的开始时间更新为最新值:time,下个循环中会在步骤4.2中返回

4.4.如果当前窗口的开始时间time小于old的开始时间,实际上这种情况是不可能存在的,因为time是当前时间,old是过去的一个时间

另外timeId是会随着时间的增长而增加,当前时间每增长一个windowLength的长度,timeId就加1。但是idx不会增长,只会在0和1之间变换,因为array数组的长度是2,只有两个采样时间窗口。至于为什么默认只有两个采样窗口,个人觉得因为sentinel是比较轻量的框架。时间窗口中保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多内存,另一方面时间窗口过多就意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动。


看图理解

1608112491646.jpg

初始的时候arrays数组中只有一个窗口(可能是第一个,也可能是第二个),每个时间窗口的长度是500ms,这就意味着只要当前时间与时间窗口的差值在500ms之内,时间窗口就不会向前滑动。例如,假如当前时间走到300或者500时,当前时间窗口仍然是相同的那个:

1608112491043.jpg

时间继续往前走,当超过500ms时,时间窗口就会向前滑动到下一个,这时就会更新当前窗口的开始时间:

1608112491307.jpg

时间继续往前走,只要不超过1000ms,则当前窗口不会发生变化:

1608112491496.jpg

当时间继续往前走,当前时间超过1000ms时,就会再次进入下一个时间窗口,此时arrays数组中的窗口将会有一个失效,会有另一个新的窗口进行替换:

1608112490610.jpg

以此类推随着时间的流逝,时间窗口也在发生变化,在当前时间点中进入的请求,会被统计到当前时间对应的时间窗口中。计算qps时,会用当前采样的时间窗口中对应的指标统计值除以时间间隔,就是具体的qps。