Troubleshooting a Production Dubbo Call Failure

I. Brief Introduction

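  • suishen-esb provides an integration of Dubbo with Hystrix;
  • Hystrix uses thread pools internally to execute the actual tasks;
  • Each remote service gets its own thread pool;
  • In our internal wrapper, the core and maximum thread counts both default to 30, the wait queue is a SynchronousQueue (which holds no waiting tasks), and the rejection policy is AbortPolicy (which throws an exception when the pool cannot accept a task);
  • When instantaneous concurrency exceeds the maximum thread count, the Dubbo call fails with an exception.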
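
The rejection behavior described above can be reproduced with the JDK alone. The following is a minimal sketch, not the suishen-esb code: the class name SaturatedPoolDemo and the numbers (30 threads, 35 tasks) are assumptions chosen for illustration. It builds a pool with equal core and maximum sizes, a SynchronousQueue, and AbortPolicy, then submits more blocking tasks than there are threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SaturatedPoolDemo {

    /** Submits `tasks` blocking tasks to a pool of `poolSize` threads and returns how many were rejected. */
    static int countRejected(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 1, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>(),       // hands tasks straight to a thread, never stores them
                new ThreadPoolExecutor.AbortPolicy());  // throws RejectedExecutionException when saturated
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Every accepted task blocks until released, so all threads stay busy.
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejected(30, 35)); // prints "rejected = 5"
    }
}
```

Because no thread is idle and the queue cannot hold a task, every submission beyond the 30th is rejected immediately rather than delayed.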

II. Incident Timeline

  • Users reported errors while using the product, so we checked the logs right away:
org.apache.dubbo.rpc.RpcException: Failed to invoke the method validLoginAuthentication in the service weli.wormhole.rpc.user.center.api.IAuthenticationService. Tried 1 times of the providers [10.65.0.205:11090] (1/4) from the registry node1.zk.all.platform.wtc.hwhosts.com:2181 on the consumer 10.65.0.34 using the dubbo version 2.7.3-SNAPSHOT. Last error is: validLoginAuthentication_1 could not be queued for execution and fallback failed.  
    at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:113)
    at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:248)
    at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:78)
    at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:55)
    at org.apache.dubbo.common.bytecode.proxy14.validLoginAuthentication(proxy14.java)
    at weli.peanut.web.interceptor.VerifyLoginInterceptor.preHandle(VerifyLoginInterceptor.java:134)
    at org.springframework.web.servlet.HandlerExecutionChain.applyPreHandle(HandlerExecutionChain.java:134)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:958)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1721)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1679)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: validLoginAuthentication_1 could not be queued for execution and fallback failed.  
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:818)
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:793)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1454)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1379)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.observers.Subscribers$5.onError(Subscribers.java:230)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.Observable.unsafeSubscribe(Observable.java:10158)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10247)
    at rx.Observable.subscribe(Observable.java:10214)
    at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)
    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:411)
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:377)
    at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:343)
    at suishen.esb.hystrix.dubbo.filter.HystrixFilter.invoke(HystrixFilter.java:46)
    at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:58)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)
    at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
    at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:82)
    ... 36 common frames omitted
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1bf63b71 rejected from java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 0]  
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    ... 91 common frames omitted
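  • The logs showed that tasks were being rejected because a thread pool was full. We initially assumed the provider's Dubbo thread pool was exhausted, so we quickly checked the logs of the platform (provider-side) service: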
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo monitor 10.65.3.23 core threads: 400, peak threads: 400, max threads: 400, active threads: 0, current threads: 400, queue size: 0, total tasks: 48363297, completed tasks: 48363297","logger_name":"wormholeBusiness","thread_name":"qbScheduler-6","level":"INFO","level_value":20000}
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo monitor 10.65.3.44 core threads: 400, peak threads: 400, max threads: 400, active threads: 2, current threads: 400, queue size: 0, total tasks: 48371189, completed tasks: 48371187","logger_name":"wormholeBusiness","thread_name":"qbScheduler-3","level":"INFO","level_value":20000}
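  • The platform service turned out to be healthy, with plenty of idle Dubbo threads;
  • Looking at the consumer-side exception again, we noticed that the consumer uses Hystrix and that the exception was thrown by Hystrix's internal thread pool (the rejection message reports pool size = 30, not the provider's 400);
  • We urgently added nodes and restarted the service, after which the business recovered.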
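
The pool statistics embedded in the RejectedExecutionException message are what distinguish the consumer-side Hystrix pool (30 threads) from the provider's Dubbo pool (400 threads). As an illustration only, the sketch below extracts those numbers from such a message. The class name RejectionMessage is hypothetical, and the regular expression assumes the message format shown in the stack trace above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RejectionMessage {

    // Matches the statistics that ThreadPoolExecutor prints in the rejection message.
    private static final Pattern STATS = Pattern.compile(
            "pool size = (\\d+), active threads = (\\d+), queued tasks = (\\d+)");

    /** Returns {poolSize, activeThreads, queuedTasks}, or null if the message has no statistics. */
    static int[] parse(String message) {
        Matcher m = STATS.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new int[] {
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3))
        };
    }

    public static void main(String[] args) {
        String message = "Task java.util.concurrent.FutureTask@1bf63b71 rejected from "
                + "java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, "
                + "active threads = 30, queued tasks = 0, completed tasks = 0]";
        int[] stats = parse(message);
        // A pool of 30 with all 30 threads active cannot be the provider's 400-thread Dubbo pool.
        System.out.println("pool size=" + stats[0] + " active=" + stats[1] + " queued=" + stats[2]);
    }
}
```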

III. Problem Analysis

  • According to the logs, the exception was raised when HystrixFilter called HystrixCommand.execute().
@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {

    public HystrixFilter() {
        ApplicationContext springContext = ApplicationContextHolder.getContext();
        if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
            BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
                beanDefinitionBuilder.setDestroyMethodName("preDestroy");
                beanDefinitionBuilder.setScope("singleton");

                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());

                // Trigger initialization
                beanFactory.getBean(HystrixSpringService.class.getSimpleName());
            }
        }
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Circuit breaking with Hystrix is pointless for asynchronous calls
        if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))) {
            return invoker.invoke(invocation);
        }

        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();
    }

}
  • When the Dubbo call is synchronous, the filter creates a DubboHystrixCommand and hands the remote call to Hystrix for execution.
public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
    private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;
    private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000;
    private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
    private Invoker invoker;
    private Invocation invocation;

    public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
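                        // Minimum number of requests in the 10-second rolling window before the circuit breaker can trip
                        .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
                        // After rejecting requests for 30 seconds, the breaker becomes half-open and lets some traffic through to retry
                        .withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl()))
                        // Open the circuit when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        // Rely on Dubbo's timeout and disable the Hystrix timeout
                        .withExecutionTimeoutEnabled(false))
                // Set the thread pool size from the Dubbo configuration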
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));


        this.invoker = invoker;
        this.invocation = invocation;
    }

    /**
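     * Gets the per-second request threshold; the constructor multiplies it by 10 to obtain the circuit breaker's request volume threshold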
     *
     * @param url
     * @return
     */
    private static int getFailQpsThreshold(URL url) {
        if (url != null) {
            int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
            if (logger.isDebugEnabled()) {
                logger.debug("FailQpsThreshold: " + threshold);
            }
            return threshold;
        }

        return DEFAULT_FAIL_QPS_THRESHOLD;
    }

    /**
     * Gets the circuit breaker's sleep window (how long requests are rejected after the circuit opens)
     *
     * @param url
     * @return
     */
    private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) {
        if (url != null) {
            int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS);
            if (logger.isDebugEnabled()) {
                logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds);
            }
            return circuitBreakerSleepWindowInMilliseconds;
        }

        return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;

    }

    /**
     * Gets the thread pool core size
     *
     * @param url
     * @return
     */
    private static int getThreadPoolCoreSize(URL url) {
        if (url != null) {
            int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);
            if (logger.isDebugEnabled()) {
                logger.debug("ThreadPoolCoreSize: " + size);
            }
            return size;
        }

        return DEFAULT_THREADPOOL_CORE_SIZE;

    }

    @Override
    protected Result run() throws Exception {
        Throwable exception = null;

        Result result = null;

        try {
            result = invoker.invoke(invocation);
            exception = result.getException();
        } catch (Exception e) {
            exception = e;
        }

        // Log the exception here for the record, then rethrow it to trigger the fallback
        if (exception != null) {
            Logs.error("Dubbo Exception: ", exception);
            throw new Exception(exception);
        }

        return result;
    }

    @Override
    protected Result getFallback() {
        return new RpcResult((Object) null);
    }
}
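
The command name in the stack trace, validLoginAuthentication_1, follows directly from the String.format call in the constructor above. The sketch below reproduces just that naming rule in plain Java; the class name HystrixKeys is hypothetical. The group key is the interface name, and because Hystrix uses the group key as the thread pool key when none is set explicitly, each Dubbo interface ends up with its own pool.

```java
final class HystrixKeys {

    /** Mirrors the command key format used by DubboHystrixCommand: methodName_argumentCount. */
    static String commandKey(String methodName, Object[] arguments) {
        return String.format("%s_%d", methodName, arguments == null ? 0 : arguments.length);
    }

    public static void main(String[] args) {
        // One argument, as in the failing call from the stack trace.
        System.out.println(commandKey("validLoginAuthentication", new Object[] {"token"}));
        // prints "validLoginAuthentication_1"
    }
}
```

Since the key only encodes the argument count, overloads with the same number of arguments share one command key and therefore one set of circuit breaker statistics.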

At this point we finally found the root cause:

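  • DubboHystrixCommand only sets the core size (30 by default). With the remaining Hystrix defaults, the maximum thread count equals the core size, the wait queue is a SynchronousQueue (which holds no waiting tasks), and the rejection policy is AbortPolicy (which throws an exception when the pool cannot accept a task);
  • When instantaneous concurrency exceeds the maximum thread count, the Dubbo call fails with an exception.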
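
To get a feel for when 30 threads stop being enough, Little's law (concurrency = arrival rate × latency) gives a rough estimate. The sketch below is a back-of-the-envelope aid, not a measurement from this incident: the class name PoolSizing, the 50 ms latency, and the 1000 requests per second are all assumed values.

```java
final class PoolSizing {

    /** Steady-state throughput a pool can sustain: threads / average latency. */
    static double maxThroughputPerSecond(int threads, double avgLatencyMs) {
        return threads * 1000.0 / avgLatencyMs;
    }

    /** Threads needed to carry a given average request rate at a given average latency. */
    static int requiredThreads(double requestsPerSecond, double avgLatencyMs) {
        return (int) Math.ceil(requestsPerSecond * avgLatencyMs / 1000.0);
    }

    public static void main(String[] args) {
        // Assumed numbers: 30 threads and 50 ms per call.
        System.out.println(maxThroughputPerSecond(30, 50)); // prints "600.0"
        // Assumed numbers: 1000 requests per second at 50 ms per call.
        System.out.println(requiredThreads(1000, 50));      // prints "50"
    }
}
```

These are averages. With a SynchronousQueue there is no buffer at all, so a short burst above the thread count is rejected even when the average load is well below the estimate.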

IV. Resolution

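1. Modify HystrixFilter to add a switch that controls whether Hystrix is used. For core interfaces such as signature verification, we choose to invoke the provider directly on the calling thread, bypassing Hystrix.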

@Activate(group = Constants.CONSUMER)
public class HystrixFilter implements Filter {

    public HystrixFilter() {
        ApplicationContext springContext = ApplicationContextHolder.getContext();
        if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {
            BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);
                beanDefinitionBuilder.setDestroyMethodName("preDestroy");
                beanDefinitionBuilder.setScope("singleton");

                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());

                // Trigger initialization
                beanFactory.getBean(HystrixSpringService.class.getSimpleName());
            }
        }
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Skip Hystrix for asynchronous calls (circuit breaking is pointless there) and for methods with hystrixOpen=false
        if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))
                || "false".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "hystrixOpen"))) {
            return invoker.invoke(invocation);
        }

        DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();
    }

}
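
The switch is opt-out: a method goes through Hystrix unless its hystrixOpen parameter is explicitly "false". The sketch below isolates that decision so the default is easy to see. It is a simplified stand-in for the filter's condition, with a plain Map in place of Dubbo's URL method parameters; the class name HystrixSwitch and the literal key "async" (used here in place of Constants.ASYNC_KEY) are assumptions.

```java
import java.util.Collections;
import java.util.Map;

final class HystrixSwitch {

    /** Returns true when the call should skip Hystrix and invoke the provider directly. */
    static boolean bypass(Map<String, String> methodParameters) {
        return "true".equalsIgnoreCase(methodParameters.get("async"))
                || "false".equalsIgnoreCase(methodParameters.get("hystrixOpen"));
    }

    public static void main(String[] args) {
        // No parameters configured: Hystrix stays enabled.
        System.out.println(bypass(Collections.<String, String>emptyMap()));          // prints "false"
        // Explicit opt-out for a core method.
        System.out.println(bypass(Collections.singletonMap("hystrixOpen", "false"))); // prints "true"
    }
}
```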

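2. Adjust the thread pool parameters in DubboHystrixCommand, adding settings for the maximum thread count and the thread keep-alive time. (The plan also covered the wait-queue size; the listing below does not set one, so Hystrix keeps its default SynchronousQueue.)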

public class DubboHystrixCommand extends HystrixCommand<Result> {

    private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);
    private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 30;
    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 50;
    private static final int DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES = 5;

    private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;
    private Invoker invoker;
    private Invocation invocation;

    public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName()))
                .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),
                        invocation.getArguments() == null ? 0 : invocation.getArguments().length)))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
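                        // Minimum number of requests in the 10-second rolling window before the circuit breaker can trip
                        .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)
                        // After rejecting requests for 30 seconds, the breaker becomes half-open and lets some traffic through to retry
                        .withCircuitBreakerSleepWindowInMilliseconds(30000)
                        // Open the circuit when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        // Rely on Dubbo's timeout and disable the Hystrix timeout
                        .withExecutionTimeoutEnabled(false))
                // Set the thread pool parameters from the Dubbo configuration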
                .andThreadPoolPropertiesDefaults(getThreadPoolSetter(invoker.getUrl())));


        this.invoker = invoker;
        this.invocation = invocation;
    }

    /**
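     * Gets the per-second request threshold; the constructor multiplies it by 10 to obtain the circuit breaker's request volume threshold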
     *
     * @param url
     * @return
     */
    private static int getFailQpsThreshold(URL url) {
        if (url != null) {
            int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);
            if (logger.isDebugEnabled()) {
                logger.debug("FailQpsThreshold: " + threshold);
            }
            return threshold;
        }

        return DEFAULT_FAIL_QPS_THRESHOLD;
    }

    private static HystrixThreadPoolProperties.Setter getThreadPoolSetter(URL url) {
        return HystrixThreadPoolProperties.Setter()
                .withCoreSize(getThreadPoolProperties(url, "threadPoolCoreSize", DEFAULT_THREAD_POOL_CORE_SIZE))
                .withMaximumSize(getThreadPoolProperties(url, "threadPoolMaxSize", DEFAULT_THREAD_POOL_MAX_SIZE))
                // Hystrix ignores maximumSize unless this flag is enabled (added here; the original listing omitted it)
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                .withKeepAliveTimeMinutes(getThreadPoolProperties(url, "threadPoolKeepAliveTimeMinutes",
                        DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES));
    }

    /**
     * Reads an integer thread pool property from the URL, falling back to the given default
     *
     * @param url
     * @return
     */
    private static int getThreadPoolProperties(URL url, String name, int defaultProperties) {
        if (url != null) {
            int size = url.getParameter(name, defaultProperties);
            if (logger.isDebugEnabled()) {
                logger.debug(name + ": " + size);
            }
            return size;
        }
        return defaultProperties;
    }

    @Override
    protected Result run() throws Exception {
        Throwable exception;

        Result result = null;
        try {
            result = invoker.invoke(invocation);
            exception = result.getException();
        } catch (Exception e) {
            exception = e;
        }

        // Rethrow if an exception occurred, so that Hystrix counts the call as a failure
        if (exception != null) {
            Logs.error("dubbo exception: ", exception);
            throw new RuntimeException(exception);
        }

        return result;
    }

}
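
The effect of a maximum size above the core size can again be checked with the JDK alone. The following sketch is illustrative and does not involve Hystrix: the class name GrowingPoolDemo and the 60 submitted tasks are assumptions, while 30 and 50 match the defaults in the listing above. Keep in mind that Hystrix itself only honors maximumSize when allowMaximumSizeToDivergeFromCoreSize is set to true.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class GrowingPoolDemo {

    /** Submits blocking tasks and returns {largestPoolSize, rejectedCount}. */
    static int[] submit(int coreSize, int maxSize, int tasks) {
        // Threads above the core size are reclaimed after 5 idle minutes.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, maxSize, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++; // default handler is AbortPolicy
            }
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        return new int[] {largest, rejected};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = submit(30, 50, 60);
        System.out.println("largest pool size = " + result[0] + ", rejected = " + result[1]);
        // prints "largest pool size = 50, rejected = 10"
    }
}
```

The pool absorbs 20 more concurrent calls than before, but anything beyond 50 is still rejected immediately because the queue holds no tasks.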

About the Author

  • 郑亚腾 (Zheng Yateng), Senior Server-Side Development Engineer

微鲤 (Weli) Technology Team

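The 微鲤 (Weli) Technology Team is responsible for the research and development of 中华万年历, Maybe, 蘑菇语音, and 微鲤游戏, products that together serve as many as 300 million users. The team has also built a complete big data platform, core development frameworks, and base operations infrastructure. We practice a data-driven philosophy and believe that technology changes the world.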