深度思考 Spring Cloud + Alibaba Sentinel 源码原理
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
作者 | 向寒 / 孙玄
来源 | 架构之美
头图 | 下载于视觉中国
关于 Sentinel
1、理论篇
以下是经过多年分布式经验总结的两个理论基础:
(1)微服务与治理的关系
(2)爬坡理论
我们今天的主题分为以下两个主要部分:
- Sentinel设计原理
- Sentinel运行流程源码剖析
Sentinel 设计原理
1、特性
丰富的应用场景:阿里 10 年双十一积累场景,含秒杀、双十一零点持续洪峰、热点商品探测、预热、消息队列削峰填谷、集群流量控制、实时熔断下游不可用应用等多样化的场景。
广泛的开源生态:提供开箱即用的与其它开源框架/库的整合模块,如Dubbo、Spring Cloud、gRPC、Zuul、Reactor 等。
完善的 SPI 扩展点:提供简单易用、完善的 SPI 扩展接口;可通过实现扩展接口来快速地定制逻辑。
完备的实时监控:提供实时的监控功能,可看到接入应用的单台机器秒级数据,及500 台以下规模的集群汇总运行情况。
2、核心关键点
(1)资源:限流的对象
如下代码/user/select即为一个资源:
-
1@GetMapping(“/user/select”)
-
2
-
3@SentinelResource(value = “select”, blockHandler = “exceptionHandler”)
-
4
-
5public TUser select(@RequestParam Integer userId) {
-
6
-
7 log.info(“post /user/select userid=” + userId);
-
8
-
9 return userService.select(userId);
-
10
-
11}
-
即被SentinelResource注解修饰的API:
-
1@Target({ElementType.METHOD, ElementType.TYPE})
-
2
-
3@Retention(RetentionPolicy.RUNTIME)
-
4
-
5@Inherited
-
6
-
7public @interface SentinelResource {
-
8
-
9 String value() default “”;
-
10
-
11
-
12
-
13 EntryType entryType() default EntryType.OUT;
-
14
-
15
-
16
-
17 int resourceType() default 0;
-
18
-
19
-
20
-
21 String blockHandler() default “”;
-
22
-
23
-
24
-
25 Class<?>[] blockHandlerClass() default {};
-
26
-
27
-
28
-
29 String fallback() default “”;
-
30
-
31.…..
-
32
-
33}
-
(2)入口:sentinel为每个资源创建一个Entry。
(3)槽链:每个Entry都会有一条用于记录限流以及各种控制的信息Slot chain,以此来实现下图中绿色部分的功能。
Sentinel 运行流程源码剖析
此图为官网全局流程图,接下来我们通过源码,分解该过程:
1、入口处
-
1SphU.entry(“methodA”, EntryType.IN);//入口
-
2
-
3}
-
核心代码
-
1SphU#lookProcessChain(ResourceWrapper resourceWrapper)
-
2、入口逻辑
-
1private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object… args)
-
2
-
3 throws BlockException {
-
4
-
5 // 从threadLocal中获取当前线程对应的context实例。
-
6
-
7 Context context = ContextUtil.getContext();
-
8
-
9 if (context instanceof NullContext) {
-
10
-
11 // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
-
12
-
13 // so here init the entry only. No rule checking will be done.
-
14
-
15 // 如果context是nullContext的实例,表示当前context的总数已经达到阈值,所以这里直接创建entry实例,并返回,不进行规则的检查。
-
16
-
17 return new CtEntry(resourceWrapper, null, context);
-
18
-
19 }
-
20
-
21
-
22
-
23 if (context == null) {
-
24
-
25 // Using default context.
-
26
-
27 //如果context为空,则使用默认的名字创建一个,就是外部在调用SphU.entry(..)方法前如果没有调用ContextUtil.enter(..),则这里会调用该方法进行内部初始化context
-
28
-
29 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
-
30
-
31 }
-
32
-
33
-
34
-
35 // Global switch is close, no rule checking will do.
-
36
-
37 // 总开关
-
38
-
39 if (!Constants.ON) {
-
40
-
41 return new CtEntry(resourceWrapper, null, context);
-
42
-
43 }
-
44
-
45
-
46
-
47 // 构造链路(核心实现) go in
-
48
-
49 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
-
50
-
51
-
52
-
53 /*
-
54
-
55 * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
-
56
-
57 * so no rule checking will be done.
-
58
-
59 * 当链的大小达到阈值Constants.MAX_SLOT_CHAIN_SIZE时,不会校验任何规则,直接返回。
-
60
-
61 */
-
62
-
63 if (chain == null) {
-
64
-
65 return new CtEntry(resourceWrapper, null, context);
-
66
-
67 }
-
68
-
69
-
70
-
71 Entry e = new CtEntry(resourceWrapper, chain, context);
-
72
-
73 try {
-
74
-
75 // 开始进行链路调用。
-
76
-
77 chain.entry(context, resourceWrapper, null, count, prioritized, args);
-
78
-
79 } catch (BlockException e1) {
-
80
-
81 e.exit(count, args);
-
82
-
83 throw e1;
-
84
-
85 } catch (Throwable e1) {
-
86
-
87 // This should not happen, unless there are errors existing in Sentinel internal.
-
88
-
89 RecordLog.info(“Sentinel unexpected exception”, e1);
-
90
-
91 }
-
92
-
93 return e;
-
94
-
95}
-
3、上下文信息
Context
Context是当前线程所持有的Sentinel上下文。
进入Sentinel的逻辑时,会首先获取当前线程的Context,如果没有则新建。当任务执行完毕后,会清除当前线程的context。Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。
Context 维持着入口节点(entranceNode)、本次调用链路的 当前节点(curNode)、调用来源(origin)等信息。Context 名称即为调用链路入口名称。
Node
Node是对一个@SentinelResource标记的资源的统计包装。
Context中记录本当前线程资源调用的入口节点。
我们可以通过入口节点的childList,可以追溯资源的调用情况。而每个节点都对应一个@SentinelResource标记的资源及其统计数据,例如:passQps,blockQps,rt等数据。
Entry
Entry是Sentinel中用来表示是否通过限流的一个凭证,如果能正常返回,则说明你可以访问被Sentinel保护的后方服务,否则Sentinel会抛出一个BlockException。
另外,它保存了本次执行entry()方法的一些基本信息,包括资源的Context、Node、对应的责任链等信息,后续完成资源调用后,还需要更具获得的这个Entry去执行一些善后操作,包括退出Entry对应的责任链,完成节点的一些统计信息更新,清除当前线程的Context信息等。
在构建Context时已经完成下图部分:
4、核心流程
这里有两个需要注意的点:
- ProcessorSlot chain = lookProcessChain(resourceWrapper); 构建链路。
- chain.entry(context, resourceWrapper, null, count, prioritized, args); 进行链路调用首先来看链路是如何构建的。
5、获取槽链
- 已有直接获取;
- 没有去创建。
-
1 //在上下文中每一个资源都有各自的处理槽
-
2
-
3 ProcessorSlotChain chain = chainMap.get(resourceWrapper);
-
4
-
5 // 双重检查锁保证线程安全
-
6
-
7 if (chain == null) {
-
8
-
9 synchronized (LOCK) {
-
10
-
11 chain = ch ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ainMap.get(resourceWrapper);
-
12
-
13 if (chain == null) {
-
14
-
15 // Entry size limit.
-
16
-
17 // 当链的长度达到阈值时,直接返回null,不进行规则的检查。
-
18
-
19 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
-
20
-
21 return null;
-
22
-
23 }
-
24
-
25 // 构建链路 go in
-
26
-
27 chain = SlotChainProvider.newSlotChain();
-
28
-
29 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
-
30
-
31 chainMap.size() + 1);
-
32
-
33 newMap.putAll(chainMap);
-
34
-
35 newMap.put(resourceWrapper, chain);
-
36
-
37 chainMap = newMap;
-
38
-
39 }
-
40
-
41 }
-
42
-
43 }
-
44
-
45 return chain;
-
46
-
47 }
6、创建槽链
SlotChainProvider.newSlotChain();
-
1 // 基于spi扩展点机制来扩展,默认为DefaultSlotChainBuilder
-
2
-
3slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
-
7、SPI加载ProcessorSlot
这里采用了spi的机制来扩展SlotChainBuilder,默认是采用DefaultSlotChainBuilder来实现的,可以看到sentinel源码的sentinel-core包下,META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件下,默认属性是:
-
1 slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
-
所以默认采用DefaultSlotChainBuilder来构建链路,因此找到DefaultSlotChainBuilder.build()方法。
8、DefaultSlotChainBuilder
-
1public ProcessorSlotChain build() {
-
2
-
3 // 定义链路起点
-
4
-
5 ProcessorSlotChain chain = new DefaultProcessorSlotChain();
-
6
-
7
-
8
-
9 // Note: the instances of ProcessorSlot should be different, since they are not stateless.
-
10
-
11 // 基于spi扩展机制,加载ProcessorSlot的实现类,从META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件下获取,并且按指定顺序排序
-
12
-
13 List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
-
14
-
15 // 遍历构建链路
-
16
-
17 for (ProcessorSlot slot : sortedSlotList) {
-
18
-
19 if (!(slot instanceof AbstractLinkedProcessorSlot)) {
-
20
-
21 RecordLog.warn(“The ProcessorSlot(“ + slot.getClass().getCanonicalName() + “) is not an instance of AbstractLinkedProcessorSlot, can’t be added into ProcessorSlotChain”);
-
22
-
23 continue;
-
24
-
25 }
-
26
-
27 // 将slot节点加入链,因为已经排好序了,只需要加到*后即可
-
28
-
29 chain.addLast((AbstractLinkedProcessorSlot >) slot);
-
30
-
31 }
-
32
-
33
-
34
-
35 return chain;
-
36
-
37 }
9、遍历ProcessorSlots
这里也是通过spi的机制,读取文件META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:
-
1# Sentinel default ProcessorSlots
-
2
-
3com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
-
4
-
5com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
-
6
-
7com.alibaba.csp.sentinel.slots.logger.LogSlot
-
8
-
9com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
-
10
-
11com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
-
12
-
13com.alibaba.csp.sentinel.slots.system.SystemSlot
-
14
-
15com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
-
16
-
17com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
-
从这里看出,链路由这些节点组成,而slot之间的顺序是根据每个slot节点的@SpiOrder注解的值来确定的。
NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot
链路调用
chain.entry(…)
上面已经构建好了链路,下面就要开始进行链路的调用了。
回到CtSph#entryWithPriority
1、NodeSelectorSlot
NodeSelectorSlot(@SpiOrder(-10000))
直接进入NodeSelectorSlot类的entry方法。
根据官方文档,NodeSelectorSlot类的作用为:
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
-
1@Override
-
2
-
3public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object… args)
-
4
-
5 throws Throwable {
-
6
-
7
-
8
-
9 // 双重检查锁+缓存 机制
-
10
-
11 DefaultNode node = map.get(context.getName());
-
12
-
13 if (node == null) {
-
14
-
15 synchronized (this) {
-
16
-
17 node = map.get(context.getName());
-
18
-
19 if (node == null) {
-
20
-
21 node = new DefaultNode(resourceWrapper, null);
-
22
-
23 HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
-
24
-
25 cacheMap.putAll(map);
-
26
-
27 cacheMap.put(context.getName(), node);
-
28
-
29 map = cacheMap;
-
30
-
31 // Build invocation tree
-
32
-
33 // 构建调用链的树形结构
-
34
-
35 ((DefaultNode) context.getLastNode()).addChild(node);
-
36
-
37 }
-
38
-
39
-
40
-
41 }
-
42
-
43 }
-
44
-
45
-
46
-
47 context.setCurNode(node);
-
48
-
49 // 进入下一个链
-
50
-
51 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
52
-
53}
-
2、ClusterBuilderSlot
ClusterBuilderSlot(@SpiOrder(-9000))
根据官方文档,ClusterBuilderSlot的作用为:
此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。
3、LogSlot
LogSlot(@SpiOrder(-8000))
该类对链路的传递不做处理,只有在抛出BlockException的时候,向上层层传递的过程中,会通过该类来输入一些日志信息:
-
1@Override
-
2
-
3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object… args)
-
4
-
5 throws Throwable {
-
6
-
7 try {
-
8
-
9 fireEntry(context, resourceWrapper, obj, count, prioritized, args);
-
10
-
11 } catch (BlockException e) {
-
12
-
13 // 当抛出BlockException异常时,这里会输入日志信息
-
14
-
15 EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
-
16
-
17 context.getOrigin(), count);
-
18
-
19 throw e;
-
20
-
21 } catch (Throwable e) {
-
22
-
23 RecordLog.warn(“Unexpected entry exception”, e);
-
24
-
25 }
-
26
-
27}
-
4、StatisticSlot
StatisticSlot(@SpiOrder(-7000))
官方文档:
StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息。
-
1@Override
-
2
-
3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
-
4
-
5 boolean prioritized, Object… args) throws Throwable {
-
6
-
7 try {
-
8
-
9 // Do some checking.
-
10
-
11 // 先将调用链继续下去,等到后续链调用结束了,再执行下面的步骤
-
12
-
13 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
14
-
15
-
16
-
17 // Request passed, add thread count and pass count.
-
18
-
19 node.increaseThreadNum();
-
20
-
21 node.addPassRequest(count);
-
22
-
23
-
24
-
25 if (context.getCurEntry().getOriginNode() != null) {
-
26
-
27 // Add count for origin node.
-
28
-
29 context.getCurEntry().getOriginNode().increaseThreadNum();
-
30
-
31 context.getCurEntry().getOriginNode().addPassRequest(count);
-
32
-
33 }
-
34
-
35
-
36
-
37 if (resourceWrapper.getEntryType() == EntryType.IN) {
-
38
-
39 // Add count for global inbound entry node for global statistics.
-
40
-
41 Constants.ENTRY_NODE.increaseThreadNum();
-
42
-
43 Constants.ENTRY_NODE.addPassRequest(count);
-
44
-
45 }
-
46
-
47
-
48
-
49 // Handle pass event with registered entry callback handlers.
-
50
-
51 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
-
52
-
53 handler.onPass(context, resourceWrapper, node, count, args);
-
54
-
55 }
-
56
-
57 } catch (PriorityWaitException ex) {
-
58
-
59 node.increaseThreadNum();
-
60
-
61 if (context.getCurEntry().getOriginNode() != null) {
-
62
-
63 // Add count for origin node.
-
64
-
65 context.getCurEntry().getOriginNode().increaseThreadNum();
-
66
-
67 }
-
68
-
69
-
70
-
71 if (resourceWrapper.getEntryType() == EntryType.IN) {
-
72
-
73 // Add count for global inbound entry node for global statistics.
-
74
-
75 Constants.ENTRY_NODE.increaseThreadNum();
-
76
-
77 }
-
78
-
79 // Handle pass event with registered entry callback handlers.
-
80
-
81 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
-
82
-
83 handler.onPass(context, resourceWrapper, node, count, args);
-
84
-
85 }
-
86
-
87 } catch (BlockException e) {
-
88
-
89 // Blocked, set block exception to current entry.
-
90
-
91 context.getCurEntry().setBlockError(e);
-
92
-
93
-
94
-
95 // Add block count.
-
96
-
97 node.increaseBlockQps(count);
-
98
-
99 if (context.getCurEntry().getOriginNode() != null) {
-
100
-
101 context.getCurEntry().getOriginNode().increaseBlockQps(count);
-
102
-
103 }
-
104
-
105
-
106
-
107 if (resourceWrapper.getEntryType() == EntryType.IN) {
-
108
-
109 // Add count for global inbound entry node for global statistics.
-
110
-
111 Constants.ENTRY_NODE.increaseBlockQps(count);
-
112
-
113 }
-
114
-
115
-
116
-
117 // Handle block event with registered entry callback handlers.
-
118
-
119 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
-
120
-
121 handler.onBlocked(e, context, resourceWrapper, node, count, args);
-
122
-
123 }
-
124
-
125
-
126
-
127 throw e;
-
128
-
129 } catch (Throwable e) {
-
130
-
131 // Unexpected internal error, set error to current entry.
-
132
-
133 context.getCurEntry().setError(e);
-
134
-
135
-
136
-
137 throw e;
-
138
-
139 }
-
140
-
141}
-
StatisticSlot 会先将链往下执行,等到后面的节点全部执行完毕,再进行数据统计。
5、AuthoritySlot
@SpiOrder(-6000)
AuthoritySlot
官方文档:
AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制
-
1@Override
-
2
-
3public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object… args)
-
4
-
5 throws Throwable {
-
6
-
7 // 黑白名单权限控制
-
8
-
9 checkBlackWhiteAuthority(resourceWrapper, context);
-
10
-
11 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
12
-
13}
-
14
-
15
-
16
-
17void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
-
18
-
19 Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
-
20
-
21
-
22
-
23 if (authorityRules == null) {
-
24
-
25 return;
-
26
-
27 }
-
28
-
29
-
30
-
31 Set<AuthorityRule> rules = authorityRules.get(resource.getName());
-
32
-
33 if (rules == null) {
-
34
-
35 return;
-
36
-
37 }
-
38
-
39
-
40
-
41 for (AuthorityRule rule : rules) {
-
42
-
43 if (!AuthorityRuleChecker.passCheck(rule, context)) {
-
44
-
45 throw new AuthorityException(context.getOrigin(), rule);
-
46
-
47 }
-
48
-
49 }
-
50
-
51}
-
6、SystemSlot
@SpiOrder(-5000)
SystemSlot
官方文档:
SystemSlot:这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。
-
1@Override
-
2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
-
3 boolean prioritized, Object… args) throws Throwable {
-
4 // 系统规则校验
-
5 SystemRuleManager.checkSystem(resourceWrapper);
-
6 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
7}
-
7、FlowSlot 限流规则引擎
@SpiOrder(-2000)
FlowSlot
官方文档:
这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:
- 指定应用生效的规则,即针对调用方限流的;
- 调用方为 other 的规则;
- 调用方为 default 的规则。
入口
-
1@Override
-
2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
-
3 boolean prioritized, Object… args) throws Throwable {
-
4 // 检查限流规则
-
5 checkFlow(resourceWrapper, context, node, count, prioritized);
-
6
-
7 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
8}
-
9
-
10void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
-
11 throws BlockException {
-
12 checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
-
13}
1、所有规则检查
调用了FlowRuleChecker.checkFlow(…)方法。
-
1public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
-
2 Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
-
3 if (ruleProvider == null || resource == null) {
-
4 return;
-
5 }
-
6 // 根据资源名称找到对应的
-
7 Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
-
8 if (rules != null) {
-
9 // 遍历规则,依次判断是否通过
-
10 for (FlowRule rule : rules) {
-
11 if (!canPassCheck(rule, context, node, count, prioritized)) {
-
12 throw new FlowException(rule.getLimitApp(), rule);
-
13 }
-
14 }
-
15 }
-
16}
-
2、单个规则检查
-
1public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
-
2 boolean prioritized) {
-
3 String limitApp = rule.getLimitApp();
-
4 if (limitApp == null) {
-
5 return true;
-
6 }
-
7 // 集群限流的判断
-
8 if (rule.isClusterMode()) {
-
9 return passClusterCheck(rule, context, node, acquireCount, prioritized);
-
10 }
-
11 // 本地节点的判断
-
12 return passLocalCheck(rule, context, node, acquireCount, prioritized);
-
13}
-
3、非集群模式的限流判断
-
1private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
-
2 boolean prioritized) {
-
3 // 根据请求的信息及策略,选择不同的node节点
-
4 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
-
5 if (selectedNode == null) {
-
6 return true;
-
7 }
-
8 // 根据当前规则,获取规则控制器,调用canPass方法进行判断
-
9// rule.getRater()放回的是TrafficShapingController接口的实现类,使用了策略模式,根据使用的控制措施来选择使用哪种实现。
-
10 return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
-
11}
-
这里是先根据请求和当前规则的策略,找到该规则下存储统计信息的节点,然后根据当前规则获取相应控制器,通过控制器的canPass(…)方法进行判断。
4、获取节点
-
1static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
-
2 // The limit app should not be empty.
-
3 String limitApp = rule.getLimitApp();
-
4 int strategy = rule.getStrategy();
-
5 String origin = context.getOrigin();
-
6
-
7 // 判断调用来源,这种情况下origin不能为default或other
-
8 if (limitApp.equals(origin) && filterOrigin(origin)) {
-
9 // 如果调用关系策略为STRATEGY_DIRECT,表示仅判断自己,则返回origin statistic node.
-
10 if (strategy == RuleConstant.STRATEGY_DIRECT) {
-
11 // Matches limit origin, return origin statistic node.
-
12 return context.getOriginNode();
-
13 }
-
14
-
15 // 采用调用来源进行判断的策略
-
16 return selectReferenceNode(rule, context, node);
-
17 } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 如果调用来源为default默认的
-
18 if (strategy == RuleConstant.STRATEGY_DIRECT) { // 如果调用关系策略为STRATEGY_DIRECT,则返回clusterNode
-
19 // Return the cluster node.
-
20 return node.getClusterNode();
-
21 }
-
22
-
23 return selectReferenceNode(rule, context, node);
-
24 } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
-
25 && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 如果调用来源为other,且调用来源不在限制规则内,为其他来源
-
26 if (strategy == RuleConstant.STRATEGY_DIRECT) {
-
27 return context.getOriginNode();
-
28 }
-
29 return selectReferenceNode(rule, context, node);
-
30 }
-
31 return null;
-
32}
-
5、流量整形控制器
rule.getRater()方法会返回一个控制器,接口为TrafficShapingController,该接口的实现类图如下:
从类图可以看出,是很明显的策略模式,分别针对不同的限流控制策略。
1、默认策略
DefaultController该策略是sentinel的默认策略,如果请求超出阈值,则直接拒*请求。
-
1@Override
-
2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
-
3 // 当前已经统计的数
-
4 int curCount = avgUsedTokens(node);
-
5 if (curCount + acquireCount > count) {
-
6 // 如果是高优先级的,且是基于qps的限流方式,则可以尝试从下个未来的滑动窗口中预支
-
7 if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
-
8 long currentTime;
-
9 long waitInMs;
-
10 currentTime = TimeUtil.currentTimeMillis();
-
11 // 从下个滑动窗口中提前透支
-
12 waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
-
13 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
-
14 node.addWaitingRequest(currentTime + waitInMs, acquireCount);
-
15 node.addOccupiedPass(acquireCount);
-
16 sleep(waitInMs);
-
17
-
18 // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
-
19 throw new PriorityWaitException(waitInMs);
-
20 }
-
21 }
-
22 return false;
-
23 }
-
24 return true;
-
25}
-
26
-
27private int avgUsedTokens(Node node) {
-
28 if (node == null) {
-
29 return DEFAULT_AVG_USED_TOKENS;
-
30 }
-
31 // 如果当前是线程数限流,则返回node.curThreadNum()当前线程数
-
32 // 如果是QPS限流,则返回node.passQps()当前已经通过的qps数据
-
33 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
-
34}
-
35
-
36private void sleep(long timeMillis) {
-
37 try {
-
38 Thread.sleep(timeMillis);
-
39 } catch (InterruptedException e) {
-
40 // Ignore.
-
41 }
-
42}
-
2、匀速排队策略
RateLimiterController
-
1@Override
-
2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
-
3 // Pass when acquire count is less or equal than 0.
-
4 if (acquireCount <= 0) {
-
5 return true;
-
6 }
-
7 // Reject when count is less or equal than 0.
-
8 // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
-
9 if (count <= 0) {
-
10 return false;
-
11 }
-
12
-
13 long currentTime = TimeUtil.currentTimeMillis();
-
14 // Calculate the interval between every two requests.
-
15 // 计算两个请求之间的时间间隔
-
16 long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
-
17
-
18 // Expected pass time of this request. 该请求的预计通过时间 = 上一次通过的时间 + 时间间隔
-
19 long expectedTime = costTime + latestPassedTime.get();
-
20
-
21 // 如果预计时间比当前时间小,表示可以请求完全可以通过
-
22 if (expectedTime <= currentTime) {
-
23 // Contention may exist here, but it’s okay.
-
24 // 这里可能存在竞争,但是不影响。
-
25 latestPassedTime.set(currentTime);
-
26 return true;
-
27 } else {
-
28 // Calculate the time to wait.
-
29 // 计算等待时间
-
30 long waitTime = costTime + latestPassedTime.get() – TimeUtil.currentTimeMillis();
-
31 // 如果等待时间超出了等待队列的*大时间,则无法放入等待队列,直接拒*
-
32 if (waitTime > maxQueueingTimeMs) {
-
33 return false;
-
34 } else {
-
35 long oldTime = latestPassedTime.addAndGet(costTime);
-
36 try {
-
37 // 重新计算等待时间
-
38 waitTime = oldTime – TimeUtil.currentTimeMillis();
-
39 // 判断等待时间是否超过等待队列的*大时间,如果超过了,拒*,并且将latestPassedTime*后一次请求时间重新设置为原值
-
40 if (waitTime > maxQueueingTimeMs) {
-
41 latestPassedTime.addAndGet(-costTime);
-
42 return false;
-
43 }
-
44 // in race condition waitTime may <= 0
-
45 // 线程等待
-
46 if (waitTime > 0) {
-
47 Thread.sleep(waitTime);
-
48 }
-
49 return true;
-
50 } catch (InterruptedException e) {
-
51 }
-
52 }
-
53 }
-
54 return false;
-
55}
-
从代码可以看出,匀速排队策略是使用了虚拟队列的方法,通过控制阈值来计算出请求的时间间隔,然后将上一次请求的时间加上时间间隔,表示下一次请求的时间,如果当前时间比这个值大,说明已经超出时间间隔了,当然可以请求,反之,表示需要等待,那么等待的时长就应该是要等到当前时间达到预期时间才能请求,这里就有个虚拟的等待队列,而等待其实是通过线程的等待来实现的。而这里所说的虚拟队列实际上是由一系列的处于sleep状态的线程组成的,但是实际的数据结构上并没有构成队列。
3、预热/冷启动策略
WarmUpController
首先看WarmUpController的属性和构造方法:
-
1// 阈值
-
2protected double count;
-
3/**
-
4* 冷启动的因子 ,默认为3 {@link SentinelConfig#coldFactor()}
-
5*/
-
6private int coldFactor;
-
7// 转折点的令牌数
-
8protected int warningToken = 0;
-
9// *大令牌数
-
10private int maxToken;
-
11// 折线初始斜率,标志流量的变化程度
-
12protected double slope;
-
13
-
14// 累积的令牌数 ,累积的令牌数越多,说明系统利用率越低,说明当前流量低,是冷状态
-
15protected AtomicLong storedTokens = new AtomicLong(0);
-
16// *后更新令牌的时间
-
17protected AtomicLong lastFilledTime = new AtomicLong(0);
-
18
-
19public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
-
20 construct(count, warmUpPeriodInSec, coldFactor);
-
21}
-
22
-
23public WarmUpController(double count, int warmUpPeriodInSec) {
-
24 construct(count, warmUpPeriodInSec, 3);
-
25}
-
26
-
27/**
-
28* @param count 用户设定的阈值(这里假设设定为100)
-
29* @param warmUpPeriodInSec 默认为10
-
30* @param coldFactor 默认为3
-
31*/
-
32private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
-
33
-
34 if (coldFactor <= 1) {
-
35 throw new IllegalArgumentException(“Cold factor should be larger than 1”);
-
36 }
-
37
-
38 this.count = count;
-
39
-
40 this.coldFactor = coldFactor;
-
41
-
42 // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
-
43 // warningToken = 100;
-
44
-
45 // 按默认的warmUpPeriodInSec = 10,表示1秒钟10个请求,则每个请求为间隔stableInterval = 100ms,那么coldInterval=stableInterval * coldFactor = 100 * 3 = 300ms
-
46 // warningToken = 10 * 100 / (3 – 1) = 500
-
47 // thresholdPermits = warningToken = 0.5 * warmupPeriod / stableInterval = 0.5 * warmupPeriod / 100ms = 500 ==>> warmupPeriod = 100000ms
-
48 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor – 1);
-
49
-
50 // / maxPermits = thresholdPermits + 2 * warmupPeriod /
-
51 // (stableInterval + coldInterval)
-
52 // maxToken = 200
-
53
-
54 // maxPermits = 500 + 2 * 100000ms / (100ms + 300ms) = 1000
-
55 // maxToken = 500 + (2 * 10 * 100 / (1.0 + 3)) = 1000
-
56 // maxPermits = maxToken
-
57 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
-
58
-
59 // slope
-
60 // slope = (coldIntervalMicros – stableIntervalMicros) / (maxPermits
-
61 // – thresholdPermits);
-
62
-
63 // slope = (3 – 1.0) / 100 / (600 – 500) = 0.0002
-
64 slope = (coldFactor – 1.0) / count / (maxToken – warningToken);
-
65
-
66}
-
属性说明:
- count: 用户设定的qps阈值。
- coldFactor: 冷启动的因子,初始默认为3,通过SentinelConfig类的coldFactor()方法获取,这里会有个判断,如果启动因子小于等于1,则会设置为默认值3,因为如果是小于等于1,是没有意义的,就不是预热启动了。
- warningToken:转折点的令牌数,当令牌数开始小于该值得时候,就要开启预热了。
- maxToken:*大令牌数。
- slope:折线的斜率。
- storedTokens:当前存储的令牌数。
- lastFilledTime:上一次更新令牌的时间。
总体思路:当系统存储的令牌为*大值时,说明系统访问流量较低,处于冷状态,这时候当有正常请求过来时,会让请求通过,并且会补充消耗的令牌数。当瞬时流量来临时,一旦剩余的令牌数小于警戒令牌数(restToken <= warningToken),则表示有大流量过来,需要开启预热过程,开始逐渐增大允许的qps。当qps达到用户设定的阈值后,系统已经预热完毕,这时候就进入了正常的请求阶段。
源码分析如下:
-
1@Override
-
2public boolean canPass(Node node, int acquireCount, boolean prioritized) {
-
3 // 当前已经通过的qps
-
4 long passQps = (long) node.passQps();
-
5
-
6 // 上一个滑动窗口的qps
-
7 long previousQps = (long) node.previousPassQps();
-
8 // 同步令牌,如果是出于冷启动或预热完毕状态,则考虑要添加令牌
-
9 syncToken(previousQps);
-
10
-
11 // 开始计算它的斜率
-
12 // 如果进入了警戒线,开始调整他的qps
-
13 long restToken = storedTokens.get();
-
14 if (restToken >= warningToken) { // 说明一瞬间有大流量过来,消耗了大量的存储令牌,造成剩余令牌数*警戒值,则要开启预热默认,逐渐增加qps
-
15 // 计算当前离警戒线的距离
-
16 long aboveToken = restToken – warningToken;
-
17 // 消耗的速度要比warning快,但是要比慢
-
18 // current interval = restToken*slope+1/count
-
19 // restToken越小,interval就越小,表示系统越热
-
20 // 随着aboveToken的减小,warningQps会逐渐增大
-
21 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
-
22 if (passQps + acquireCount <= warningQps) { // 随着warningQps的增大,acquireCount = 1,那么passQps允许的范围就变大,相应的流量就越大,系统越热
-
23 return true;
-
24 }
-
25 } else {
-
26 if (passQps + acquireCount <= count) {
-
27 return true;
-
28 }
-
29 }
-
30
-
31 return false;
-
32}
-
33
-
34/**
-
35* 同步令牌
-
36* @param passQps
-
37*/
-
38protected void syncToken(long passQps) {
-
39 long currentTime = TimeUtil.currentTimeMillis();
-
40 // 把当前时间的后三位置为0 e.g. 1601456312835 = 1601456312835 – 1601456312835 % 1000 = 1601456312000
-
41 currentTime = currentTime – currentTime % 1000;
-
42 // 获取上一次更新令牌的时间
-
43 long oldLastFillTime = lastFilledTime.get();
-
44 if (currentTime <= oldLastFillTime) {
-
45 return;
-
46 }
-
47
-
48 // 获得目前的令牌数
-
49 long oldValue = storedTokens.get();
-
50 // 获取新的令牌数
-
51 long newValue = coolDownTokens(currentTime, passQps);
-
52
-
53 // 更新累积令牌数
-
54 if (storedTokens.compareAndSet(oldValue, newValue)) {
-
55 // 去除上一次的qps,设置剩下的令牌数
-
56 long currentValue = storedTokens.addAndGet(0 – passQps);
-
57 if (currentValue < 0) {
-
58 // 如果剩下的令牌数小于0,则置为0。
-
59 storedTokens.set(0L);
-
60 }
-
61 // 设置令牌更新时间
-
62 lastFilledTime.set(currentTime);
-
63 }
-
64}
-
65
-
66private long coolDownTokens(long currentTime, long passQps) {
-
67 // 当前拥有的令牌数
-
68 long oldValue = storedTokens.get();
-
69 long newValue = oldValue;
-
70
-
71 // 添加令牌的判断前提条件:
-
72 // 当令牌的消耗程度远远低于警戒线的时候
-
73 if (oldValue < warningToken) { // 这种情况表示已经预热结束,可以开始生成令牌了
-
74 // 这里按照count = 100来计算的话,表示旧值oldValue + 距离上次更新的秒数时间差 * count ,表示每秒增加count个令牌
-
75 // 这里的currentTime 和 lastFilledTime.get() 都是已经去掉毫秒数的
-
76 newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
-
77 } else if (oldValue > warningToken) { // 进入这里表示当前是冷状态或正处于预热状态
-
78 if (passQps < (int)count / coldFactor) { // 如果是冷状态,则补充令牌数,避免令牌数为0
-
79 newValue = (long)(oldValue + (currentTime – lastFilledTime.get()) * count / 1000);
-
80 }
-
81 // 预热阶段则不添加令牌数,从而限制流量的急剧攀升
-
82 }
-
83 // 限制令牌数不能超过*大令牌数maxToken
-
84 return Math.min(newValue, maxToken);
-
85}
4、预热的匀速排队策略
WarmUpRateLimiterController
这种是匀速排队模式和预热模式的结合,这里不深入了。搞懂了上面两种,再看这种也比较清晰了。
5、DegradeSlot
官方文档说明:
这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
源码解析:
-
1@Override
-
2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
-
3 boolean prioritized, Object… args) throws Throwable {
-
4 //降级判断
-
5 performChecking(context, resourceWrapper);
-
6
-
7 // 如果有自定义的slot,还会继续进行
-
8 fireEntry(context, resourceWrapper, node, count, prioritized, args);
-
9}
-
10
-
11void performChecking(Context context, ResourceWrapper r) throws BlockException {
-
12 // 使用DegradeRuleManager获得当前资源的熔断器
-
13 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
-
14 if (circuitBreakers == null || circuitBreakers.isEmpty()) {
-
15 return;
-
16 }
-
17 // 遍历熔断器,只要有任何一个满足熔断条件,就抛出DegradeException异常。
-
18 for (CircuitBreaker cb : circuitBreakers) {
-
19 if (!cb.tryPass(context)) {
-
20 throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
-
21 }
-
22 }
-
23}
这里有个关键类,DegradeRuleManager,该类中会保存所有的熔断规则,使用Map<String, List>的格式进行保存。当需要使用的时候,就直接根据资源名称,从该map中获取对应的熔断器列表。
那么规则是如何加载的呢?我们看到DegradeRuleManager这个类,在加载时候,有个静态代码块:
-
1private static final RulePropertyListener LISTENER = new RulePropertyListener();
-
2private static SentinelProperty<List<DegradeRule>> currentProperty
-
3 = new DynamicSentinelProperty<>();
-
4
-
5static {
-
6 currentProperty.addListener(LISTENER);
-
7}
-
currentProperty.addListener(LISTENER);继续分析该段代码,找到DynamicSentinelProperty的addListener(…)方法:
-
1@Override
-
2public void addListener(PropertyListener<T> listener) {
-
3 listeners.add(listener);
-
4 listener.configLoad(value);
-
5}
-
612345
-
发现会调用监听器的configLoad(…)方法,*终会调用RulePropertyListener这个类的reloadFrom(…)方法。具体怎么解析的其实就是将规则根据资源名称进行归类,并保存为map格式。
FlowSlot 限流规则引擎之限流算法原理
1、滑动窗口实现原理
- 每个时间窗口*大流量为100QPS;
- 20和80表示当时的真实QPS数量;
- 一个时间窗口分为两个半限,上半限和下半限;
- 如果时间窗口1的下半限和时间窗口2的上半限的峰值超过100QPS,那么就丢失一部分流量。
但是这样并不是我们想要的,那么我们来看看计数器滑动窗口。
2、计数器滑动窗口原理
- 在滑动窗口算法上优化;
- 相邻的两个半限总和>总阈值,才丢弃流量。
3、令牌桶算法
- 令牌漏斗桶存着所有的Token;
- 按期发放Token;
- 如果桶满了,就会熔断;
- 达到Token的Request可以获取资源;
- 得不到的就抛弃。