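This section briefly covers timeout issues in the feign-hystrix-ribbon stack. I am not very familiar with RxJava, so please excuse any mistakes.

## Hystrix and Ribbon timeout configuration

The Hystrix timeout and the Ribbon timeouts should satisfy the following relationship: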
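```
hystrixTimeout > (ribbonReadTimeout + ribbonConnectTimeout) * (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1)
```

The right-hand side is the worst-case time Ribbon can spend on a single call once every retry is counted. If the Hystrix timeout is smaller than that, Hystrix cuts the call off before Ribbon has finished retrying.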
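As a worked example of the inequality, here is a minimal sketch in plain Java. The class `TimeoutBudget` and its method names are hypothetical helpers made up for illustration; they are not part of Hystrix, Ribbon, or Spring Cloud.

```java
public final class TimeoutBudget {
    private TimeoutBudget() {}

    /**
     * Worst-case time (ms) Ribbon may spend on one logical call:
     * each attempt costs at most connect + read, and the number of attempts is
     * (maxAutoRetries + 1) per server times (maxAutoRetriesNextServer + 1) servers.
     */
    public static long ribbonWorstCaseMs(long connectTimeoutMs, long readTimeoutMs,
                                         int maxAutoRetries, int maxAutoRetriesNextServer) {
        return (readTimeoutMs + connectTimeoutMs)
                * (maxAutoRetries + 1L)
                * (maxAutoRetriesNextServer + 1L);
    }

    /** True when the Hystrix timeout is strictly larger than Ribbon's worst case. */
    public static boolean hystrixTimeoutIsSafe(long hystrixTimeoutMs,
                                               long connectTimeoutMs, long readTimeoutMs,
                                               int maxAutoRetries, int maxAutoRetriesNextServer) {
        return hystrixTimeoutMs > ribbonWorstCaseMs(
                connectTimeoutMs, readTimeoutMs, maxAutoRetries, maxAutoRetriesNextServer);
    }
}
```

With a 1000 ms connect timeout, a 2000 ms read timeout, and both retry settings at 1, the worst case is (2000 + 1000) * 2 * 2 = 12000 ms, so the Hystrix timeout has to be greater than 12000 ms.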
## Where Ribbon's ConnectTimeout and ReadTimeout take effect
```java
public class FeignLoadBalancer extends
        AbstractLoadBalancerAwareClient<FeignLoadBalancer.RibbonRequest, FeignLoadBalancer.RibbonResponse> {

    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            RibbonProperties override = RibbonProperties.from(configOverride);
            options = new Request.Options(
                    override.connectTimeout(this.connectTimeout),
                    override.readTimeout(this.readTimeout));
        } else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }
}
```
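To show what the two values in `Request.Options` end up controlling, here is a small sketch using only the JDK's `HttpURLConnection`. It assumes the underlying HTTP client behaves like a plain `HttpURLConnection` with a connect timeout and a read timeout; other clients (Apache HttpClient, OkHttp) apply their own equivalents. The class `ReadTimeoutSketch` and its methods are hypothetical and exist only for this illustration.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;

public final class ReadTimeoutSketch {
    private ReadTimeoutSketch() {}

    /** Issues a GET and reports whether it failed with a socket timeout (connect or read). */
    public static boolean timesOut(String url, int connectTimeoutMs, int readTimeoutMs)
            throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(connectTimeoutMs); // max time to establish the TCP connection
        conn.setReadTimeout(readTimeoutMs);       // max time to wait for response data
        try {
            conn.getResponseCode();
            return false;
        } catch (SocketTimeoutException e) {
            return true;
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Demo: a local socket that is bound and listening but never answers.
     * The TCP connection is established, so only the read timeout can fire.
     */
    public static boolean demoSilentServer(int readTimeoutMs) throws IOException {
        try (ServerSocket silent = new ServerSocket(0)) {
            String url = "http://127.0.0.1:" + silent.getLocalPort() + "/";
            return timesOut(url, 1000, readTimeoutMs);
        }
    }
}
```

Each attempt can therefore cost up to the connect timeout plus the read timeout, which is why both appear in the inequality above.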
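## How the Hystrix timeout takes effect

Execution of a Hystrix task has two parts: the business thread and the timeout-detection thread. The two run concurrently, and whichever changes the task's execution status first actively stops the other.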
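The race between the two parts can be sketched with plain `java.util.concurrent`, without RxJava. This is only an illustration of the idea under simplified assumptions, not Hystrix's implementation: the class `TimeoutRaceSketch`, its `Status` enum, and `runWithTimeout` are hypothetical names, and a one-shot timer stands in for Hystrix's timer.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public final class TimeoutRaceSketch {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private TimeoutRaceSketch() {}

    public static <T> T runWithTimeout(Callable<T> work, long timeoutMs) throws Exception {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<T> result = new CompletableFuture<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Business side: publishes its outcome only if it wins the CAS.
            Future<?> workHandle = worker.submit(() -> {
                try {
                    T value = work.call();
                    if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                        result.complete(value);
                    }
                } catch (Exception e) {
                    if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                        result.completeExceptionally(e);
                    }
                }
            });
            // Timeout side: if it wins the CAS, it interrupts the work and reports the timeout.
            ScheduledFuture<?> timerHandle = timer.schedule(() -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                    workHandle.cancel(true);
                    result.completeExceptionally(
                            new TimeoutException("timed out after " + timeoutMs + " ms"));
                }
            }, timeoutMs, TimeUnit.MILLISECONDS);
            try {
                return result.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            } finally {
                timerHandle.cancel(false); // the winner stops the timer if it is still pending
            }
        } finally {
            worker.shutdownNow();
            timer.shutdownNow();
        }
    }
}
```

Calling `runWithTimeout(() -> 42, 1000)` returns 42 because the work wins the compare-and-set. A task that sleeps for 2000 ms under a 50 ms timeout loses it and the call throws `TimeoutException`.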
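### AbstractCommand.executeCommandAndObserve()

- Runs the business code and the timeout-detection code in parallel.
- Sets up the fallback (service degradation).
- And so on.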
```java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // markEmits, markOnCompleted and setRequestContext are not shown in this excerpt.

        // Fallback handling: chooses the fallback path based on the failure type.
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    // remaining failure handling is not shown in this excerpt
                }
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            // Timeout enabled: wrap the execution with the timeout operator.
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}
```
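### HystrixObservableTimeoutOperator

- Starts a timer task that runs with an initial delay of executionTimeoutInMilliseconds and a period of executionTimeoutInMilliseconds. Once tick() enters the if branch, a timeout exception is emitted.
- Adds a subscriber that cancels the timer once the original task has finished.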
```java
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    // The originalCommand field and the constructor that sets it are not shown in this excerpt.

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);

        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

        TimerListener listener = new TimerListener() {
            @Override
            public void tick() {
                // The timeout only wins if the status is still NOT_EXECUTED.
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // Stop the original execution.
                    s.unsubscribe();
                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
                            originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                                @Override
                                public void run() {
                                    child.onError(new HystrixTimeoutException());
                                }
                            });
                    timeoutRunnable.run();
                }
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);

        // Forwards events from the original execution only if the timeout has not fired first.
        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    tl.clear(); // cancel the timer
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    tl.clear(); // cancel the timer
                    child.onError(e);
                }
            }

            @Override
            public void onNext(R v) {
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }

            private boolean isNotTimedOut() {
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED
                        || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };

        s.add(parent);
        return parent;
    }
}
```
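Because the listener is scheduled with a period equal to its initial delay, tick() can run more than once if the timer is not cancelled. The compare-and-set is what keeps the timeout from being reported twice. The following sketch models this with a JDK `ScheduledExecutorService`. It is a simplified stand-in for the Hystrix timer, and `PeriodicTickSketch` and its `run` method are hypothetical names used only here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public final class PeriodicTickSketch {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private PeriodicTickSketch() {}

    /** Lets the timer run for several periods and returns {ticks, timeoutsEmitted}. */
    public static int[] run(long timeoutMs, int periodsToWait) throws InterruptedException {
        AtomicReference<TimedOutStatus> status =
                new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        AtomicInteger ticks = new AtomicInteger();
        AtomicInteger timeoutsEmitted = new AtomicInteger();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Initial delay and period are both the timeout, as described above.
            timer.scheduleAtFixedRate(() -> {
                ticks.incrementAndGet();
                // Only the first tick can move NOT_EXECUTED -> TIMED_OUT.
                if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    timeoutsEmitted.incrementAndGet();
                }
            }, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
            Thread.sleep(timeoutMs * periodsToWait + timeoutMs / 2);
        } finally {
            timer.shutdownNow();
        }
        return new int[] { ticks.get(), timeoutsEmitted.get() };
    }
}
```

However many ticks occur during `run(50, 3)`, the second element of the result is 1: later ticks find the status already set to TIMED_OUT and do nothing.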
## Related articles

- https://deanwangpro.com/2018/04/13/zuul-hytrix-ribbon-timeout/
- https://www.jianshu.com/p/60074fe1bd86
- https://github.com/spring-cloud/spring-cloud-netflix/blob/master/spring-cloud-netflix-zuul/src/main/java/org/springframework/cloud/netflix/zuul/filters/route/support/AbstractRibbonCommand.java
- Hystrix timeout handling and exception types
- How the Hystrix timeout mechanism is implemented
- http://www.itmuch.com/spring-cloud-sum/spring-cloud-timeout/