一、简单介绍
- suishen-esb中,提供了Dubbo对Hystrix的集成;
- Hystrix内部使用了线程池完成具体的任务执行;
- 每一个远程service使用独立的线程池;
- 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
- 当瞬时并发数超出最大线程数时,dubbo调用执行异常。
二、事件脉络
- 用户反馈使用异常,紧急查看日志
org.apache.dubbo.rpc.RpcException: Failed to invoke the method validLoginAuthentication in the service weli.wormhole.rpc.user.center.api.IAuthenticationService. Tried 1 times of the providers [10.65.0.205:11090] (1/4) from the registry node1.zk.all.platform.wtc.hwhosts.com:2181 on the consumer 10.65.0.34 using the dubbo version 2.7.3-SNAPSHOT. Last error is: validLoginAuthentication_1 could not be queued for execution and fallback failed.
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:113)
at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:248)
at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:78)
at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:55)
at org.apache.dubbo.common.bytecode.proxy14.validLoginAuthentication(proxy14.java)
at weli.peanut.web.interceptor.VerifyLoginInterceptor.preHandle(VerifyLoginInterceptor.java:134)
at org.springframework.web.servlet.HandlerExecutionChain.applyPreHandle(HandlerExecutionChain.java:134)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:958)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1721)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1679)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: validLoginAuthentication_1 could not be queued for execution and fallback failed.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:818)
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:793)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1454)
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1379)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.observers.Subscribers$5.onError(Subscribers.java:230)
at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
at rx.Observable.unsafeSubscribe(Observable.java:10158)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:10247)
at rx.Observable.subscribe(Observable.java:10214)
at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)
at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:411)
at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:377)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:343)
at suishen.esb.hystrix.dubbo.filter.HystrixFilter.invoke(HystrixFilter.java:46)
at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:58)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:82)
... 36 common frames omitted
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1bf63b71 rejected from java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
... 91 common frames omitted
- 根据日志,发现是由于线程池打满造成了任务的拒绝执行,起初认为是提供方的dubbo线程池被打满,迅速排查中台日志;
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.23 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:0,当前线程数:400,队列大小:0,任务总量:48363297,已完成任务量:48363297","logger_name":"wormholeBusiness","thread_name":"qbScheduler-6","level":"INFO","level_value":20000}
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.44 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:2,当前线程数:400,队列大小:0,任务总量:48371189,已完成任务量:48371187","logger_name":"wormholeBusiness","thread_name":"qbScheduler-3","level":"INFO","level_value":20000}
- 发现中台服务正常,dubbo空闲线程也比较充裕;
- 在回头看调用方异常信息,发现调用方使用了Hystrix,抛出异常的是Hystrix内部的线程池;
- 此时紧急增加节点,重启服务后,业务开始正常。
三、问题分析
- 根据日志分析,是由HystrixFilter执行了HystrixCommand.execute()造成了异常。
@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {
public HystrixFilter() {
ApplicationContext springContext = ApplicationContextHolder.getContext();
if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
if (beanFactory instanceof DefaultListableBeanFactory) {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
beanDefinitionBuilder.setDestroyMethodName("preDestroy");
beanDefinitionBuilder.setScope("singleton");
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());
//触发初始化
beanFactory.getBean(HystrixSpringService.class.getSimpleName());
}
}
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 异步调用使用hystrix做熔断没有意义
if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))) {
return invoker.invoke(invocation);
}
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
return command.execute();
}
}
- 当执行的dubbo调用为同步调用时,会创建DubboHystrixCommand,交由Hystrix执行远程调用。
public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;
private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000;
private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
private Invoker invoker;
private Invocation invocation;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
.andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
//10秒钟内请求失败上限值,超过此值熔断器发挥作用
.withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试
.withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl()))
//错误率达到50开启熔断保护
.withCircuitBreakerErrorThresholdPercentage(50)
//使用dubbo的超时,禁用这里的超时
.withExecutionTimeoutEnabled(false))
//根据dubbo配置设置线程池大小
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));
this.invoker = invoker;
this.invocation = invocation;
}
/**
* 获取每秒请求失败的阈值,超过此阈值熔断器开始生效
*
* @param url
* @return
*/
private static int getFailQpsThreshold(URL url) {
if (url != null) {
int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
if (logger.isDebugEnabled()) {
logger.debug("FailQpsThreshold: " + threshold);
}
return threshold;
}
return DEFAULT_FAIL_QPS_THRESHOLD;
}
/**
* 获取熔断器中断请求窗口大小
*
* @param url
* @return
*/
private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) {
if (url != null) {
int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds);
}
return circuitBreakerSleepWindowInMilliseconds;
}
return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;
}
/**
* 获取线程池大小
*
* @param url
* @return
*/
private static int getThreadPoolCoreSize(URL url) {
if (url != null) {
int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
if (logger.isDebugEnabled()) {
logger.debug("ThreadPoolCoreSize: " + size);
}
return size;
}
return DEFAULT_THREADPOOL_CORE_SIZE;
}
@Override
protected Result run() throws Exception {
Throwable exception = null;
Result result = null;
try {
result = invoker.invoke(invocation);
exception = result.getException();
} catch (Exception e) {
exception = e;
}
// 这里打印异常是为了记录异常,再抛出异常是为了触发fallback
if (exception != null) {
Logs.error("Dubbo Exception: ", exception);
throw new Exception(exception);
}
return result;
}
@Override
protected Result getFallback() {
return new RpcResult((Object) null);
}
}
看到这里终于找到了根本问题:
- 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
- 当瞬时并发数超出最大线程数时,dubbo调用执行异常。
四、处理
1、修改HystrixFilter,提供是否使用Hystrix开关,对于签名验证等核心接口,选择同步执行。
@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {
public HystrixFilter() {
ApplicationContext springContext = ApplicationContextHolder.getContext();
if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
if (beanFactory instanceof DefaultListableBeanFactory) {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
beanDefinitionBuilder.setDestroyMethodName("preDestroy");
beanDefinitionBuilder.setScope("singleton");
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());
//触发初始化
beanFactory.getBean(HystrixSpringService.class.getSimpleName());
}
}
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 异步调用使用hystrix做熔断没有意义
if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))
|| "false".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "hystrixOpen"))) {
return invoker.invoke(invocation);
}
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
return command.execute();
}
}
2、调整DubboHystrixCommand中线程池参数,增加最大线程数配置、线程活跃时间及等待队列大小
public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 30;
private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 50;
private static final int DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES = 5;
private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
private Invoker invoker;
private Invocation invocation;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
.andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
//10秒钟内请求失败上限值,超过此值熔断器发挥作用
.withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试
.withCircuitBreakerSleepWindowInMilliseconds(30000)
//错误率达到50开启熔断保护
.withCircuitBreakerErrorThresholdPercentage(50)
//使用dubbo的超时,禁用这里的超时
.withExecutionTimeoutEnabled(false))
//根据dubbo配置设置线程池大小
.andThreadPoolPropertiesDefaults(getThreadPoolSetter(invoker.getUrl())));
this.invoker = invoker;
this.invocation = invocation;
}
/**
* 获取每秒请求失败的阈值,超过此阈值熔断器开始生效
*
* @param url
* @return
*/
private static int getFailQpsThreshold(URL url) {
if (url != null) {
int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
if (logger.isDebugEnabled()) {
logger.debug("FailQpsThreshold: " + threshold);
}
return threshold;
}
return DEFAULT_FAIL_QPS_THRESHOLD;
}
private static HystrixThreadPoolProperties.Setter getThreadPoolSetter(URL url) {
return HystrixThreadPoolProperties.Setter()
.withCoreSize(getThreadPoolProperties(url, "threadPoolCoreSize", DEFAULT_THREAD_POOL_CORE_SIZE))
.withMaximumSize(getThreadPoolProperties(url, "threadPoolMaxSize", DEFAULT_THREAD_POOL_MAX_SIZE))
.withKeepAliveTimeMinutes(getThreadPoolProperties(url, "threadPoolKeepAliveTimeMinutes",
DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES));
}
/**
* 获取线程池大小
*
* @param url
* @return
*/
private static int getThreadPoolProperties(URL url, String name, int defaultProperties) {
if (url != null) {
int size = url.getParameter(name, defaultProperties);
if (logger.isDebugEnabled()) {
logger.debug(name + ": " + size);
}
return size;
}
return defaultProperties;
}
@Override
protected Result run() throws Exception {
Throwable exception;
Result result = null;
try {
result = invoker.invoke(invocation);
exception = result.getException();
} catch (Exception e) {
exception = e;
}
// 有异常抛出
if (exception != null) {
Logs.error("dubbo exception: ", exception);
throw new RuntimeException(exception);
}
return result;
}
}
作者介绍
- 郑亚腾 资深服务端开发工程师