return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } // 根据特定的策略执行命令: 线程池或者信号量 private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd){ // 线程池 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call(){ // 标记executionResult: 执行已发生 executionResult = executionResult.setExecutionOccurred(); // 将commandState设置为USER_CODE_EXECUTED。若设置失败,调用Observable.error方法返回Observable if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
//检查是否超时。若超时,调用Observable.error方法返回Observable if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()")); } // 将 ThreadState 设置为 STARTED if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //增加 统计count HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread();
try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); // 执行真正的请求 return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override publicvoidcall(){ if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override publicvoidcall(){ if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call(){ return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { // 信号量 // 忽略 } } }
privatestaticclassHystrixObservableTimeoutOperator<R> implementsOperator<R, R> { // 忽略 @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { final CompositeSubscription s = new CompositeSubscription(); child.add(s); final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable( originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {