美文网首页
OKHttp源码 - Interceptor

OKHttp源码 - Interceptor

作者: YocnZhao | 来源:发表于2019-02-25 14:16 被阅读0次

接上篇OKHttp源码解析
Okio源码解析
我们通过这篇文章可以看到,每个Interceptor的责任很清晰且单一,OKHttp设计就是巧妙在这里。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

从头开始看,首先是自定义interceptor,我们知道OKHttp提供了两种interceptor或者说两个位置的interceptor供我们使用,这里自定义的interceptor不再这篇的范畴之内,跳过它。
我们顺着request的顺序,从上往下分别是:

1. RetryAndFollowUpInterceptor
2. BridgeInterceptor
3. CacheInterceptor
4. ConnectInterceptor
5. CallServerInterceptor
\color{#ff0000}{RetryAndFollowUpInterceptor}
/**
   * How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
   * curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
   */
  private static final int MAX_FOLLOW_UPS = 20;//这里我们知道OKHttp支持的最大重定向次数为20
@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();
//管理Connections,Streams和Calls的类,看名字知道是分配和回收Stream的管理类,这里先不铺展开来看。
   StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;
//重定向的次数
    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {//调用下一步interceptor的proceed,把整个chain串起来
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } 
...
//删掉了异常处理
      }
      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
      Request followUp;
      try {//看下面followUpRequest方法,判断链接是否是重定向的链接等等
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }
      if (followUp == null) {
        streamAllocation.release();
        return response;//唯一的return的地点
      }
      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {//最大重定向判断,超过抛异常出去
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }
      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }
//返回null就默认不做处理,直接让intercept返回得到的response
private Request followUpRequest(Response userResponse, Route route) throws IOException {
    if (userResponse == null) throw new IllegalStateException();
    int responseCode = userResponse.code();//得到responseCode

    final String method = userResponse.request().method();
    switch (responseCode) {
      case HTTP_PROXY_AUTH://HTTP Status-Code 407: Proxy Authentication Required.
        Proxy selectedProxy = route != null
            ? route.proxy()
            : client.proxy();
        if (selectedProxy.type() != Proxy.Type.HTTP) {
          throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
        }
//我们看默认情况下的client.proxyAuthenticator()其实是Authenticator.NONE,它的authenticate返回的是null
        return client.proxyAuthenticator().authenticate(route, userResponse);
      case HTTP_UNAUTHORIZED://HTTP Status-Code 401: Unauthorized.
//一样的默认情况的client.authenticator()其实是Authenticator.NONE,它的authenticate返回的是null
        return client.authenticator().authenticate(route, userResponse);

      case HTTP_PERM_REDIRECT://Numeric status code, 307: Temporary Redirect.
      case HTTP_TEMP_REDIRECT:
        // "If the 307 or 308 status code is received in response to a request other than GET
        // or HEAD, the user agent MUST NOT automatically redirect the request"
        if (!method.equals("GET") && !method.equals("HEAD")) {
          return null;
        }
        // fall-through
      case HTTP_MULT_CHOICE://HTTP Status-Code 300: Multiple Choices.
      case HTTP_MOVED_PERM://HTTP Status-Code 301: Moved Permanently.
      case HTTP_MOVED_TEMP://HTTP Status-Code 302: Temporary Redirect.
      case HTTP_SEE_OTHER://HTTP Status-Code 303: See Other.
        // Does the client allow redirects?如果用户不允许重定向,直接返回
        if (!client.followRedirects()) return null;
        //找到重定向信息
        String location = userResponse.header("Location");
        if (location == null) return null;
        HttpUrl url = userResponse.request().url().resolve(location);
        // Don't follow redirects to unsupported protocols.
        if (url == null) return null;
        // If configured, don't follow redirects between SSL and non-SSL.
        boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
        if (!sameScheme && !client.followSslRedirects()) return null;
        // Most redirects don't include a request body. 生成一个不含body的request
        Request.Builder requestBuilder = userResponse.request().newBuilder();
        if (HttpMethod.permitsRequestBody(method)) {
          final boolean maintainBody = HttpMethod.redirectsWithBody(method);
          if (HttpMethod.redirectsToGet(method)) {
            requestBuilder.method("GET", null);
          } else {
            RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
            requestBuilder.method(method, requestBody);
          }
          if (!maintainBody) {
            requestBuilder.removeHeader("Transfer-Encoding");
            requestBuilder.removeHeader("Content-Length");
            requestBuilder.removeHeader("Content-Type");
          }
        }

        if (!sameConnection(userResponse, url)) {
          requestBuilder.removeHeader("Authorization");
        }
        //重新build一个request
        return requestBuilder.url(url).build();

      case HTTP_CLIENT_TIMEOUT://HTTP Status-Code 408: Request Time-Out.
        ...
//条件判断返回null
        return userResponse.request();

      case HTTP_UNAVAILABLE://HTTP Status-Code 503: Service Unavailable.
        if (userResponse.priorResponse() != null
            && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
          // We attempted to retry and got another timeout. Give up.
          return null;
        }

        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          // specifically received an instruction to retry without delay
          return userResponse.request();
        }
        return null;
      default:
        return null;
    }
  }

看名字知道这个interceptor是用来做重定向处理的,具体注释看代码里面的注释。
我们看,其实RetryAndFollowUpInterceptor上来把StreamAllocation new出来之后就直接调用了chain.preceed,因为chain一开始调用的时候是没有StreamAllocation的,StreamAllocation传了null(所以用户自定义的interceptor一开始是拿不到StreamAllocation的,也就是说RealInterceptorChain的streamAllocation属性是RetryAndFollowUpInterceptor这里才有的),也就是直接到后面请求去了,我们其实可以看到唯一正常return的地方是followUp == null的情况下,会把response,也就是重定向到没有重定向的那个链接,返回回去。
followUpRequest方法大致就是如果response是重定向的,就重新搞一个request出来,重新去走一遍请求流程。

\color{#ff0000}{BridgeInterceptor}
@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
    if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive");}

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    //这句之前是包装request
    Response networkResponse = chain.proceed(requestBuilder.build());
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

其实BridgeInterceptor看注释就很简答了,首先把app的代码包装成网络请求的参数,也就是把request的参数写成header里面的key-value,并放到一个String的List里面,Like Below:

    final List<String> namesAndValues = new ArrayList<>(20);
/**
     * Add a field with the specified value without any validation. Only appropriate for headers
     * from the remote peer or cache.
     */
    Builder addLenient(String name, String value) {
      namesAndValues.add(name);
      namesAndValues.add(value.trim());
      return this;
    }

然后把response的body解析出来赋给response。

\color{#ff0000}{CacheInterceptor}
@Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;//如果初始化cache的时候没有设置cache,cache就是null,那么这里得到的就是null

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.网络不可用无缓存,返回fail
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done. 网络不可用,返回缓存的response
    if (networkRequest == null) {
      return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);//需要网络,获取网络response
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.缓存和网络都获取到,合并返回
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // 更新缓存  TODO:不知道这里为什么cache不判空
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    ...
//update cache
    return response;
  }

大体流程上面写了,大体就是OKHttp自己实现了一个使用OKIO读写的DiskLruCache,实现在磁盘的缓存put-get操作,然后又一个类叫做CacheStrategy(顾名思义)来做缓存的策略控制,CacheStrategy用它的两个参数来做控制,分别是strategy.networkRequeststrategy.cacheResponse,控制着这两个参数的有无来做判断。

条件 networkRequest cacheResponse
没有缓存时
2. 请求采用 https 但是缓存没有进行握手的数据
3. 缓存不应该被保存(保留了一些不应该缓存的数据)
4. 请求添加了 Cache-Control:No-Cache, 或者一些条件请求首部,说明不希望使用缓存
传入的request
缓存响应首部包含 Cache-Control:immutable
(不属于 http 协议),说明资源不会改变
传入的缓存响应
新鲜度验证通过 传入的缓存响应
(可能会添加一些首部)
新鲜度验证不通过,使用 Etag 或 Last-Modified
或 Date 首部构造条件请求并返回
条件请求 传入的缓存响应
新鲜度验证不通过,且缓存响应没有 Etag、
Last-Modified 和 Date 中的任何一个
传入的request
上述 5 种情况中 networkRequest 不为空时,若请求通过 Cache-Control:only-if-cached 只允许我们使用当前缓存
networkRequest cacheResponse 处理方法
直接返回 504 Unsatisfiable Request (only-if-cached)响应。
非空 说明缓存有效,直接返回 cacheResponse
非空 说明需要向网络发送请求(原始 request 或新鲜度验证后的条件请求):
1. 如果有缓存数据,在获得再验证的响应后,使用 cache 的 update 方法更新缓存
2. 如果没有缓存数据,判断请求是否可以被缓存,可以的话就使用 cache 的 put 方法缓存下来

表格借鉴了这里,原文链接

\color{#ff0000}{ConnectInterceptor}
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

ConnectInterceptor相对之前的interceptor代码就简短了很多。这里我们看用到了StreamAllocation,我们应该还记得在RetryAndFollowUpInterceptor中创建的StreamAllocation。
StreamAllocation这个类协调了下面三个类的关系:

  • Connections:连接到远程服务的物理连接,他们创建可能会很慢,所以我们应该能够取掉刚刚创建的连接。
  • Streams:位于connection上的逻辑HTTP request/response对。每个连接有它自己的分配限制,定义了每个连接能承载的流,HTTP/1.x每次能承载一条连接,http/2.x的特色是能承载多条。
  • Calls:一个逻辑上的streams的序列,一个初始化的request和它的后续的requests。我们期望能在同一个connection的里面用一个call,这样能有更好的表现。

StreamAllocation支持cancel(异步的)。这样做的目的是可能会有最小的爆炸半径。如果Http/2的流正在活动,这时候cancel可能会cancel这个流,但不是其他共享它的connection的流。但是如果TLS正在握手中,这可能会终止整个握手过程。
我们继续看ConnectInterceptor,其实就做了两个事儿,先获取了一个HttpCodec,newStream

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {//获取一个健康的Connection
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

继续看到newCodec方法里面,

public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
      StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }

我们发现有返回了两种HttpCodec,Http1.x跟Http2.x,我们看到ConnectInterceptor拿到了HttpCodec和RealConnection继续调起后面的Interceptor。

\color{#ff0000}{CallServerInterceptor}

这是整个chain的最后一步了。

public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
//获取到之前创建的各种工具类
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);//准备好http的头信息,并发送给server
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
//是否接收到服务器的100-continue,表明允许客户端发送body。
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {//主要请求方法入口,服务器允许发送body
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();//获取请求体的长度
//构建一个Sink
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        //写入请求体
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
//构建response
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
...
if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
//写入消息体
          .body(httpCodec.openResponseBody(response))
          .build();
    }
...
    return response;
  }

  static final class CountingSink extends ForwardingSink {
    long successfulCount;

    CountingSink(Sink delegate) {
      super(delegate);
    }

    @Override public void write(Buffer source, long byteCount) throws IOException {
      super.write(source, byteCount);
      successfulCount += byteCount;
    }
  }
}

这里就不需要继续往后调chain的interceptor了,它的任务就是获取到之前准备好的各个类和参数,获取到response返回回去就好了。
其实简单来说的话就是四步

  1. 给服务器发送消息头
  2. 给服务器发送消息体
  3. 根据request信息构建response
  4. 获得服务器返回的消息体

然后组装成response返回给chain。

相关文章

网友评论

      本文标题:OKHttp源码 - Interceptor

      本文链接:https://www.haomeiwen.com/subject/pcaxyqtx.html