@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { try { // 触发下一个Slot的entry方法 fireEntry(context, resourceWrapper, node, count, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计信息 node.increaseThreadNum(); node.addPassRequest(); // 省略部分代码 } catch (BlockException e) { context.getCurEntry().setError(e); // Add block count. node.increaseBlockedQps(); // 省略部分代码 throw e; } catch (Throwable e) { context.getCurEntry().setError(e); // Should not happen node.increaseExceptionQps(); // 省略部分代码 throw e; } }
//node.addPassRequest() //这里的node是一个 DefaultNode 实例
@Override public void addPassRequest() { super.addPassRequest(); this.clusterNode.addPassRequest(); }
其实都是执行的 StatisticNode 对象的 addPassRequest 方法
//每秒的实时统计信息,使用 ArrayMetric 实现,即基于滑动窗口实现,默认1s 采样 2次。 //即一个统计周期中包含两个滑动窗口。 private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL); //每分钟实时统计信息,同样使用 ArrayMetric 实现,即基于滑动窗口实现。每1分钟,抽样60次。 //即包含60个滑动窗口,每一个窗口的时间间隔为 1s 。 private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60); @Override public void addPassRequest() { rollingCounterInSecond.addPass(); rollingCounterInMinute.addPass(); }
从代码中我们可以看到,具体的增加pass指标是通过一个叫 Metric 的接口进行操作的,并且是通过 ArrayMetric 这种实现类,现在我们在进入 ArrayMetric 中看一下。
public class ArrayMetric implements Metric { /** * 用来存储各个窗口的数据 */ private final LeapArray<MetricBucket> data; /** * * @param sampleCount 在一个采集间隔中抽样的个数,默认为 2, * 例如当 intervalInMs = 1000时,抽象两次,则一个采集间隔中会包含两个相等的区间, * 一个区间就是滑动窗口。 * @param intervalInMs 表示一个采集的时间间隔,例如1秒,1分钟。 */ public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } /** * * @param sampleCount * @param intervalInMs * @param enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量, * 这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray, * 否则为 BucketLeapArray。 */ public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } @Override public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); } ..... }
wrap.value().addPass() 是执行的 wrap 对象所包装的 MetricBucket 对象的 addPass 方法,这里就是最终的增加qps中q的值的地方了。进入 MetricBucket 类中看一下,具体的代码如下:
public class MetricBucket { private final LongAdder[] counters; private volatile long minRt; public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } public void addPass(int n) { add(MetricEvent.PASS, n); } public MetricBucket add(MetricEvent event, long n) { counters[event.ordinal()].add(n); return this; } ... }
public enum MetricEvent { /** * Normal pass.通过数量 */ PASS, /** * Normal block.阻塞数量 */ BLOCK, /** * 异常数量 */ EXCEPTION, /** * 成功数量 */ SUCCESS, /** * 响应时间 */ RT, /** * Passed in future quota (pre-occupied, since 1.5.0). */ OCCUPIED_PASS }
public class LongAdder extends Striped64 implements Serializable { private static final long serialVersionUID = 7249069246863182397L; /** * Version of plus for use in retryUpdate */ @Override final long fn(long v, long x) { return v + x; } /** * Creates a new adder with initial sum of zero. */ public LongAdder() { } /** * Adds the given value. * * @param x the value to add */ public void add(long x) { Cell[] as; long b, v; HashCode hc; Cell a; int n; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; int h = (hc = threadHashCode.get()).code; if (as == null || (n = as.length) < 1 || (a = as[(n - 1) & h]) == null || !(uncontended = a.cas(v = a.value, v + x))) { retryUpdate(x, hc, uncontended); } } } /** * Equivalent to {@code add(1)}. */ public void increment() { add(1L); } .... }
所以是通过LongAdder 来保存各种指标的值的。
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } /** * 计算当前时间会落在一个采集间隔 ( LeapArray ) 中哪一个时间窗口中, * 即在 LeapArray 中属性 AtomicReferenceArray > array 的下标 */ // idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. /** * 计算当前时间戳所在的时间窗口的开始时间,即要计算出 WindowWrap 中 windowStart 的值, * 其实就是要算出小于当前时间戳, * 并且是 windowLengthInMs 的整数倍最大的数字,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )。 */ long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ //死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口。 while (true) { // 从采样数组中根据索引获取缓存的时间窗口 WindowWrap<T> old = array.get(idx); // array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象 if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 */ // 如果没有获取到,则创建一个新的 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); /** // 通过CAS将新窗口设置到数组中去 * 这里使用了 CAS 机制来更新 LeapArray 数组中的 元素,因为同一时间戳,可能有多个线程都在获取当前时间窗口对象, * 但该时间窗口对象还未创建,这里就是避免创建多个,导致统计数据被覆盖, * 如果用 CAS 更新成功的线程,则返回新建好的 WindowWrap ,CAS 设置不成功的线程继续跑这个流程,然后会进入到代码。 */ if (array.compareAndSet(idx, null, window)) { // 如果能设置成功,则将该窗口返回 return window; } else { // 否则当前线程让出时间片,等待 // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口 } else if (windowStart == old.windowStart()) { /** * 如果指定索引下的时间窗口对象不为空并判断起始时间相等,则返回。 */ /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; // 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口 // 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动 } else if (windowStart > old.windowStart()) { /** * 如果原先存在的窗口开始时间小于当前时间戳计算出来的开始时间,则表示 bucket 已被弃用。 * 则需要将开始时间重置到新时间戳对应的开始时间戳 */ /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 这个条件不可能存在 } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } /** * 首先用当前时间除以一个时间窗口的时间间隔,得出当前时间是多少个时间窗口的倍数,用 n 表示。 * 然后我们可以看出从一系列时间窗口,从 0 开始,一起向前滚动 n 隔得到当前时间戳代表的时间窗口的位置。 * 现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标, * 而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。 * @param timeMillis * @return */ private int calculateTimeIdx(/*@Valid*/ long timeMillis) { // time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个 long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. //上一步的结果可能比窗口个数大或者是窗口倍数,所以要进行求余计算落在了哪个窗口里 return (int)(timeId % array.length()); }
4.1.如果old为空,则创建一个时间窗口,并将它插入到array的第idx个位置,array上面已经分析过了,是一个 AtomicReferenceArray