随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

%title插图%num

作者 | 向寒 / 孙玄

来源 | 架构之美

头图 | 下载于视觉中国

%title插图%num

关于 Sentinel 

1、理论篇

以下是经过多年分布式经验总结的两个理论基础:

(1)微服务与治理的关系

%title插图%num

(2)爬坡理论

%title插图%num

我们今天的主题分为以下两个主要部分:

  • Sentinel设计原理
  • Sentinel运行流程源码剖析

%title插图%num

Sentinel 设计原理

1、特性

丰富的应用场景:阿里 10 年双十一积累场景,含秒杀、双十一零点持续洪峰、热点商品探测、预热、消息队列削峰填谷、集群流量控制、实时熔断下游不可用应用等多样化的场景。

广泛的开源生态:提供开箱即用的与其它开源框架/库的整合模块,如Dubbo、Spring Cloud、gRPC、Zuul、Reactor 等。

完善的 SPI 扩展点:提供简单易用、完善的 SPI 扩展接口;可通过实现扩展接口来快速地定制逻辑。

完备的实时监控:提供实时的监控功能,可看到接入应用的单台机器秒级数据,及500 台以下规模的集群汇总运行情况。

2、核心关键点

(1)资源:限流的对象

如下代码/user/select即为一个资源:

  1. 1@GetMapping(“/user/select”)
  2. 2
  3. 3@SentinelResource(value = “select”, blockHandler = “exceptionHandler”)
  4. 4
  5. 5public TUser select(@RequestParam Integer userId) {
  6. 6
  7. 7    log.info(“post /user/select userid=” + userId);
  8. 8
  9. 9    return userService.select(userId);
  10. 10
  11. 11}

即被SentinelResource注解修饰的API:

  1. 1@Target({ElementType.METHOD, ElementType.TYPE})
  2. 2
  3. 3@Retention(RetentionPolicy.RUNTIME)
  4. 4
  5. 5@Inherited
  6. 6
  7. 7public @interface SentinelResource {
  8. 8
  9. 9    String value() default “”;
  10. 10
  11. 11
  12. 12
  13. 13    EntryType entryType() default EntryType.OUT;
  14. 14
  15. 15
  16. 16
  17. 17    int resourceType() default 0;
  18. 18
  19. 19
  20. 20
  21. 21    String blockHandler() default “”;
  22. 22
  23. 23
  24. 24
  25. 25    Class<?>[] blockHandlerClass() default {};
  26. 26
  27. 27
  28. 28
  29. 29    String fallback() default “”;
  30. 30
  31. 31.…..
  32. 32
  33. 33}

(2)入口:sentinel为每个资源创建一个Entry。

(3)槽链:每个Entry都会有一条用于记录限流以及各种控制的信息Slot chain,以此来实现下图中绿色部分的功能。

%title插图%num

%title插图%num

Sentinel 运行流程源码剖析

此图为官网全局流程图,接下来我们通过源码,分解该过程:

%title插图%num

1、入口处

  1. 1SphU.entry(“methodA”, EntryType.IN);//入口
  2. 2
  3. 3}

核心代码

  1. 1SphU#lookProcessChain(ResourceWrapper resourceWrapper)

2、入口逻辑

  1. 1private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object… args)
  2. 2
  3. 3    throws BlockException {
  4. 4
  5. 5    // 从threadLocal中获取当前线程对应的context实例。
  6. 6
  7. 7    Context context = ContextUtil.getContext();
  8. 8
  9. 9    if (context instanceof NullContext) {
  10. 10
  11. 11        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
  12. 12
  13. 13        // so here init the entry only. No rule checking will be done.
  14. 14
  15. 15        // 如果context是nullContext的实例,表示当前context的总数已经达到阈值,所以这里直接创建entry实例,并返回,不进行规则的检查。
  16. 16
  17. 17        return new CtEntry(resourceWrapper, null, context);
  18. 18
  19. 19    }
  20. 20
  21. 21
  22. 22
  23. 23    if (context == null) {
  24. 24
  25. 25        // Using default context.
  26. 26
  27. 27        //如果context为空,则使用默认的名字创建一个,就是外部在调用SphU.entry(..)方法前如果没有调用ContextUtil.enter(..),则这里会调用该方法进行内部初始化context
  28. 28
  29. 29        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
  30. 30
  31. 31    }
  32. 32
  33. 33
  34. 34
  35. 35    // Global switch is close, no rule checking will do.
  36. 36
  37. 37    // 总开关
  38. 38
  39. 39    if (!Constants.ON) {
  40. 40
  41. 41        return new CtEntry(resourceWrapper, null, context);
  42. 42
  43. 43    }
  44. 44
  45. 45
  46. 46
  47. 47    // 构造链路(核心实现) go in
  48. 48
  49. 49    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
  50. 50
  51. 51
  52. 52
  53. 53    /*
  54. 54
  55. 55     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
  56. 56
  57. 57     * so no rule checking will be done.
  58. 58
  59. 59     * 当链的大小达到阈值Constants.MAX_SLOT_CHAIN_SIZE时,不会校验任何规则,直接返回。
  60. 60
  61. 61     */
  62. 62
  63. 63    if (chain == null) {
  64. 64
  65. 65        return new CtEntry(resourceWrapper, null, context);
  66. 66
  67. 67    }
  68. 68
  69. 69
  70. 70
  71. 71    Entry e = new CtEntry(resourceWrapper, chain, context);
  72. 72
  73. 73    try {
  74. 74
  75. 75        // 开始进行链路调用。
  76. 76
  77. 77        chain.entry(context, resourceWrapper, null, count, prioritized, args);
  78. 78
  79. 79    } catch (BlockException e1) {
  80. 80
  81. 81        e.exit(count, args);
  82. 82
  83. 83        throw e1;
  84. 84
  85. 85    } catch (Throwable e1) {
  86. 86
  87. 87        // This should not happen, unless there are errors existing in Sentinel internal.
  88. 88
  89. 89        RecordLog.info(“Sentinel unexpected exception”, e1);
  90. 90
  91. 91    }
  92. 92
  93. 93    return e;
  94. 94
  95. 95}

3、上下文信息

Context

Context是当前线程所持有的Sentinel上下文。

进入Sentinel的逻辑时,会首先获取当前线程的Context,如果没有则新建。当任务执行完毕后,会清除当前线程的context。Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。

Context 维持着入口节点(entranceNode)、本次调用链路的 当前节点(curNode)、调用来源(origin)等信息。Context 名称即为调用链路入口名称。

Node

Node是对一个@SentinelResource标记的资源的统计包装。

Context中记录本当前线程资源调用的入口节点。

我们可以通过入口节点的childList,可以追溯资源的调用情况。而每个节点都对应一个@SentinelResource标记的资源及其统计数据,例如:passQps,blockQps,rt等数据。

Entry

Entry是Sentinel中用来表示是否通过限流的一个凭证,如果能正常返回,则说明你可以访问被Sentinel保护的后方服务,否则Sentinel会抛出一个BlockException。

另外,它保存了本次执行entry()方法的一些基本信息,包括资源的Context、Node、对应的责任链等信息,后续完成资源调用后,还需要更具获得的这个Entry去执行一些善后操作,包括退出Entry对应的责任链,完成节点的一些统计信息更新,清除当前线程的Context信息等。

在构建Context时已经完成下图部分:

%title插图%num

4、核心流程

这里有两个需要注意的点:

  • ProcessorSlot chain = lookProcessChain(resourceWrapper); 构建链路。
  • chain.entry(context, resourceWrapper, null, count, prioritized, args); 进行链路调用首先来看链路是如何构建的。

5、获取槽链

  • 已有直接获取;
  • 没有去创建。
  1. 1       //在上下文中每一个资源都有各自的处理槽
  2. 2
  3. 3        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
  4. 4
  5. 5        // 双重检查锁保证线程安全
  6. 6
  7. 7        if (chain == null) {
  8. 8
  9. 9            synchronized (LOCK) {
  10. 10
  11. 11                chain = ch ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ainMap.get(resourceWrapper);
  12. 12
  13. 13                if (chain == null) {
  14. 14
  15. 15                    // Entry size limit.
  16. 16
  17. 17                    // 当链的长度达到阈值时,直接返回null,不进行规则的检查。
  18. 18
  19. 19                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
  20. 20
  21. 21                        return null;
  22. 22
  23. 23                    }
  24. 24
  25. 25                    // 构建链路 go in
  26. 26
  27. 27                    chain = SlotChainProvider.newSlotChain();
  28. 28
  29. 29                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
  30. 30
  31. 31                        chainMap.size() + 1);
  32. 32
  33. 33                    newMap.putAll(chainMap);
  34. 34
  35. 35                    newMap.put(resourceWrapper, chain);
  36. 36
  37. 37                    chainMap = newMap;
  38. 38
  39. 39                }
  40. 40
  41. 41            }
  42. 42
  43. 43        }
  44. 44
  45. 45        return chain;
  46. 46
  47. 47    }

6、创建槽链

SlotChainProvider.newSlotChain();

%title插图%num

  1. 1   // 基于spi扩展点机制来扩展,默认为DefaultSlotChainBuilder
  2. 2
  3. 3slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

7、SPI加载ProcessorSlot

这里采用了spi的机制来扩展SlotChainBuilder,默认是采用DefaultSlotChainBuilder来实现的,可以看到sentinel源码的sentinel-core包下,META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件下,默认属性是:

  1. 1  slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

所以默认采用DefaultSlotChainBuilder来构建链路,因此找到DefaultSlotChainBuilder.build()方法。

8、DefaultSlotChainBuilder

  1. 1public ProcessorSlotChain build() {
  2. 2
  3. 3        // 定义链路起点
  4. 4
  5. 5        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
  6. 6
  7. 7
  8. 8
  9. 9        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
  10. 10
  11. 11        // 基于spi扩展机制,加载ProcessorSlot的实现类,从META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件下获取,并且按指定顺序排序
  12. 12
  13. 13        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
  14. 14
  15. 15        // 遍历构建链路
  16. 16
  17. 17        for (ProcessorSlot slot : sortedSlotList) {
  18. 18
  19. 19            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
  20. 20
  21. 21                RecordLog.warn(“The ProcessorSlot(“ + slot.getClass().getCanonicalName() + “) is not an instance of AbstractLinkedProcessorSlot, can’t be added into ProcessorSlotChain”);
  22. 22
  23. 23                continue;
  24. 24
  25. 25            }
  26. 26
  27. 27            // 将slot节点加入链,因为已经排好序了,只需要加到*后即可
  28. 28
  29. 29            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
  30. 30
  31. 31        }
  32. 32
  33. 33
  34. 34
  35. 35        return chain;
  36. 36
  37. 37    }

9、遍历ProcessorSlots

这里也是通过spi的机制,读取文件META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:

  1. 1# Sentinel default ProcessorSlots
  2. 2
  3. 3com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
  4. 4
  5. 5com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
  6. 6
  7. 7com.alibaba.csp.sentinel.slots.logger.LogSlot
  8. 8
  9. 9com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
  10. 10
  11. 11com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
  12. 12
  13. 13com.alibaba.csp.sentinel.slots.system.SystemSlot
  14. 14
  15. 15com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
  16. 16
  17. 17com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

从这里看出,链路由这些节点组成,而slot之间的顺序是根据每个slot节点的@SpiOrder注解的值来确定的。

NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot

%title插图%num

链路调用 

chain.entry(…)

上面已经构建好了链路,下面就要开始进行链路的调用了。

回到CtSph#entryWithPriority

1、NodeSelectorSlot

NodeSelectorSlot(@SpiOrder(-10000))

直接进入NodeSelectorSlot类的entry方法。

根据官方文档,NodeSelectorSlot类的作用为:

负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

  1. 1@Override
  2. 2
  3. 3public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object… args)
  4. 4
  5. 5    throws Throwable {
  6. 6
  7. 7
  8. 8
  9. 9    // 双重检查锁+缓存 机制
  10. 10
  11. 11    DefaultNode node = map.get(context.getName());
  12. 12
  13. 13    if (node == null) {
  14. 14
  15. 15        synchronized (this) {
  16. 16
  17. 17            node = map.get(context.getName());
  18. 18
  19. 19            if (node == null) {
  20. 20
  21. 21                node = new DefaultNode(resourceWrapper, null);
  22. 22
  23. 23                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
  24. 24
  25. 25                cacheMap.putAll(map);
  26. 26
  27. 27                cacheMap.put(context.getName(), node);
  28. 28
  29. 29                map = cacheMap;
  30. 30
  31. 31                // Build invocation tree
  32. 32
  33. 33                // 构建调用链的树形结构
  34. 34
  35. 35                ((DefaultNode) context.getLastNode()).addChild(node);
  36. 36
  37. 37            }
  38. 38
  39. 39
  40. 40
  41. 41        }
  42. 42
  43. 43    }
  44. 44
  45. 45
  46. 46
  47. 47    context.setCurNode(node);
  48. 48
  49. 49    // 进入下一个链
  50. 50
  51. 51    fireEntry(context, resourceWrapper, node, count, prioritized, args);
  52. 52
  53. 53}

2、ClusterBuilderSlot

ClusterBuilderSlot(@SpiOrder(-9000))

根据官方文档,ClusterBuilderSlot的作用为:

此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。

3、LogSlot

LogSlot(@SpiOrder(-8000))

该类对链路的传递不做处理,只有在抛出BlockException的时候,向上层层传递的过程中,会通过该类来输入一些日志信息:

  1. 1@Override
  2. 2
  3. 3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object… args)
  4. 4
  5. 5    throws Throwable {
  6. 6
  7. 7    try {
  8. 8
  9. 9        fireEntry(context, resourceWrapper, obj, count, prioritized, args);
  10. 10
  11. 11    } catch (BlockException e) {
  12. 12
  13. 13        // 当抛出BlockException异常时,这里会输入日志信息
  14. 14
  15. 15        EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
  16. 16
  17. 17            context.getOrigin(), count);
  18. 18
  19. 19        throw e;
  20. 20
  21. 21    } catch (Throwable e) {
  22. 22
  23. 23        RecordLog.warn(“Unexpected entry exception”, e);
  24. 24
  25. 25    }
  26. 26
  27. 27}

4、StatisticSlot

StatisticSlot(@SpiOrder(-7000))

官方文档:

StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息。

  1. 1@Override
  2. 2
  3. 3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
  4. 4
  5. 5                  boolean prioritized, Object… args) throws Throwable {
  6. 6
  7. 7    try {
  8. 8
  9. 9        // Do some checking.
  10. 10
  11. 11        // 先将调用链继续下去,等到后续链调用结束了,再执行下面的步骤
  12. 12
  13. 13        fireEntry(context, resourceWrapper, node, count, prioritized, args);
  14. 14
  15. 15
  16. 16
  17. 17        // Request passed, add thread count and pass count.
  18. 18
  19. 19        node.increaseThreadNum();
  20. 20
  21. 21        node.addPassRequest(count);
  22. 22
  23. 23
  24. 24
  25. 25        if (context.getCurEntry().getOriginNode() != null) {
  26. 26
  27. 27            // Add count for origin node.
  28. 28
  29. 29            context.getCurEntry().getOriginNode().increaseThreadNum();
  30. 30
  31. 31            context.getCurEntry().getOriginNode().addPassRequest(count);
  32. 32
  33. 33        }
  34. 34
  35. 35
  36. 36
  37. 37        if (resourceWrapper.getEntryType() == EntryType.IN) {
  38. 38
  39. 39            // Add count for global inbound entry node for global statistics.
  40. 40
  41. 41            Constants.ENTRY_NODE.increaseThreadNum();
  42. 42
  43. 43            Constants.ENTRY_NODE.addPassRequest(count);
  44. 44
  45. 45        }
  46. 46
  47. 47
  48. 48
  49. 49        // Handle pass event with registered entry callback handlers.
  50. 50
  51. 51        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
  52. 52
  53. 53            handler.onPass(context, resourceWrapper, node, count, args);
  54. 54
  55. 55        }
  56. 56
  57. 57    } catch (PriorityWaitException ex) {
  58. 58
  59. 59        node.increaseThreadNum();
  60. 60
  61. 61        if (context.getCurEntry().getOriginNode() != null) {
  62. 62
  63. 63            // Add count for origin node.
  64. 64
  65. 65            context.getCurEntry().getOriginNode().increaseThreadNum();
  66. 66
  67. 67        }
  68. 68
  69. 69
  70. 70
  71. 71        if (resourceWrapper.getEntryType() == EntryType.IN) {
  72. 72
  73. 73            // Add count for global inbound entry node for global statistics.
  74. 74
  75. 75            Constants.ENTRY_NODE.increaseThreadNum();
  76. 76
  77. 77        }
  78. 78
  79. 79        // Handle pass event with registered entry callback handlers.
  80. 80
  81. 81        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
  82. 82
  83. 83            handler.onPass(context, resourceWrapper, node, count, args);
  84. 84
  85. 85        }
  86. 86
  87. 87    } catch (BlockException e) {
  88. 88
  89. 89        // Blocked, set block exception to current entry.
  90. 90
  91. 91        context.getCurEntry().setBlockError(e);
  92. 92
  93. 93
  94. 94
  95. 95        // Add block count.
  96. 96
  97. 97        node.increaseBlockQps(count);
  98. 98
  99. 99        if (context.getCurEntry().getOriginNode() != null) {
  100. 100
  101. 101            context.getCurEntry().getOriginNode().increaseBlockQps(count);
  102. 102
  103. 103        }
  104. 104
  105. 105
  106. 106
  107. 107        if (resourceWrapper.getEntryType() == EntryType.IN) {
  108. 108
  109. 109            // Add count for global inbound entry node for global statistics.
  110. 110
  111. 111            Constants.ENTRY_NODE.increaseBlockQps(count);
  112. 112
  113. 113        }
  114. 114
  115. 115
  116. 116
  117. 117        // Handle block event with registered entry callback handlers.
  118. 118
  119. 119        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
  120. 120
  121. 121            handler.onBlocked(e, context, resourceWrapper, node, count, args);
  122. 122
  123. 123        }
  124. 124
  125. 125
  126. 126
  127. 127        throw e;
  128. 128
  129. 129    } catch (Throwable e) {
  130. 130
  131. 131        // Unexpected internal error, set error to current entry.
  132. 132
  133. 133        context.getCurEntry().setError(e);
  134. 134
  135. 135
  136. 136
  137. 137        throw e;
  138. 138
  139. 139    }
  140. 140
  141. 141}

StatisticSlot 会先将链往下执行,等到后面的节点全部执行完毕,再进行数据统计。

5、AuthoritySlot

@SpiOrder(-6000)

AuthoritySlot

官方文档:

AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制

  1. 1@Override
  2. 2
  3. 3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object… args)
  4. 4
  5. 5    throws Throwable {
  6. 6
  7. 7    // 黑白名单权限控制
  8. 8
  9. 9    checkBlackWhiteAuthority(resourceWrapper, context);
  10. 10
  11. 11    fireEntry(context, resourceWrapper, node, count, prioritized, args);
  12. 12
  13. 13}
  14. 14
  15. 15
  16. 16
  17. 17void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
  18. 18
  19. 19    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
  20. 20
  21. 21
  22. 22
  23. 23    if (authorityRules == null) {
  24. 24
  25. 25        return;
  26. 26
  27. 27    }
  28. 28
  29. 29
  30. 30
  31. 31    Set<AuthorityRule> rules = authorityRules.get(resource.getName());
  32. 32
  33. 33    if (rules == null) {
  34. 34
  35. 35        return;
  36. 36
  37. 37    }
  38. 38
  39. 39
  40. 40
  41. 41    for (AuthorityRule rule : rules) {
  42. 42
  43. 43        if (!AuthorityRuleChecker.passCheck(rule, context)) {
  44. 44
  45. 45            throw new AuthorityException(context.getOrigin(), rule);
  46. 46
  47. 47        }
  48. 48
  49. 49    }
  50. 50
  51. 51}

6、SystemSlot

@SpiOrder(-5000)

SystemSlot

官方文档:

SystemSlot:这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。

  1. 1@Override
  2. 2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
  3. 3                 boolean prioritized, Object… args) throws Throwable {
  4. 4   // 系统规则校验
  5. 5   SystemRuleManager.checkSystem(resourceWrapper);
  6. 6   fireEntry(context, resourceWrapper, node, count, prioritized, args);
  7. 7}

7、FlowSlot 限流规则引擎

@SpiOrder(-2000)

FlowSlot

官方文档:

这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则,即针对调用方限流的;
  • 调用方为 other 的规则;
  • 调用方为 default 的规则。

%title插图%num

入口

  1. 1@Override
  2. 2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
  3. 3                 boolean prioritized, Object… args) throws Throwable {
  4. 4   // 检查限流规则
  5. 5   checkFlow(resourceWrapper, context, node, count, prioritized);
  6. 6
  7. 7   fireEntry(context, resourceWrapper, node, count, prioritized, args);
  8. 8}
  9. 9
  10. 10void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
  11. 11   throws BlockException {
  12. 12   checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
  13. 13}

1、所有规则检查

调用了FlowRuleChecker.checkFlow(…)方法。

  1. 1public void checkFlow(Function<StringCollection<FlowRule>> ruleProviderResourceWrapper resource,
  2. 2                     Context contextDefaultNode nodeint countboolean prioritizedthrows BlockException {
  3. 3   if (ruleProvider == null || resource == null) {
  4. 4       return;
  5. 5  }
  6. 6   // 根据资源名称找到对应的
  7. 7   Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
  8. 8   if (rules != null) {
  9. 9       // 遍历规则,依次判断是否通过
  10. 10       for (FlowRule rule : rules) {
  11. 11           if (!canPassCheck(rule, context, node, count, prioritized)) {
  12. 12               throw new FlowException(rule.getLimitApp(), rule);
  13. 13          }
  14. 14      }
  15. 15  }
  16. 16}

2、单个规则检查

  1. 1public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
  2. 2                                               boolean prioritized) {
  3. 3   String limitApp = rule.getLimitApp();
  4. 4   if (limitApp == null) {
  5. 5       return true;
  6. 6  }
  7. 7   // 集群限流的判断
  8. 8   if (rule.isClusterMode()) {
  9. 9       return passClusterCheck(rule, context, node, acquireCount, prioritized);
  10. 10  }
  11. 11   // 本地节点的判断
  12. 12   return passLocalCheck(rule, context, node, acquireCount, prioritized);
  13. 13}

3、非集群模式的限流判断

  1. 1private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
  2. 2                                     boolean prioritized) {
  3. 3   // 根据请求的信息及策略,选择不同的node节点
  4. 4   Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
  5. 5   if (selectedNode == null) {
  6. 6       return true;
  7. 7  }
  8. 8   // 根据当前规则,获取规则控制器,调用canPass方法进行判断
  9. 9//       rule.getRater()放回的是TrafficShapingController接口的实现类,使用了策略模式,根据使用的控制措施来选择使用哪种实现。
  10. 10   return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
  11. 11}

这里是先根据请求和当前规则的策略,找到该规则下存储统计信息的节点,然后根据当前规则获取相应控制器,通过控制器的canPass(…)方法进行判断。

4、获取节点
  1. 1static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
  2. 2   // The limit app should not be empty.
  3. 3   String limitApp = rule.getLimitApp();
  4. 4   int strategy = rule.getStrategy();
  5. 5   String origin = context.getOrigin();
  6. 6
  7. 7   // 判断调用来源,这种情况下origin不能为default或other
  8. 8   if (limitApp.equals(origin) && filterOrigin(origin)) {
  9. 9       // 如果调用关系策略为STRATEGY_DIRECT,表示仅判断自己,则返回origin statistic node.
  10. 10       if (strategy == RuleConstant.STRATEGY_DIRECT) {
  11. 11           // Matches limit origin, return origin statistic node.
  12. 12           return context.getOriginNode();
  13. 13      }
  14. 14
  15. 15       // 采用调用来源进行判断的策略
  16. 16       return selectReferenceNode(rule, context, node);
  17. 17  } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 如果调用来源为default默认的
  18. 18       if (strategy == RuleConstant.STRATEGY_DIRECT) { // 如果调用关系策略为STRATEGY_DIRECT,则返回clusterNode
  19. 19           // Return the cluster node.
  20. 20           return node.getClusterNode();
  21. 21      }
  22. 22
  23. 23       return selectReferenceNode(rule, context, node);
  24. 24  } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
  25. 25       && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 如果调用来源为other,且调用来源不在限制规则内,为其他来源
  26. 26       if (strategy == RuleConstant.STRATEGY_DIRECT) {
  27. 27           return context.getOriginNode();
  28. 28      }
  29. 29       return selectReferenceNode(rule, context, node);
  30. 30  }
  31. 31   return null;
  32. 32}
5、流量整形控制器

rule.getRater()方法会返回一个控制器,接口为TrafficShapingController,该接口的实现类图如下:

%title插图%num

从类图可以看出,是很明显的策略模式,分别针对不同的限流控制策略。

1、默认策略

DefaultController该策略是sentinel的默认策略,如果请求超出阈值,则直接拒*请求。

  1. 1@Override
  2. 2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
  3. 3   // 当前已经统计的数
  4. 4   int curCount = avgUsedTokens(node);
  5. 5   if (curCount + acquireCount > count) {
  6. 6       // 如果是高优先级的,且是基于qps的限流方式,则可以尝试从下个未来的滑动窗口中预支
  7. 7       if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
  8. 8           long currentTime;
  9. 9           long waitInMs;
  10. 10           currentTime = TimeUtil.currentTimeMillis();
  11. 11           // 从下个滑动窗口中提前透支
  12. 12           waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
  13. 13           if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
  14. 14               node.addWaitingRequest(currentTime + waitInMs, acquireCount);
  15. 15               node.addOccupiedPass(acquireCount);
  16. 16               sleep(waitInMs);
  17. 17
  18. 18               // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
  19. 19               throw new PriorityWaitException(waitInMs);
  20. 20          }
  21. 21      }
  22. 22       return false;
  23. 23  }
  24. 24   return true;
  25. 25}
  26. 26
  27. 27private int avgUsedTokens(Node node) {
  28. 28   if (node == null) {
  29. 29       return DEFAULT_AVG_USED_TOKENS;
  30. 30  }
  31. 31   // 如果当前是线程数限流,则返回node.curThreadNum()当前线程数
  32. 32   // 如果是QPS限流,则返回node.passQps()当前已经通过的qps数据
  33. 33   return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
  34. 34}
  35. 35
  36. 36private void sleep(long timeMillis) {
  37. 37   try {
  38. 38       Thread.sleep(timeMillis);
  39. 39  } catch (InterruptedException e) {
  40. 40       // Ignore.
  41. 41  }
  42. 42}
2、匀速排队策略

RateLimiterController

  1. 1@Override
  2. 2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
  3. 3   // Pass when acquire count is less or equal than 0.
  4. 4   if (acquireCount <= 0) {
  5. 5       return true;
  6. 6  }
  7. 7   // Reject when count is less or equal than 0.
  8. 8   // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
  9. 9   if (count <= 0) {
  10. 10       return false;
  11. 11  }
  12. 12
  13. 13   long currentTime = TimeUtil.currentTimeMillis();
  14. 14   // Calculate the interval between every two requests.
  15. 15   // 计算两个请求之间的时间间隔
  16. 16   long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
  17. 17
  18. 18   // Expected pass time of this request. 该请求的预计通过时间 = 上一次通过的时间 + 时间间隔
  19. 19   long expectedTime = costTime + latestPassedTime.get();
  20. 20
  21. 21   // 如果预计时间比当前时间小,表示可以请求完全可以通过
  22. 22   if (expectedTime <= currentTime) {
  23. 23       // Contention may exist here, but it’s okay.
  24. 24       // 这里可能存在竞争,但是不影响。
  25. 25       latestPassedTime.set(currentTime);
  26. 26       return true;
  27. 27  } else {
  28. 28       // Calculate the time to wait.
  29. 29       // 计算等待时间
  30. 30       long waitTime = costTime + latestPassedTime.get() – TimeUtil.currentTimeMillis();
  31. 31       // 如果等待时间超出了等待队列的*大时间,则无法放入等待队列,直接拒*
  32. 32       if (waitTime > maxQueueingTimeMs) {
  33. 33           return false;
  34. 34      } else {
  35. 35           long oldTime = latestPassedTime.addAndGet(costTime);
  36. 36           try {
  37. 37               // 重新计算等待时间
  38. 38               waitTime = oldTime – TimeUtil.currentTimeMillis();
  39. 39               // 判断等待时间是否超过等待队列的*大时间,如果超过了,拒*,并且将latestPassedTime*后一次请求时间重新设置为原值
  40. 40               if (waitTime > maxQueueingTimeMs) {
  41. 41                   latestPassedTime.addAndGet(-costTime);
  42. 42                   return false;
  43. 43              }
  44. 44               // in race condition waitTime may <= 0
  45. 45               // 线程等待
  46. 46               if (waitTime > 0) {
  47. 47                   Thread.sleep(waitTime);
  48. 48              }
  49. 49               return true;
  50. 50          } catch (InterruptedException e) {
  51. 51          }
  52. 52      }
  53. 53  }
  54. 54   return false;
  55. 55}

从代码可以看出,匀速排队策略是使用了虚拟队列的方法,通过控制阈值来计算出请求的时间间隔,然后将上一次请求的时间加上时间间隔,表示下一次请求的时间,如果当前时间比这个值大,说明已经超出时间间隔了,当然可以请求,反之,表示需要等待,那么等待的时长就应该是要等到当前时间达到预期时间才能请求,这里就有个虚拟的等待队列,而等待其实是通过线程的等待来实现的。而这里所说的虚拟队列实际上是由一系列的处于sleep状态的线程组成的,但是实际的数据结构上并没有构成队列。

3、预热/冷启动策略

WarmUpController

首先看WarmUpController的属性和构造方法:

  1. 1// 阈值
  2. 2protected double count;
  3. 3/**
  4. 4* 冷启动的因子 ,默认为3 {@link SentinelConfig#coldFactor()}
  5. 5*/
  6. 6private int coldFactor;
  7. 7// 转折点的令牌数
  8. 8protected int warningToken = 0;
  9. 9// *大令牌数
  10. 10private int maxToken;
  11. 11// 折线初始斜率,标志流量的变化程度
  12. 12protected double slope;
  13. 13
  14. 14// 累积的令牌数 ,累积的令牌数越多,说明系统利用率越低,说明当前流量低,是冷状态
  15. 15protected AtomicLong storedTokens = new AtomicLong(0);
  16. 16// *后更新令牌的时间
  17. 17protected AtomicLong lastFilledTime = new AtomicLong(0);
  18. 18
  19. 19public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
  20. 20   construct(count, warmUpPeriodInSec, coldFactor);
  21. 21}
  22. 22
  23. 23public WarmUpController(double count, int warmUpPeriodInSec) {
  24. 24   construct(count, warmUpPeriodInSec, 3);
  25. 25}
  26. 26
  27. 27/**
  28. 28* @param count             用户设定的阈值(这里假设设定为100)
  29. 29* @param warmUpPeriodInSec 默认为10
  30. 30* @param coldFactor       默认为3
  31. 31*/
  32. 32private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
  33. 33
  34. 34   if (coldFactor <= 1) {
  35. 35       throw new IllegalArgumentException(“Cold factor should be larger than 1”);
  36. 36  }
  37. 37
  38. 38   this.count = count;
  39. 39
  40. 40   this.coldFactor = coldFactor;
  41. 41
  42. 42   // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
  43. 43   // warningToken = 100;
  44. 44
  45. 45   // 按默认的warmUpPeriodInSec = 10,表示1秒钟10个请求,则每个请求为间隔stableInterval = 100ms,那么coldInterval=stableInterval * coldFactor = 100 * 3 = 300ms
  46. 46   // warningToken = 10 * 100 / (3 – 1) = 500
  47. 47   // thresholdPermits = warningToken = 0.5 * warmupPeriod / stableInterval = 0.5 * warmupPeriod / 100ms = 500 ==>> warmupPeriod = 100000ms
  48. 48   warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor – 1);
  49. 49
  50. 50   // / maxPermits = thresholdPermits + 2 * warmupPeriod /
  51. 51   // (stableInterval + coldInterval)
  52. 52   // maxToken = 200
  53. 53
  54. 54   // maxPermits = 500 + 2 * 100000ms / (100ms + 300ms) = 1000
  55. 55   // maxToken = 500 + (2 * 10 * 100 / (1.0 + 3)) = 1000
  56. 56   // maxPermits = maxToken
  57. 57   maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
  58. 58
  59. 59   // slope
  60. 60   // slope = (coldIntervalMicros – stableIntervalMicros) / (maxPermits
  61. 61   // – thresholdPermits);
  62. 62
  63. 63   // slope = (3 – 1.0) / 100 / (600 – 500) = 0.0002
  64. 64   slope = (coldFactor – 1.0) / count / (maxToken – warningToken);
  65. 65
  66. 66}

属性说明:

  • count: 用户设定的qps阈值。
  • coldFactor: 冷启动的因子,初始默认为3,通过SentinelConfig类的coldFactor()方法获取,这里会有个判断,如果启动因子小于等于1,则会设置为默认值3,因为如果是小于等于1,是没有意义的,就不是预热启动了。
  • warningToken:转折点的令牌数,当令牌数开始小于该值得时候,就要开启预热了。
  • maxToken:*大令牌数。
  • slope:折线的斜率。
  • storedTokens:当前存储的令牌数。
  • lastFilledTime:上一次更新令牌的时间。

总体思路:当系统存储的令牌为*大值时,说明系统访问流量较低,处于冷状态,这时候当有正常请求过来时,会让请求通过,并且会补充消耗的令牌数。当瞬时流量来临时,一旦剩余的令牌数小于警戒令牌数(restToken <= warningToken),则表示有大流量过来,需要开启预热过程,开始逐渐增大允许的qps。当qps达到用户设定的阈值后,系统已经预热完毕,这时候就进入了正常的请求阶段。

源码分析如下:

  1. 1@Override
  2. 2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
  3. 3   // 当前已经通过的qps
  4. 4   long passQps = (long) node.passQps();
  5. 5
  6. 6   // 上一个滑动窗口的qps
  7. 7   long previousQps = (long) node.previousPassQps();
  8. 8   // 同步令牌,如果是出于冷启动或预热完毕状态,则考虑要添加令牌
  9. 9   syncToken(previousQps);
  10. 10
  11. 11   // 开始计算它的斜率
  12. 12   // 如果进入了警戒线,开始调整他的qps
  13. 13   long restToken = storedTokens.get();
  14. 14   if (restToken >= warningToken) { // 说明一瞬间有大流量过来,消耗了大量的存储令牌,造成剩余令牌数*警戒值,则要开启预热默认,逐渐增加qps
  15. 15       // 计算当前离警戒线的距离
  16. 16       long aboveToken = restToken – warningToken;
  17. 17       // 消耗的速度要比warning快,但是要比慢
  18. 18       // current interval = restToken*slope+1/count
  19. 19       // restToken越小,interval就越小,表示系统越热
  20. 20       // 随着aboveToken的减小,warningQps会逐渐增大
  21. 21       double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
  22. 22       if (passQps + acquireCount <= warningQps) { // 随着warningQps的增大,acquireCount = 1,那么passQps允许的范围就变大,相应的流量就越大,系统越热
  23. 23           return true;
  24. 24      }
  25. 25  } else {
  26. 26       if (passQps + acquireCount <= count) {
  27. 27           return true;
  28. 28      }
  29. 29  }
  30. 30
  31. 31   return false;
  32. 32}
  33. 33
  34. 34/**
  35. 35* 同步令牌
  36. 36* @param passQps
  37. 37*/
  38. 38protected void syncToken(long passQps) {
  39. 39   long currentTime = TimeUtil.currentTimeMillis();
  40. 40   // 把当前时间的后三位置为0 e.g. 1601456312835 = 1601456312835 – 1601456312835 % 1000 = 1601456312000
  41. 41   currentTime = currentTime – currentTime % 1000;
  42. 42   // 获取上一次更新令牌的时间
  43. 43   long oldLastFillTime = lastFilledTime.get();
  44. 44   if (currentTime <= oldLastFillTime) {
  45. 45       return;
  46. 46  }
  47. 47
  48. 48   // 获得目前的令牌数
  49. 49   long oldValue = storedTokens.get();
  50. 50   // 获取新的令牌数
  51. 51   long newValue = coolDownTokens(currentTime, passQps);
  52. 52
  53. 53   // 更新累积令牌数
  54. 54   if (storedTokens.compareAndSet(oldValue, newValue)) {
  55. 55       // 去除上一次的qps,设置剩下的令牌数
  56. 56       long currentValue = storedTokens.addAndGet(0 – passQps);
  57. 57       if (currentValue < 0) {
  58. 58           // 如果剩下的令牌数小于0,则置为0。
  59. 59           storedTokens.set(0L);
  60. 60      }
  61. 61       // 设置令牌更新时间
  62. 62       lastFilledTime.set(currentTime);
  63. 63  }
  64. 64}
  65. 65
  66. 66private long coolDownTokens(long currentTime, long passQps) {
  67. 67   // 当前拥有的令牌数
  68. 68   long oldValue = storedTokens.get();
  69. 69   long newValue = oldValue;
  70. 70
  71. 71   // 添加令牌的判断前提条件:
  72. 72   // 当令牌的消耗程度远远低于警戒线的时候
  73. 73   if (oldValue < warningToken) { // 这种情况表示已经预热结束,可以开始生成令牌了
  74. 74       // 这里按照count = 100来计算的话,表示旧值oldValue + 距离上次更新的秒数时间差 * count ,表示每秒增加count个令牌
  75. 75       // 这里的currentTime 和 lastFilledTime.get() 都是已经去掉毫秒数的
  76. 76       newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
  77. 77  } else if (oldValue > warningToken) { // 进入这里表示当前是冷状态或正处于预热状态
  78. 78       if (passQps < (int)count / coldFactor) { // 如果是冷状态,则补充令牌数,避免令牌数为0
  79. 79           newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
  80. 80      }
  81. 81       // 预热阶段则不添加令牌数,从而限制流量的急剧攀升
  82. 82  }
  83. 83   // 限制令牌数不能超过*大令牌数maxToken
  84. 84   return Math.min(newValue, maxToken);
  85. 85}
4、预热的匀速排队策略

WarmUpRateLimiterController

这种是匀速排队模式和预热模式的结合,这里不深入了。搞懂了上面两种,再看这种也比较清晰了。

5、DegradeSlot

官方文档说明:

这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

源码解析:

  1. 1@Override
  2. 2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
  3. 3                 boolean prioritized, Object… args) throws Throwable {
  4. 4   //降级判断
  5. 5   performChecking(context, resourceWrapper);
  6. 6
  7. 7   // 如果有自定义的slot,还会继续进行
  8. 8   fireEntry(context, resourceWrapper, node, count, prioritized, args);
  9. 9}
  10. 10
  11. 11void performChecking(Context context, ResourceWrapper r) throws BlockException {
  12. 12   // 使用DegradeRuleManager获得当前资源的熔断器
  13. 13   List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
  14. 14   if (circuitBreakers == null || circuitBreakers.isEmpty()) {
  15. 15       return;
  16. 16  }
  17. 17   // 遍历熔断器,只要有任何一个满足熔断条件,就抛出DegradeException异常。
  18. 18   for (CircuitBreaker cb : circuitBreakers) {
  19. 19       if (!cb.tryPass(context)) {
  20. 20           throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
  21. 21      }
  22. 22  }
  23. 23}

这里有个关键类,DegradeRuleManager,该类中会保存所有的熔断规则,使用Map<String, List>的格式进行保存。当需要使用的时候,就直接根据资源名称,从该map中获取对应的熔断器列表。

那么规则是如何加载的呢?我们看到DegradeRuleManager这个类,在加载时候,有个静态代码块:

  1. 1private static final RulePropertyListener LISTENER = new RulePropertyListener();
  2. 2private static SentinelProperty<List<DegradeRule>> currentProperty
  3. 3   = new DynamicSentinelProperty<>();
  4. 4
  5. 5static {
  6. 6   currentProperty.addListener(LISTENER);
  7. 7}

currentProperty.addListener(LISTENER);继续分析该段代码,找到DynamicSentinelProperty的addListener(…)方法:

  1. 1@Override
  2. 2public void addListener(PropertyListener<T> listener) {
  3. 3   listeners.add(listener);
  4. 4   listener.configLoad(value);
  5. 5}
  6. 612345

发现会调用监听器的configLoad(…)方法,*终会调用RulePropertyListener这个类的reloadFrom(…)方法。具体怎么解析的其实就是将规则根据资源名称进行归类,并保存为map格式。

%title插图%num

FlowSlot 限流规则引擎之限流算法原理

1、滑动窗口实现原理

%title插图%num

  • 每个时间窗口*大流量为100QPS;
  • 20和80表示当时的真实QPS数量;
  • 一个时间窗口分为两个半限,上半限和下半限;
  • 如果时间窗口1的下半限和时间窗口2的上半限的峰值超过100QPS,那么就丢失一部分流量。

但是这样并不是我们想要的,那么我们来看看计数器滑动窗口。

2、计数器滑动窗口原理

%title插图%num

  • 在滑动窗口算法上优化;
  • 相邻的两个半限总和>总阈值,才丢弃流量。

3、令牌桶算法

%title插图%num

  • 令牌漏斗桶存着所有的Token;
  • 按期发放Token;
  • 如果桶满了,就会熔断;
  • 达到Token的Request可以获取资源;
  • 得不到的就抛弃。