您好,登錄后才能下訂單哦!
小編給大家分享一下Spring Cloud中Zuul重試機制的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
具體內容如下:
開啟Zuul功能
通過源碼了解Zuul的一次轉發
怎么開啟zuul的重試機制
Edgware.RC1版本的優化
開啟Zuul的功能
首先如何使用spring cloud zuul完成路由轉發的功能,這個問題很簡單,只需要進行如下準備工作即可:
注冊中心(Eureka Server)
zuul(同時也是Eureka Client)
應用服務(同時也是Eureka Client)
我們希望zuul和后端的應用服務同時都注冊到Eureka Server上,當我們訪問Zuul的某一個地址時,對應其實訪問的是后端應用的某個地址,從而從這個地址返回一段內容,并展現到瀏覽器上。
注冊中心(Eureka Server)
創建一個Eureka Server只需要在主函數上添加@EnableEurekaServer,并在properties文件進行簡單配置即可,具體內容如下:
@EnableEurekaServer @RestController @SpringBootApplication public class EurekaServerApplication { public static void main(String[] args) { SpringApplication.run(EurekaServerApplication.class, args); } }
server.port=8761 eureka.client.register-with-eureka=false eureka.client.fetch-registry=false
Zuul
主函數添加@EnableZuulProxy注解(因為集成Eureka,需要另外添加@EnableDiscoveryClient注解)。并配置properties文件,具體內容如下所示:
@EnableZuulProxy @EnableDiscoveryClient @SpringBootApplication public class ZuulDemoApplication { /** * 省略代碼... */ }
server.port=8081 spring.application.name=ZUUL-CLIENT zuul.routes.api-a.serviceId=EUREKA-CLIENT zuul.routes.api-a.path=/api-a/** eureka.client.service-url.defaultZone=http://localhost:8761/eureka
應用服務
@RestController @EnableEurekaClient @SpringBootApplication public class EurekaClientApplication { public static void main(String[] args) { SpringApplication.run(EurekaClientApplication.class, args); } @RequestMapping(value = "/hello") public String index() { return "hello spring..."; } }
spring.application.name=EUREKA-CLIENT eureka.client.service-url.defaultZone=http://localhost:8761/eureka
三個工程全部啟動,這時當我們訪問localhost:8081/api-a/hello時,你會看到瀏覽器輸出的內容是hello spring...
通過源碼了解Zuul的一次轉發
接下來我們通過源碼層面來了解下,一次轉發內部都做了哪些事情。
首先我們查看Zuul的配置類ZuulProxyAutoConfiguration在這個類中有一項工作是初始化Zuul默認自帶的Filter,其中有一個Filter很重要,它就是RibbonRoutingFilter。它主要是完成請求的路由轉發。接下來我們看下他的run方法
@Override public Object run() { RequestContext context = RequestContext.getCurrentContext(); try { RibbonCommandContext commandContext = buildCommandContext(context); ClientHttpResponse response = forward(commandContext); setResponse(response); return response; } catch (ZuulException ex) { throw new ZuulRuntimeException(ex); } catch (Exception ex) { throw new ZuulRuntimeException(ex); } }
可以看到進行轉發的方法是forward,我們進一步查看這個方法,具體內容如下:
省略部分代碼
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception { RibbonCommand command = this.ribbonCommandFactory.create(context); try { ClientHttpResponse response = command.execute(); return response; } catch (HystrixRuntimeException ex) { return handleException(info, ex); } }
ribbonCommandFactory指的是HttpClientRibbonCommandFactory這個類是在RibbonCommandFactoryConfiguration完成初始化的(觸發RibbonCommandFactoryConfiguration的加載動作是利用ZuulProxyAutoConfiguration類上面的@Import標簽),具體代碼如下:
@Configuration @ConditionalOnRibbonHttpClient protected static class HttpClientRibbonConfiguration { @Autowired(required = false) private Set<ZuulFallbackProvider> zuulFallbackProviders = Collections.emptySet(); @Bean @ConditionalOnMissingBean public RibbonCommandFactory<?> ribbonCommandFactory( SpringClientFactory clientFactory, ZuulProperties zuulProperties) { return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, zuulFallbackProviders); } }
知道了這個ribbonCommandFactory具體的實現類(HttpClientRibbonCommandFactory),接下來我們看看它的create方法具體做了那些事情
@Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId()); final String serviceId = context.getServiceId(); final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient( serviceId, RibbonLoadBalancingHttpClient.class); client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, clientFactory.getClientConfig(serviceId)); }
這個方法按照我的理解主要做了以下幾件事情:
@Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { /** *獲取所有ZuulFallbackProvider,即當Zuul *調用失敗后的降級方法 */ ZuulFallbackProvider = xxxxx /** *創建處理請求轉發類,該類會利用 *Apache的Http client進行請求的轉發 */ RibbonLoadBalancingHttpClient = xxxxx /** *將降級方法、處理請求轉發類、以及其他一些內容 *包裝成HttpClientRibbonCommand(這個類繼承了HystrixCommand) */ return new HttpClientRibbonCommand(xxxxx); }
到這里我們很清楚的知道了RibbonRoutingFilter類的forward方法中RibbonCommand command = this.ribbonCommandFactory.create(context);這一行代碼都做了哪些內容.
接下來調用的是command.execute();方法,通過剛剛的分析我們知道了command其實指的是HttpClientRibbonCommand,同時我們也知道HttpClientRibbonCommand繼承了HystrixCommand所以當執行command.execute();時其實執行的是HttpClientRibbonCommand的run方法。查看源碼我們并沒有發現run方法,但是我們發現HttpClientRibbonCommand直接繼承了AbstractRibbonCommand。所以其實執行的是AbstractRibbonCommand的run方法,接下來我們看看run方法里面都做了哪些事情:
@Override protected ClientHttpResponse run() throws Exception { final RequestContext context = RequestContext.getCurrentContext(); RQ request = createRequest(); RS response = this.client.executeWithLoadBalancer(request, config); context.set("ribbonResponse", response); if (this.isResponseTimedOut()) { if (response != null) { response.close(); } } return new RibbonHttpResponse(response); }
可以看到在run方法中會調用client的executeWithLoadBalancer方法,通過上面介紹我們知道client指的是RibbonLoadBalancingHttpClient,而RibbonLoadBalancingHttpClient里面并沒有executeWithLoadBalancer方法。(這里面會最終調用它的父類AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法。)
具體代碼如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { /** * 創建一個RetryHandler,這個很重要它是用來 * 決定利用RxJava的Observable是否進行重試的標準。 */ RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig); /** * 創建一個LoadBalancerCommand,這個類用來創建Observable * 以及根據RetryHandler來判斷是否進行重試操作。 */ LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) .withRetryHandler(handler) .withLoadBalancerURI(request.getUri()) .build(); try { /** *command.submit()方法主要是創建了一個Observable(RxJava) *并且為這個Observable設置了重試次數,這個Observable最終 *會回調AbstractLoadBalancerAwareClient.this.execute() *方法。 */ return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }
下面針對于每一塊內容做詳細說明:
首先getRequestSpecificRetryHandler(request, requestConfig);這個方法其實調用的是RibbonLoadBalancingHttpClient的getRequestSpecificRetryHandler方法,這個方法主要是返回一個RequestSpecificRetryHandler
@Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) { /** *這個很關鍵,請注意該類構造器中的前兩個參數的值 *正因為一開始我也忽略了這兩個值,所以后續給我造 *成一定的干擾。 */ return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, requestConfig); }
接下來創建LoadBalancerCommand并將上一步獲得的RequestSpecificRetryHandler作為參數內容。
最后調用LoadBalancerCommand的submit方法。該方法內容太長具體代碼細節就不在這里貼出了,按照我個人的理解,只貼出相應的偽代碼:
public Observable<T> submit(final ServerOperation<T> operation) { //相同server的重試次數(去除首次請求) final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); //集群內其他Server的重試個數 final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); /** *創建一個Observable(RxJava),selectServer()方法是 *利用Ribbon選擇一個Server,并將其包裝成Observable */ Observable<T> o = selectServer().concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { /** *這里會回調submit方法入參ServerOperation類的call方法, */ return operation.call(server).doOnEach(new Observer<T>() {} } } if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { @Override public Observable<T> call(Throwable e) { /** *轉發請求失敗時,會進入此方法。通過此方法進行判斷 *是否超過重試次數maxRetrysSame、maxRetrysNext。 */ } }); }
operation.call()方法最終會調用RibbonLoadBalancingHttpClient的execute方法,該方法內容如下:
@Override public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { /** * 組裝參數(RequestConfig) */ final RequestConfig.Builder builder = RequestConfig.custom(); IClientConfig config = configOverride != null ? configOverride : this.config; builder.setConnectTimeout(config.get( CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); builder.setSocketTimeout(config.get( CommonClientConfigKey.ReadTimeout, this.readTimeout)); builder.setRedirectsEnabled(config.get( CommonClientConfigKey.FollowRedirects, this.followRedirects)); final RequestConfig requestConfig = builder.build(); if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(request.getUri()) .scheme("https").build().toUri(); request = request.withNewUri(secureUri); } final HttpUriRequest httpUriRequest = request.toRequest(requestConfig); /** * 發送轉發請求 */ final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); /** * 返回結果 */ return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); }
可以看到上面方法主要做的就是組裝請求參數(包括各種超時時間),然后發起轉發請求,最終獲取相應結果。
說到這里,zuul轉發一次請求的基本原理就說完了。讓我們再回顧下整個流程。
zuul的轉發是通過RibbonRoutingFilter這個Filter進行操作的。
在轉發之前,zuul利用Hystrix將此次轉發請求包裝成一個HystrixCommand,正應為這樣才使得zuul具有了降級(Fallback)的功能,同時HystrixCommand是具備超時時間的(默認是1s)。而且Zuul默認采用的隔離級別是信號量模式。
在HystrixCommand內部zuul再次將請求包裝成一個Observable,(有關RxJava的知識請參照其官方文檔)。并且為Observable設置了重試次數。
事實真的是這樣嗎?當我看到源碼中為Observable設置重試次數的時候,我以為這就是zuul的重試邏輯。遺憾的是我的想法是錯誤的。還記得上面我說的getRequestSpecificRetryHandler(request, requestConfig);這個方法嗎?(不記得的同學可以回過頭來再看下),這個方法返回的是RequestSpecificRetryHandler這個類,而且在創建該類時,構造器的前兩個參數都為false。(這一點非常重要)。這兩個參數分別是okToRetryOnConnectErrors和okToRetryOnAllErrors。
我原本的想法是這個請求被包裝成Observable,如果這次請求因為超時出現異常或者其他異常,這樣就會觸發Observable的重試機制(RxJava),但是事實并非如此,為什么呢?原因就是上面的那兩個參數,當出現了超時異常的時候,在觸發重試機制之前會調用RequestSpecificRetryHandler的isRetriableException()方法,該方法的作用是用來判斷是否執行重試動作,具體代碼如下:
@Override public boolean isRetriableException(Throwable e, boolean sameServer) { //此時該值為false if (okToRetryOnAllErrors) { return true; } else if (e instanceof ClientException) { ClientException ce = (ClientException) e; if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) { return !sameServer; } else { return false; } } else { //此時該值為false return okToRetryOnConnectErrors && isConnectionException(e); } }
說道這里zuul轉發一次請求的基本原理大概了解了,同時也驗證了一個事實就是實現zuul進行重試的邏輯并不是Observable的重試機制。那么問題來了?是什么使zuul具有重試功能的呢?
怎么開啟zuul的重試機制
開啟Zuul重試的功能在原有的配置基礎上需要額外進行以下設置:
在pom中添加spring-retry的依賴(maven工程)
設置zuul.retryable=true(該參數默認為false)
具體properties文件內容如下:
server.port=8081 spring.application.name=ZUUL-CLIENT #路由信息 zuul.routes.api-a.serviceId=EUREKA-CLIENT zuul.routes.api-a.path=/api-a/** #是否開啟重試功能 zuul.retryable=true #同一個Server重試的次數(除去首次) ribbon.MaxAutoRetries=3 #切換相同Server的次數 ribbon.MaxAutoRetriesNextServer=0 eureka.client.service-url.defaultZone=http://localhost:8761/eureka
為了模擬出Zuul重試的功能,需要對后端應用服務進行改造,改造后的內容如下:
@RequestMapping(value = "/hello") public String index() { System.out.println("request is coming..."); try { Thread.sleep(100000); } catch (InterruptedException e) { System.out.println("線程被打斷... " + e.getMessage()); } return "hello spring ..."; }
通過使用Thread.sleep(100000)達到Zuul轉發超時情況(Zuul默認連接超時未2s、read超時時間為5s),從而觸發Zuul的重試功能。這時候在此訪問localhost:8081/api-a/hello時,查看應用服務后臺,會發現最終打印三次"request is coming..."
通過現象看本質,接下來簡單介紹下Zuul重試的原理。首先如果你工程classpath中存在spring-retry,那么zuul在初始化的時候就不會創建RibbonLoadBalancingHttpClient而是創建RetryableRibbonLoadBalancingHttpClient具體源代碼如下:
@ConditionalOnClass(name = "org.apache.http.client.HttpClient") @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true) public class HttpClientRibbonConfiguration { @Value("${ribbon.client.name}") private String name = "client"; @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate") public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler) { RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient( config, serverIntrospector); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate") public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory) { RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient( config, serverIntrospector, loadBalancedRetryPolicyFactory); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } }
所以請求到來需要轉發的時候(AbstractLoadBalancerAwareClient類中executeWithLoadBalancer方法會調用AbstractLoadBalancerAwareClient.this.execute())其實調用的是RetryableRibbonLoadBalancingHttpClient的execute方法(而不是沒有重試時候RibbonLoadBalancingHttpClient的execute方法),源碼內容如下:
@Override public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { final RequestConfig.Builder builder = RequestConfig.custom(); IClientConfig config = configOverride != null ? configOverride : this.config; builder.setConnectTimeout(config.get( CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); builder.setSocketTimeout(config.get( CommonClientConfigKey.ReadTimeout, this.readTimeout)); builder.setRedirectsEnabled(config.get( CommonClientConfigKey.FollowRedirects, this.followRedirects)); final RequestConfig requestConfig = builder.build(); final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this); RetryCallback retryCallback = new RetryCallback() { @Override public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception { //on retries the policy will choose the server and set it in the context //extract the server and update the request being made RibbonApacheHttpRequest newRequest = request; if(context instanceof LoadBalancedRetryContext) { ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance(); if(service != null) { //Reconstruct the request URI using the host and port set in the retry context newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(), newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(), newRequest.getURI().getPath(), newRequest.getURI().getQuery(), newRequest.getURI().getFragment())); } } if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri()) .scheme("https").build().toUri(); newRequest = newRequest.withNewUri(secureUri); } HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig); final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest); if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) { if(CloseableHttpResponse.class.isInstance(httpResponse)) { ((CloseableHttpResponse)httpResponse).close(); } throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName, httpResponse.getStatusLine().getStatusCode()); } return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } }; return this.executeWithRetry(request, retryPolicy, retryCallback); }
executeWithRetry方法內容如下:
private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception { RetryTemplate retryTemplate = new RetryTemplate(); boolean retryable = request.getContext() == null ? true : BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true); retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy() : new RetryPolicy(request, retryPolicy, this, this.getClientName())); return retryTemplate.execute(callback); }
按照我的理解,主要邏輯如下:
@Override public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { /** *創建RequestConfig(請求信息) */ final RequestConfig requestConfig = builder.build(); final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this); /** * 創建RetryCallbck的實現類,用來完成重試邏輯 */ RetryCallback retryCallback = new RetryCallback() {}; //創建Spring-retry的模板類,RetryTemplate。 RetryTemplate retryTemplate = new RetryTemplate(); /** *設置重試規則,即在什么情況下進行重試 *什么情況下停止重試。源碼中這部分存在 *一個判斷,判斷的根據就是在zuul工程 *的propertris中配置的zuul.retryable *該參數內容為true才可以具有重試功能。 */ retryTemplate.setRetryPolicy(xxx); /** *發起請求 */ return retryTemplate.execute(callback); }
以上是“Spring Cloud中Zuul重試機制的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。