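Continued from the previous article: OKHttp source code analysis
Okio source code analysis
As this article shows, each Interceptor has a clear, single responsibility, and that is where OkHttp's design is clever.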
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
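Starting from the top, the first entries are the custom interceptors. OkHttp offers two kinds of interceptor, or rather two positions where an interceptor can be installed: application interceptors (client.interceptors()) and network interceptors (client.networkInterceptors()). Custom interceptors are outside the scope of this article, so we skip them.
Following the path of the request from top to bottom, the built-in interceptors are: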
1. RetryAndFollowUpInterceptor
2. BridgeInterceptor
3. CacheInterceptor
4. ConnectInterceptor
5. CallServerInterceptor
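The way the list above is traversed can be sketched in plain Java. This is a minimal sketch and not OkHttp's actual code: SimpleInterceptor, SimpleChain and ChainDemo are hypothetical names, and requests and responses are reduced to strings. It shows the index-based chain idea only: each call to proceed() hands the request to the interceptor at the current index together with a chain that points at the next index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for OkHttp's Interceptor and RealInterceptorChain.
interface SimpleInterceptor {
    String intercept(SimpleChain chain);
}

final class SimpleChain {
    private final List<SimpleInterceptor> interceptors;
    private final int index;
    private final String request;

    SimpleChain(List<SimpleInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    // Hands the request to the interceptor at `index`, giving it a chain that points at index + 1.
    String proceed(String request) {
        if (index >= interceptors.size()) {
            throw new IllegalStateException("no interceptor left to handle the request");
        }
        SimpleChain next = new SimpleChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

final class ChainDemo {
    static String run(String request) {
        List<SimpleInterceptor> interceptors = new ArrayList<>();
        // Each of these decorates the request on the way in and the response on the way out.
        interceptors.add(chain -> chain.proceed(chain.request() + "+bridge") + "<bridge");
        interceptors.add(chain -> chain.proceed(chain.request() + "+connect") + "<connect");
        // Terminal interceptor: produces the response and never calls proceed().
        interceptors.add(chain -> "response(" + chain.request() + ")");
        return new SimpleChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /"));
    }
}
```

The request passes through the interceptors in list order and the response unwinds in reverse order, which is why the last entry (CallServerInterceptor in the real list) is the only one that does not call proceed().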
/**
* How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
* curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
*/
private static final int MAX_FOLLOW_UPS = 20;// OkHttp attempts at most 20 follow-ups (redirects and auth challenges)
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// StreamAllocation coordinates Connections, Streams and Calls; as the name suggests it allocates and releases streams. We do not expand on it here.
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
// number of follow-ups (redirects and auth challenges) so far
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {// call proceed() to invoke the next interceptor, which links the whole chain together
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
}
...
// exception handling omitted
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {// see followUpRequest below: it decides whether the response needs a follow-up (redirect, auth, retry)
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
if (followUp == null) {
streamAllocation.release();
return response;// the only place that returns normally
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {// follow-up limit check: throw once it is exceeded
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
// Returning null means no follow-up is needed, so intercept() returns the response it already has
private Request followUpRequest(Response userResponse, Route route) throws IOException {
if (userResponse == null) throw new IllegalStateException();
int responseCode = userResponse.code();// the HTTP status code
final String method = userResponse.request().method();
switch (responseCode) {
case HTTP_PROXY_AUTH://HTTP Status-Code 407: Proxy Authentication Required.
Proxy selectedProxy = route != null
? route.proxy()
: client.proxy();
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
// By default client.proxyAuthenticator() is Authenticator.NONE, whose authenticate() returns null
return client.proxyAuthenticator().authenticate(route, userResponse);
case HTTP_UNAUTHORIZED://HTTP Status-Code 401: Unauthorized.
// Likewise, by default client.authenticator() is Authenticator.NONE, whose authenticate() returns null
return client.authenticator().authenticate(route, userResponse);
case HTTP_PERM_REDIRECT:// HTTP Status-Code 308: Permanent Redirect.
case HTTP_TEMP_REDIRECT:// HTTP Status-Code 307: Temporary Redirect.
// "If the 307 or 308 status code is received in response to a request other than GET
// or HEAD, the user agent MUST NOT automatically redirect the request"
if (!method.equals("GET") && !method.equals("HEAD")) {
return null;
}
// fall-through
case HTTP_MULT_CHOICE://HTTP Status-Code 300: Multiple Choices.
case HTTP_MOVED_PERM://HTTP Status-Code 301: Moved Permanently.
case HTTP_MOVED_TEMP://HTTP Status-Code 302: Temporary Redirect.
case HTTP_SEE_OTHER://HTTP Status-Code 303: See Other.
// Does the client allow redirects? If not, return null so that no follow-up happens.
if (!client.followRedirects()) return null;
// read the redirect target
String location = userResponse.header("Location");
if (location == null) return null;
HttpUrl url = userResponse.request().url().resolve(location);
// Don't follow redirects to unsupported protocols.
if (url == null) return null;
// If configured, don't follow redirects between SSL and non-SSL.
boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
if (!sameScheme && !client.followSslRedirects()) return null;
// Most redirects don't include a request body, so the rebuilt request usually has none.
Request.Builder requestBuilder = userResponse.request().newBuilder();
if (HttpMethod.permitsRequestBody(method)) {
final boolean maintainBody = HttpMethod.redirectsWithBody(method);
if (HttpMethod.redirectsToGet(method)) {
requestBuilder.method("GET", null);
} else {
RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
requestBuilder.method(method, requestBody);
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding");
requestBuilder.removeHeader("Content-Length");
requestBuilder.removeHeader("Content-Type");
}
}
if (!sameConnection(userResponse, url)) {
requestBuilder.removeHeader("Authorization");
}
// build the new request for the redirect target
return requestBuilder.url(url).build();
case HTTP_CLIENT_TIMEOUT://HTTP Status-Code 408: Request Time-Out.
...
// conditional checks that return null are omitted here
return userResponse.request();
case HTTP_UNAVAILABLE://HTTP Status-Code 503: Service Unavailable.
if (userResponse.priorResponse() != null
&& userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
// We attempted to retry and got another timeout. Give up.
return null;
}
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
// specifically received an instruction to retry without delay
return userResponse.request();
}
return null;
default:
return null;
}
}
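As the name suggests, this interceptor handles retries and follow-ups such as redirects; see the comments in the code for details.
RetryAndFollowUpInterceptor creates the StreamAllocation and then immediately calls chain.proceed. When the chain is first invoked there is no StreamAllocation yet (null is passed in), so application interceptors, which run earlier, cannot obtain one: the streamAllocation field of RealInterceptorChain is only populated from this interceptor onward. The request then goes straight on to the later interceptors. The only normal return is the followUp == null case, and the response returned there is the final one, that is, the response of the last URL in the redirect sequence, which itself needs no further follow-up.
followUpRequest roughly works like this: if the response is a redirect, it builds a new request and the whole request flow runs again.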
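The loop described above can be illustrated with a toy model. This is a minimal sketch under loud assumptions: FollowUpDemo is a hypothetical class, the "server" is just a map from a URL to its redirect target, and authentication, retries and connection handling are left out entirely. Only the control flow mirrors the excerpt: loop until followUpRequest returns null, and give up after MAX_FOLLOW_UPS.

```java
import java.util.HashMap;
import java.util.Map;

// A toy model of the follow-up loop: the "server" is a map from URL to redirect target.
final class FollowUpDemo {
    static final int MAX_FOLLOW_UPS = 20; // same limit as in the excerpt above

    // Returns the next URL to request, or null when the response needs no follow-up.
    static String followUpRequest(Map<String, String> redirects, String url) {
        return redirects.get(url);
    }

    // Keeps requesting until no follow-up is needed; fails after too many follow-ups.
    static String fetch(Map<String, String> redirects, String url) {
        int followUpCount = 0;
        while (true) {
            String followUp = followUpRequest(redirects, url);
            if (followUp == null) {
                return url; // the only normal exit, as in intercept()
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            url = followUp;
        }
    }

    public static void main(String[] args) {
        Map<String, String> redirects = new HashMap<>();
        redirects.put("http://a", "http://b");
        redirects.put("http://b", "http://c");
        System.out.println(fetch(redirects, "http://a"));
    }
}
```

A redirect cycle never produces a null follow-up, so the counter is what guarantees termination.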
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive");}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
// everything before this line decorates the request
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
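BridgeInterceptor is simple once you read its comments. First it turns the application's request into a network request: the request's parameters are written as header key-value pairs, which are stored in a flat List of Strings, like below: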
final List<String> namesAndValues = new ArrayList<>(20);
/**
* Add a field with the specified value without any validation. Only appropriate for headers
* from the remote peer or cache.
*/
Builder addLenient(String name, String value) {
namesAndValues.add(name);
namesAndValues.add(value.trim());
return this;
}
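Then, on the way back, it decodes the response body (unzipping it when transparent gzip was applied) and sets the result on the response.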
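The flat name/value list and the "fill in missing headers" step can be sketched as follows. FlatHeaders and BridgeDemo are hypothetical names used only for illustration; they are not OkHttp classes. The defaults mirror three of the checks in the excerpt above, with the simplification that the Range check guarding Accept-Encoding is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a header store: names at even indexes, values at odd indexes.
final class FlatHeaders {
    private final List<String> namesAndValues = new ArrayList<>(20);

    FlatHeaders add(String name, String value) {
        namesAndValues.add(name);
        namesAndValues.add(value.trim());
        return this;
    }

    // Returns the last value stored for `name` (case-insensitive), or null if absent.
    String get(String name) {
        for (int i = namesAndValues.size() - 2; i >= 0; i -= 2) {
            if (name.equalsIgnoreCase(namesAndValues.get(i))) {
                return namesAndValues.get(i + 1);
            }
        }
        return null;
    }

    // Number of header entries (each entry occupies two list slots).
    int size() {
        return namesAndValues.size() / 2;
    }
}

final class BridgeDemo {
    // Adds a default only when the caller has not set that header already.
    static FlatHeaders withDefaults(FlatHeaders headers, String host) {
        if (headers.get("Host") == null) headers.add("Host", host);
        if (headers.get("Connection") == null) headers.add("Connection", "Keep-Alive");
        if (headers.get("Accept-Encoding") == null) headers.add("Accept-Encoding", "gzip");
        return headers;
    }
}
```

Storing names and values in one list keeps insertion order and allows repeated header names, at the cost of a linear scan on lookup.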
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;// if no cache was configured on the client, cache is null and so is the candidate
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail with a 504.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done: return the cached response.
if (networkRequest == null) {
return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
}
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);// the network is needed: fetch the network response
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get. On a 304, merge the cached response with the network headers and return it.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
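// Update the cache. No null check on cache is needed here: cacheResponse derives from cache.get(), so it can only be non-null when cache is non-null.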
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
...
//update cache
return response;
}
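The overall flow is annotated above. In short, OkHttp implements its own DiskLruCache, which reads and writes through Okio, to provide put/get caching on disk. A class named CacheStrategy (as the name implies) controls the caching policy through two fields, strategy.networkRequest and strategy.cacheResponse; whether each of them is null determines what happens.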
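Condition | networkRequest | cacheResponse |
---|---|---|
1. There is no cached response; 2. the request uses HTTPS but the cached response has no handshake data; 3. the cached response should not have been stored (it holds data that is not cacheable); 4. the request carries Cache-Control: no-cache or conditional request headers, meaning it does not want the cache | the incoming request | null |
The cached response headers include Cache-Control: immutable (an extension, not part of the core HTTP specification), meaning the resource never changes | null | the incoming cached response |
The freshness check passes | null | the incoming cached response (possibly with some headers added) |
The freshness check fails; a conditional request is built from the ETag, Last-Modified or Date header and returned | the conditional request | the incoming cached response |
The freshness check fails and the cached response has none of ETag, Last-Modified and Date | the incoming request | null |
In any of the 5 cases above where networkRequest is non-null, the request specifies Cache-Control: only-if-cached, so only the current cache may be used | null | null |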
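networkRequest | cacheResponse | Handling |
---|---|---|
null | null | Return a 504 Unsatisfiable Request (only-if-cached) response directly. |
null | non-null | The cache is valid; return cacheResponse directly. |
non-null | null or non-null | A request must be sent over the network (the original request, or the conditional request built after the freshness check): 1. if there is cached data, update the cache with cache.update once the revalidation response arrives; 2. if there is no cached data, check whether the response may be cached and, if so, store it with cache.put. |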
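The second table maps directly onto a small decision function. The following is a minimal sketch: CacheDecisionDemo and its Action enum are hypothetical names, and the two booleans stand in for "strategy.networkRequest != null" and "strategy.cacheResponse != null". It mirrors the order of the checks in the intercept excerpt above, not the full cache logic.

```java
// Hypothetical illustration of how the two CacheStrategy fields select a code path.
final class CacheDecisionDemo {
    enum Action {
        FAIL_504,        // neither network nor cache may be used
        USE_CACHE,       // cached response is served without touching the network
        CONDITIONAL_GET, // network request is sent to revalidate the cached response
        USE_NETWORK      // plain network request, nothing usable in the cache
    }

    static Action decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) {
            return Action.FAIL_504;
        }
        if (!hasNetworkRequest) {
            return Action.USE_CACHE;
        }
        if (hasCacheResponse) {
            return Action.CONDITIONAL_GET;
        }
        return Action.USE_NETWORK;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true));
    }
}
```

In the conditional case the real interceptor still has to look at the status code: only a 304 leads to the merged cached response, any other status falls through to the plain network response.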
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
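Compared with the previous interceptors, ConnectInterceptor is much shorter. It uses a StreamAllocation, which, as we saw, was created in RetryAndFollowUpInterceptor.
StreamAllocation coordinates the relationship between three entities:
- Connections: physical socket connections to remote servers. They can be slow to establish, so it must be possible to cancel a connection that is currently being set up.
- Streams: logical HTTP request/response pairs layered on connections. Each connection has its own allocation limit, which defines how many concurrent streams it can carry: an HTTP/1.x connection carries one stream at a time, while an HTTP/2 connection can typically carry several.
- Calls: a logical sequence of streams, typically an initial request and its follow-up requests. We prefer to keep all streams of a single call on the same connection, for better behavior and locality.
StreamAllocation supports asynchronous cancellation, designed to have the smallest possible blast radius. If an HTTP/2 stream is active, canceling cancels that stream but not the other streams sharing its connection. If the TLS handshake is still in progress, however, canceling may break the entire connection.
Back to ConnectInterceptor: it really does just two things. First it obtains an HttpCodec through newStream: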
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {// obtain a healthy connection
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
Next, look inside the newCodec method:
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
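newCodec returns one of two HttpCodec implementations: Http1Codec for HTTP/1.x or Http2Codec for HTTP/2. Second, ConnectInterceptor passes the HttpCodec and the RealConnection on to the following interceptors.
CallServerInterceptor is the last step of the whole chain.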
public final class CallServerInterceptor implements Interceptor {
private final boolean forWebSocket;
public CallServerInterceptor(boolean forWebSocket) {
this.forWebSocket = forWebSocket;
}
@Override public Response intercept(Chain chain) throws IOException {
// fetch the helper objects created by the earlier interceptors
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);// write the HTTP request headers to the server
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
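// If the request carries "Expect: 100-continue", wait for the server's answer before sending the body; a 100 Continue means the client may send it.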
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {// main path: no early response arrived, so the body may be sent
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();// length of the request body
// wrap the codec's request body sink so the bytes written can be counted
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// write the request body
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
// build the response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
...
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
// attach the response body
.body(httpCodec.openResponseBody(response))
.build();
}
...
return response;
}
static final class CountingSink extends ForwardingSink {
long successfulCount;
CountingSink(Sink delegate) {
super(delegate);
}
@Override public void write(Buffer source, long byteCount) throws IOException {
super.write(source, byteCount);
successfulCount += byteCount;
}
}
}
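This interceptor does not call any further interceptor in the chain. Its job is to take the objects and parameters prepared earlier, obtain the response, and return it.
Put simply, it does four things:
- send the request headers to the server
- send the request body to the server
- read the response headers and build the response, attaching the request
- obtain the response body returned by the server
The result is then assembled into a response and returned to the chain.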
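The CountingSink helper in the excerpt above is a plain decorator: it forwards every write and adds up the bytes that went through, which is what requestBodyEnd reports. A java.io analogue makes the idea easy to try without Okio. This is a sketch under assumptions: CountingOutputStream and CountingDemo are hypothetical names, and a ByteArrayOutputStream stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// java.io analogue of CountingSink: forwards writes and counts the bytes written successfully.
final class CountingOutputStream extends FilterOutputStream {
    long successfulCount;

    CountingOutputStream(OutputStream delegate) {
        super(delegate);
    }

    @Override public void write(int b) throws IOException {
        out.write(b);
        successfulCount++; // only reached when the delegate did not throw
    }

    @Override public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        successfulCount += len; // only reached when the delegate did not throw
    }
}

final class CountingDemo {
    // Writes the body to an in-memory "wire" and returns how many bytes were sent.
    static long send(String body) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        try (CountingOutputStream counting = new CountingOutputStream(wire)) {
            counting.write(body.getBytes(StandardCharsets.UTF_8));
            return counting.successfulCount;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(send("hello"));
    }
}
```

Because the counter is incremented after the delegate call, a write that throws is not counted, which matches the intent of the field name successfulCount.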