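An earlier article covered how to use Spring Cloud Hystrix. As a quick recap:

1. Add the @EnableHystrix annotation to the application's startup class.
2. Add the @HystrixCommand annotation to a method and specify its fallback method.

Since everything starts with @EnableHystrix, let's look inside that annotation first: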
    /**
     * Convenience annotation for clients to enable Hystrix circuit breakers (specifically).
     * Use this (optionally) in case you want discovery and know for sure that it is Hystrix
     * you want. All it does is turn on circuit breakers and let the autoconfiguration find
     * the Hystrix classes if they are available (i.e. you need Hystrix on the classpath as
     * well).
     *
     * @author Dave Syer
     * @author Spencer Gibb
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @EnableCircuitBreaker
    public @interface EnableHystrix {
    }
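The job of this annotation is to switch Hystrix on. It is itself annotated with @EnableCircuitBreaker.
In the same package as the annotation there are also two configuration classes: HystrixAutoConfiguration and HystrixCircuitBreakerConfiguration.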
Here is the HystrixAutoConfiguration class:

    @Configuration
    @ConditionalOnClass({ Hystrix.class, HealthIndicator.class })
    @AutoConfigureAfter({ HealthIndicatorAutoConfiguration.class })
    public class HystrixAutoConfiguration {

        @Bean
        @ConditionalOnEnabledHealthIndicator("hystrix")
        public HystrixHealthIndicator hystrixHealthIndicator() {
            return new HystrixHealthIndicator();
        }
    }
As the code shows, HystrixAutoConfiguration mainly sets up the Hystrix health check. HystrixCircuitBreakerConfiguration, on the other hand, configures a good deal more. Among other things, it declares this bean:
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }
This registers the HystrixCommandAspect bean. The aspect defines two pointcuts:
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
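So this aspect uses AOP to add extra handling to methods annotated with either HystrixCommand or HystrixCollapser.
Once we put @HystrixCommand on a method, every call to it passes through this aspect, whose @Around(…) advice intercepts all calls matched by the two pointcuts. Here is that advice method: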
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
        Object result;
        try {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        }
        return result;
    }
The method first obtains the intercepted Method, then checks its annotations: if the method carries both @HystrixCommand and @HystrixCollapser, it throws an exception.
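The guard at the top of the advice is plain reflection, so it can be tried out without Hystrix on the classpath. Below is a minimal sketch of the same check. The annotations DemoCommand and DemoCollapser and the Service class are hypothetical stand-ins, not the javanica types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class AnnotationGuardDemo {

    // Hypothetical stand-in for @HystrixCommand.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoCommand {
    }

    // Hypothetical stand-in for @HystrixCollapser.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoCollapser {
    }

    static class Service {
        @DemoCommand
        public String ok() {
            return "ok";
        }

        @DemoCommand
        @DemoCollapser
        public String both() {
            return "both";
        }
    }

    // Same shape as the check in the advice: both annotations together are rejected.
    static void validate(Method method) {
        if (method.isAnnotationPresent(DemoCommand.class)
                && method.isAnnotationPresent(DemoCollapser.class)) {
            throw new IllegalStateException(
                    "method cannot carry both annotations at the same time: " + method.getName());
        }
    }

    // Collects the names of all declared methods that fail validation.
    static List<String> rejectedMethods(Class<?> type) {
        List<String> rejected = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            try {
                validate(method);
            } catch (IllegalStateException e) {
                rejected.add(method.getName());
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedMethods(Service.class));
    }
}
```

Note that both stand-in annotations need RUNTIME retention, otherwise isAnnotationPresent would never see them.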
The MetaHolder is created through the create method of MetaHolderFactory. MetaHolderFactory has two subclasses, CollapserMetaHolderFactory and CommandMetaHolderFactory, and it is the subclass's create method that actually runs. Here is the one in CommandMetaHolderFactory:

    private static class CommandMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
            ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
            MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
            return builder.defaultCommandKey(method.getName())
                    .hystrixCommand(hystrixCommand)
                    .observableExecutionMode(hystrixCommand.observableExecutionMode())
                    .executionType(executionType)
                    .observable(ExecutionType.OBSERVABLE == executionType)
                    .build();
        }
    }
    MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        MetaHolder.Builder builder = MetaHolder.builder()
                .args(args).method(method).obj(obj).proxyObj(proxy)
                .defaultGroupKey(obj.getClass().getSimpleName())
                .joinPoint(joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), method);
        if (fallbackMethod.isPresent()) {
            fallbackMethod.validateReturnType(method);
            builder
                    .fallbackMethod(fallbackMethod.getMethod())
                    .fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
        }
        return builder;
    }
    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }
This is where the GenericCommand instance that later actually executes the HystrixCommand is created.
The advice then finishes by calling CommandExecutor.execute:
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                        ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
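The switch above dispatches on an ExecutionType that CommandMetaHolderFactory derived earlier from the method's return type. The sketch below imitates that two-step idea using only the JDK. It assumes that a Future return type maps to ASYNCHRONOUS and anything else to SYNCHRONOUS, and it leaves OBSERVABLE out because RxJava is not used here. ExecType, Command, and Service are hypothetical names, not javanica classes.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class ExecutionTypeDemo {

    // Simplified stand-in for an execution type enum (assumption: no OBSERVABLE case).
    enum ExecType {
        SYNCHRONOUS, ASYNCHRONOUS;

        // Assumed rule: methods returning a Future are executed asynchronously.
        static ExecType of(Class<?> returnType) {
            return Future.class.isAssignableFrom(returnType) ? ASYNCHRONOUS : SYNCHRONOUS;
        }
    }

    // Hypothetical command abstraction with a blocking and a non-blocking entry point.
    interface Command<T> {
        T execute();

        Future<T> queue();
    }

    // Mirrors the shape of the switch: pick the entry point by execution type.
    static <T> Object dispatch(Command<T> command, ExecType type) {
        switch (type) {
            case SYNCHRONOUS:
                return command.execute();
            case ASYNCHRONOUS:
                return command.queue();
            default:
                throw new IllegalArgumentException("unsupported execution type: " + type);
        }
    }

    static class Service {
        public String syncCall() {
            return "sync";
        }

        public Future<String> asyncCall() {
            return CompletableFuture.completedFuture("async");
        }
    }

    // Looks up a Service method by name and derives its execution type.
    static ExecType typeOf(String methodName) {
        for (Method method : Service.class.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return ExecType.of(method.getReturnType());
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        Command<String> command = new Command<String>() {
            @Override
            public String execute() {
                return "sync result";
            }

            @Override
            public Future<String> queue() {
                return CompletableFuture.completedFuture("async result");
            }
        };
        System.out.println(typeOf("syncCall") + " -> " + dispatch(command, typeOf("syncCall")));
        Object queued = dispatch(command, typeOf("asyncCall"));
        System.out.println(typeOf("asyncCall") + " -> returns a Future: " + (queued instanceof Future));
    }
}
```

In practice this means the caller picks the execution style simply by choosing the return type of the annotated method.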
    /**
     * Used for synchronous execution of command.
     *
     * @return R
     *         Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
     * @throws HystrixRuntimeException
     *             if a failure occurs and a fallback cannot be retrieved
     * @throws HystrixBadRequestException
     *             if invalid arguments or state were used representing a user failure, not a system failure
     * @throws IllegalStateException
     *             if invoked more than once
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw decomposeException(e);
        }
    }
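The execute() method above is just the asynchronous path with a blocking get() on top. The following sketch shows that pattern with a plain ExecutorService. SimpleCommand and runOnce are hypothetical names, and the exception handling here is a simplified assumption, not what decomposeException does.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SyncOverAsyncDemo {

    // Hypothetical command: queue() is the asynchronous entry point, execute() blocks on it.
    static class SimpleCommand<T> {
        private final ExecutorService pool;
        private final Callable<T> work;

        SimpleCommand(ExecutorService pool, Callable<T> work) {
            this.pool = pool;
            this.work = work;
        }

        Future<T> queue() {
            return pool.submit(work);
        }

        T execute() {
            try {
                // Synchronous execution is the asynchronous one plus a blocking get().
                return queue().get();
            } catch (ExecutionException e) {
                // Simplified unwrapping: surface the original failure to the caller.
                Throwable cause = e.getCause();
                throw (cause instanceof RuntimeException)
                        ? (RuntimeException) cause
                        : new RuntimeException(cause);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    // Runs one piece of work synchronously on a throwaway single-thread pool.
    static String runOnce(Callable<String> work) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return new SimpleCommand<>(pool, work).execute();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(() -> "hello"));
        try {
            runOnce(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException e) {
            System.out.println("unwrapped: " + e.getMessage());
        }
    }
}
```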
    @Override
    protected Object getFallback() {
        if (getFallbackAction() != null) {
            final CommandAction commandAction = getFallbackAction();
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(commandAction.getMetaHolder().getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(e.getCause());
            }
        } else {
            return super.getFallback();
        }
    }
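At this point the overall flow is clear: HystrixCommandAspect intercepts the call, a successful request returns the method's result, and a failed request runs the fallback logic. Interested readers can trace through the code in more detail.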
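The intercept, run, fall back flow summarized above can be imitated in a few lines with a JDK dynamic proxy. This is only a rough sketch under stated assumptions: it uses java.lang.reflect.Proxy instead of an AspectJ aspect, it has no circuit breaker, timeout, or thread pool, and the annotation WithFallback plus the UserService types are hypothetical names invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class FallbackProxyDemo {

    // Hypothetical stand-in for @HystrixCommand(fallbackMethod = "...").
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface WithFallback {
        String fallbackMethod();
    }

    public interface UserService {
        String findUser(String id);
    }

    public static class UserServiceImpl implements UserService {
        @Override
        @WithFallback(fallbackMethod = "findUserFallback")
        public String findUser(String id) {
            if (id.isEmpty()) {
                throw new IllegalArgumentException("empty id");
            }
            return "user-" + id;
        }

        // Fallback with the same parameter list as the guarded method.
        public String findUserFallback(String id) {
            return "unknown-user";
        }
    }

    // Wraps the target so that a failing annotated method is answered by its fallback.
    static UserService wrap(final UserService target) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // The annotation sits on the implementation method, not on the interface.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                WithFallback annotation = impl.getAnnotation(WithFallback.class);
                try {
                    return impl.invoke(target, args);
                } catch (InvocationTargetException e) {
                    if (annotation == null) {
                        throw e.getCause();
                    }
                    Method fallback = target.getClass()
                            .getMethod(annotation.fallbackMethod(), method.getParameterTypes());
                    return fallback.invoke(target, args);
                }
            }
        };
        return (UserService) Proxy.newProxyInstance(
                UserService.class.getClassLoader(), new Class<?>[] {UserService.class}, handler);
    }

    public static void main(String[] args) {
        UserService service = wrap(new UserServiceImpl());
        System.out.println(service.findUser("42")); // prints "user-42"
        System.out.println(service.findUser(""));   // prints "unknown-user"
    }
}
```

The real aspect does considerably more, since it builds a MetaHolder and hands execution to a GenericCommand, but the caller-visible behavior has the same outline: the normal result on success, the fallback result on failure.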
Reposted from: http://hxadi.baihongyu.com/