基本使用

val client = OkHttpClient.Builder().build()
// A URL is mandatory: Request.Builder.build() fails when no URL has been set,
// so the example supplies a placeholder one.
val request = Request.Builder()
    .url("https://example.com/")
    .build()

// Synchronous request: blocks the calling thread until the response is available.
// `use` closes the response so the underlying connection can be released.
client.newCall(request).execute().use { response ->
    // Read the response here.
}

// Asynchronous request: the result is delivered to the callback.
client.newCall(request).enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        // The request could not be executed (cancellation, connectivity problem, timeout, ...).
    }

    override fun onResponse(call: Call, response: Response) {
        // Close the response once it has been consumed.
        response.use {
            // Read the response here.
        }
    }
})

OkHttpClient

OkHttpClient 通过 Builder 建造者模式,进行一些参数配置

// Configuration fields of the OkHttpClient builder, listed with the initial value each one is
// declared with. (Excerpt: the enclosing class declaration is not part of this listing.)

// Call scheduling and connection reuse.
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
// Two separate interceptor lists; both start empty.
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var fastFallback = false
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
// No cookies and no cache unless configured.
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
// TLS-related settings; the nullable ones start as null.
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
// Timeouts and intervals. Units are not visible in this excerpt — presumably milliseconds,
// TODO confirm against the OkHttp source.
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
internal var taskRunner: TaskRunner? = null

Request

Request也是通过 Builder 建造者配置一些参数

// Fields of the Request builder. (Excerpt: the enclosing class is not shown; `method` and
// `headers` have no initializer here, so they are presumably assigned in omitted code.)
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null

/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()

OkHttpClient 通过 newCall 方法生成一个 RealCall 实例对象,RealCall 实现了 Call 接口

Call

请求调用接口,表示这个请求已经准备好可以执行,也可以被取消;同一个 Call 只能执行一次

/**
 * A request that is ready to be run. It exposes a blocking [execute], a callback-based
 * [enqueue], cancellation, state queries and [clone].
 */
interface Call : Cloneable {
/** Returns the request associated with this call. */
fun request(): Request

/**
 * Runs the request synchronously and returns its response.
 *
 * @throws IOException declared for the synchronous path
 */
@Throws(IOException::class)
fun execute(): Response

/** Runs the request asynchronously; the outcome is reported to [responseCallback]. */
fun enqueue(responseCallback: Callback)

/** Cancels the call. */
fun cancel()

/** Reports whether the call has been executed. */
fun isExecuted(): Boolean

/** Reports whether the call has been canceled. */
fun isCanceled(): Boolean

/** Returns the [Timeout] associated with this call. */
fun timeout(): Timeout

/** Returns a copy of this call. */
public override fun clone(): Call

/** Creates [Call] instances from requests. */
fun interface Factory {
fun newCall(request: Request): Call
}
}

RealCall

/**
 * Excerpt of RealCall, the [Call] implementation shown here with its synchronous path
 * ([execute]), its asynchronous path ([enqueue]) and the interceptor-chain construction both
 * rely on. Parts of the class are omitted.
 */
class RealCall(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// .... parts omitted

/**
 * Synchronous execution: registers the call with the dispatcher, runs the interceptor chain on
 * the calling thread, and always notifies the dispatcher when done.
 */
override fun execute(): Response {
// ...
try {
// Hand the call to the dispatcher.
// As the Dispatcher source shows, this just adds the call to its runningSyncCalls list.
client.dispatcher.executed(this)

// Obtain the server's response.
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}

/**
 * Assembles the interceptor list, wraps it in a [RealInterceptorChain] and runs it.
 *
 * @return the response produced by the chain
 * @throws IOException if the chain fails, or "Canceled" if the call was canceled meanwhile
 */
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Collect the interceptors used to build the RealInterceptorChain.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

// Head of the chain: index 0, no exchange yet, starting from the original request and the
// client's timeouts.
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

// Tracks whether noMoreExchanges was already called from the catch branch, so the finally
// block does not call it a second time.
var calledNoMoreExchanges = false

// The response is ultimately obtained through RealInterceptorChain.proceed.
try {
// Obtain the response.
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

/** Asynchronous execution: wraps the callback in an [AsyncCall] and hands it to the dispatcher. */
override fun enqueue(responseCallback: Callback) {
// ...
// Send the asynchronous request through the dispatcher.
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
}

call 发送请求将通过 Dispatcher 调度器实例进行处理,发送异步请求时,会涉及 AsyncCall

AsyncCall

AsyncCall 是 RealCall 的内部类,实现了 Runnable 接口,由调度器内部的线程池执行

/**
 * Runnable wrapper around an asynchronous call. The dispatcher submits it to its executor via
 * [executeOn]; [run] then performs the request and reports the outcome to [responseCallback].
 */
inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
  // ... The Dispatcher eventually runs the AsyncCall through this method.

  /**
   * Submits this call to [executorService]. If the executor rejects it, the callback is told
   * about the failure and the dispatcher is informed that the call is no longer running.
   */
  fun executeOn(executorService: ExecutorService) {
    var success = false
    try {
      // Run on the dispatcher's thread pool.
      executorService.execute(this)
      success = true
    } catch (e: RejectedExecutionException) {
      // Callbacks only accept an IOException, so wrap the rejection in one and keep the
      // original exception as its cause.
      val ioException = java.io.InterruptedIOException("executor rejected")
      ioException.initCause(e)
      // Report the failure.
      responseCallback.onFailure(this@RealCall, ioException)
    } finally {
      if (!success) {
        // Finish the call.
        client.dispatcher.finished(this) // This call is no longer running!
      }
    }
  }

  /**
   * Executes the request on the current thread and delivers exactly one of
   * onResponse / onFailure to the callback.
   */
  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      // True once onResponse has been invoked; prevents signalling the callback twice.
      var signalledCallback = false
      timeout.enter()
      try {
        // Obtain the server's response, same as execute().
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        // Hand the response to the callback.
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          // The request failed.
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        // Any other throwable during the request cancels the call.
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          // Propagate the failure to the callback.
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        // Finish the call.
        client.dispatcher.finished(this)
      }
    }
  }
}

Dispatcher

Dispatcher 用来调度 call 对象,内部包含线程池和请求队列,用来存放和执行 AsyncCall 对象

/**
 * Excerpt of Dispatcher: it owns the executor used for asynchronous calls and three deques
 * tracking ready-async, running-async and running-sync calls.
 */
class Dispatcher() {
private var executorServiceOrNull: ExecutorService? = null

// Thread pool, created on first access: core size 0, max Int.MAX_VALUE, 60 s keep-alive,
// SynchronousQueue as the work queue.
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}

/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()

// Queues an asynchronous call, then tries to start whatever is eligible to run.
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// Add the AsyncCall to the ready queue.
readyAsyncCalls.add(call)

if (!call.call.forWebSocket) {
// Look for an existing call to the same host (the original note says it searches
// readyAsyncCalls and runningAsyncCalls); if one is found, reuse its per-host counter.
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// Start executing requests.
promoteAndExecute()
}

// Moves calls from readyAsyncCalls to runningAsyncCalls while the maxRequests and
// maxRequestsPerHost limits allow it, submits the promoted calls to the executor, and
// returns whether any call is running.
private fun promoteAndExecute(): Boolean {
// ...
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

// Global limit reached: stop promoting. Per-host limit reached: skip just this call.
if (runningAsyncCalls.size >= this.maxRequests) break
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue

i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}

isRunning = runningCallsCount() > 0
}

// Submission happens outside the synchronized block.
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// Run the request on the thread pool; control returns to AsyncCall.
asyncCall.executeOn(executorService)
}

return isRunning
}

// A synchronous request is simply recorded in the runningSyncCalls list.
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}

// Removes a finished call from its deque (AssertionError if it was not there), promotes further
// calls, and runs the idle callback when nothing is running any more.
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}

val isRunning = promoteAndExecute()

if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
}

RealInterceptorChain

通过上面的分析,最终去执行请求的是 RealInterceptorChain,通过 proceed 方法来返回 Response。RealInterceptorChain 是 Interceptor.Chain 的具体实现类

/**
 * Excerpt of RealInterceptorChain, the [Interceptor.Chain] implementation. Each instance pairs
 * the shared interceptor list with the [index] of the interceptor it will invoke next.
 */
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {

// Number of times proceed() has been invoked on this chain instance.
private var calls: Int = 0

// Creates a new chain sharing `call` and `interceptors`; every other value defaults to this
// chain's current value.
internal fun copy(
index: Int = this.index,
exchange: Exchange? = this.exchange,
request: Request = this.request,
connectTimeoutMillis: Int = this.connectTimeoutMillis,
readTimeoutMillis: Int = this.readTimeoutMillis,
writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,readTimeoutMillis, writeTimeoutMillis)

// .... some code omitted

/**
 * Invokes the interceptor at [index], handing it a chain positioned at the following index.
 *
 * @throws IOException declared for failures raised further down the chain
 * @throws NullPointerException if the interceptor returns null
 */
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)

calls++
// ....

// Build the chain for the following position (index + 1) and pick the current interceptor.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]

// Let the interceptor produce the response. Each interceptor handles its own concern;
// the result travels back up to RealCall.getResponseWithInterceptorChain.
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")

// ...
return response
}
}

Interceptor

RealCall 的 getResponseWithInterceptorChain 方法中,添加了一系列的 interceptor

// Annotated excerpt: only the interceptor-list construction is shown; the rest of the function
// (building and running the chain) is omitted here.
fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
// User-defined interceptors, e.g. to add a token to every request.
interceptors += client.interceptors
// Connection initialisation work, retrying failed requests, and follow-up requests for redirects.
interceptors += RetryAndFollowUpInterceptor(client)
// Converts the user-built request into the request the server needs, and converts the network
// response back into a response the user can consume.
interceptors += BridgeInterceptor(client.cookieJar)
// Cache handling.
interceptors += CacheInterceptor(client.cache)
// Establishes the connection (TCP or TLS).
interceptors += ConnectInterceptor
if (!forWebSocket) {
// User-defined network interceptors.
interceptors += client.networkInterceptors
}
// The actual network I/O: sending the request and reading the response.
interceptors += CallServerInterceptor(forWebSocket)
}

RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor 负责对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作

/**
 * Excerpt of RetryAndFollowUpInterceptor: loops until a response needs no follow-up request,
 * recovering from IOExceptions where possible. Several locals (`call`, `realChain`, `request`,
 * `priorResponse`, `recoveredFailures`, `newRoutePlanner`) are declared in omitted code.
 */
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// ...
while (true) {

var response: Response
// Passed to exitNetworkInterceptorExchange in the finally block; set to false on the paths
// that return the response to the caller.
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}

try {
// Obtain the response through RealInterceptorChain.proceed.
response = realChain.proceed(request)
newRoutePlanner = true
} catch (e: IOException) {
// The attempt to communicate with the server failed; the request may already have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newRoutePlanner = false
continue
}

// Attach the prior response, if there is one. Note that its body is set to null.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}

val exchange = call.interceptorScopedExchange
// Based on the response code, builds a new request for a retry or a redirect, or returns null.
val followUp = followUpRequest(response, exchange)

// No follow-up needed: this response is the final one.
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}

// ...
// A one-shot body cannot be sent again, so the current response is returned as-is.
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}

response.body?.closeQuietly()

// ...
// Loop again with the follow-up request, remembering the current response as the prior one.
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}

// ...
}

BridgeInterceptor

BridgeInterceptor 负责将用户构建的请求转换为服务器需要的请求,例如在请求中添加 Content-Type 等信息,移除响应头中的 Content-Encoding

/**
 * Excerpt of BridgeInterceptor: fills in request headers the caller did not set, attaches and
 * stores cookies through [cookieJar], and transparently decodes gzip responses it asked for.
 * `userRequest`, `requestBuilder`, `body` and `userAgent` are declared in omitted code.
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// ...
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}

// Known length -> Content-Length; unknown length (-1) -> chunked transfer encoding.
// The two headers are kept mutually exclusive.
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}

// Defaults applied only when the caller has not set the header.
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}

// Ask for gzip only when the caller specified neither Accept-Encoding nor Range; remember
// that we did so, because then decoding the response is our responsibility as well.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}

val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}

// Continue down the chain with the completed request.
val networkResponse = chain.proceed(requestBuilder.build())

// Pass the response headers to the cookie jar.
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

// The response handed back refers to the caller's original request.
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)

// If we requested gzip ourselves and the server answered with a gzip body, wrap the body in
// a GzipSource and drop Content-Encoding / Content-Length from the headers.
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()
}

// ...
}

CacheInterceptor

CacheInterceptor 用于处理缓存策略,开发者可通过 OkHttpClient.Builder().cache 进行配置

/**
 * Excerpt of CacheInterceptor: decides from a [CacheStrategy] whether to answer from the cache,
 * from the network, or with a conditional request, and updates the cache afterwards.
 * `now`, `cacheCandidate` and `call` are declared in omitted code.
 */
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// ...

// Cache strategy.
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// Null means the network is not used; otherwise the network is used.
val networkRequest = strategy.networkRequest
// Null means the cache is not used; otherwise the cache is used.
val cacheResponse = strategy.cacheResponse

cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}

// Network is forbidden and there is no cached response: build and return a 504 response.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT) // GATEWAY_TIMEOUT
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}

// Network is not used but a cached response exists: build the response from the cache.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}

if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}

var networkResponse: Response? = null
try {
// Continue down the chain; the server's response is assigned to networkResponse.
networkResponse = chain.proceed(networkRequest)
} finally {
// proceed() did not produce a response: close the cache candidate's body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}

if (cacheResponse != null) {
// On a 304 from the network, build a new Response from the cached content and return it.
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

networkResponse.body!!.close()

cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}

// Response built from the network response, carrying body-stripped copies of both the cache
// response and the network response.
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

// If cache is non-null, i.e. the user configured a cache on the OkHttpClient,
// store the response built above in the cache when it is cacheable.
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}

// If the request method invalidates the cache, remove the entry for this request.
// (The original note says only GET requests are cached — not visible here; TODO confirm.)
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}

return response
}

// ....
}

ConnectInterceptor

ConnectInterceptor 负责实现与服务器真正建立起连接

/**
 * Interceptor that asks the call to initialise an exchange for the current chain and then
 * continues along a copy of the chain bound to that exchange.
 */
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response =
    (chain as RealInterceptorChain).let { current ->
      current
        // Initialise the exchange and rebuild the chain around it.
        .copy(exchange = current.call.initExchange(current))
        // Forward the unchanged request to the next interceptor.
        .proceed(current.request)
    }
}
// RealCall.initExchange (excerpt): finds a connection, creates a codec on it, and wraps
// both in a new Exchange.
internal fun initExchange(chain: RealInterceptorChain): Exchange {
// ...
val routePlanner = this.routePlanner!!

// Obtain a connection, depending on the client configuration.
val connection = when {
// Speculatively connects to each IP address of the target and returns as soon as one succeeds.
// (Original note: a new attempt is started every 250 ms — not visible in this excerpt.)
client.fastFallback -> FastFallbackExchangeFinder(routePlanner, client.taskRunner).find()
else -> ExchangeFinder(routePlanner).find()
}

// Returns an Http1ExchangeCodec or an Http2ExchangeCodec depending on the connection
// (per the original note); the connection itself is handled by RealConnection.
val codec = connection.newCodec(client, chain)
// Create the Exchange around the codec and return it.
val result = Exchange(this, eventListener, routePlanner, codec)
// ...
return result
}

RealConnection

RealConnection 用来与远程服务器连接,承载1个或多个流

/**
 * Excerpt of RealConnection: opens the raw socket (directly or through a proxy tunnel),
 * optionally layers TLS on top, and records the negotiated protocol. Fields such as `rawSocket`,
 * `socket`, `source`, `sink`, `protocol` and `handshake` are declared in omitted code.
 */
class RealConnection(
val taskRunner: TaskRunner,
val connectionPool: RealConnectionPool,
private val route: Route
) : Http2Connection.Listener(), Connection{

//...

/**
 * Connects this connection: tunnel or plain socket first, then protocol establishment.
 * Fails the `check` with "already connected" unless `isNew` is true.
 */
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(isNew) { "already connected" }

var firstException: IOException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)

// ...

while (true) {
try {
// Decide whether to create a tunnel or a plain socket.
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
// No raw socket after the tunnel attempt: leave the loop.
if (rawSocket == null) {
break
}
} else {
connectSocket(connectTimeout, readTimeout, call, eventListener)
}

// Establish the protocol for this connection.
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
// ...
}
}

// ...
}

/** Connects through a proxy tunnel, making up to MAX_TUNNEL_ATTEMPTS attempts. */
@Throws(IOException::class)
private fun connectTunnel(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
call: Call,
eventListener: EventListener
) {
// Request that is typically sent to the proxy server to establish the tunnel.
var tunnelRequest: Request = createTunnelRequest()
val url = tunnelRequest.url

for (i in 0 until MAX_TUNNEL_ATTEMPTS) {
// Open the underlying raw connection, typically to the proxy server.
connectSocket(connectTimeout, readTimeout, call, eventListener)

// Create the tunnel: null means success; otherwise a new Request is returned and used for
// another attempt.
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url)
?: break // Tunnel successfully created.

// The tunnel was not created on this attempt: report connectEnd with no protocol.
eventListener.connectEnd(call, route.socketAddress, route.proxy, null)
}
}

/** Creates and connects the raw socket, then sets up the buffered source and sink. */
@Throws(IOException::class)
private fun connectSocket(
connectTimeout: Int,
readTimeout: Int,
call: Call,
eventListener: EventListener
) {
val proxy = route.proxy
val address = route.address

// Create the socket: via the address's socket factory for DIRECT / HTTP proxies, otherwise
// a Socket bound to the proxy.
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}

// ...

try {
// Connect the socket.
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
} catch (e: ConnectException) {
// Rethrow with the target address in the message, keeping the original as the cause.
throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
initCause(e)
}
}

try {
// Create the input stream (source) and output stream (sink) from rawSocket and wrap each
// in a buffer.
source = rawSocket.source().buffer()
sink = rawSocket.sink().buffer()
} catch (npe: NullPointerException) {
// Only an NPE whose message equals NPE_THROW_WITH_NULL is converted to an IOException.
if (npe.message == NPE_THROW_WITH_NULL) {
throw IOException(npe)
}
}
}

// Establishes the protocol: the route address's properties decide which protocol is used to
// talk to the server.
@Throws(IOException::class)
private fun establishProtocol(
connectionSpecSelector: ConnectionSpecSelector,
pingIntervalMillis: Int,
call: Call,
eventListener: EventListener
) {

// If sslSocketFactory is null, no encryption is needed: run a plain HTTP connection.
if (route.address.sslSocketFactory == null) {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
socket = rawSocket
protocol = Protocol.H2_PRIOR_KNOWLEDGE
startHttp2(pingIntervalMillis)
return
}

socket = rawSocket
protocol = Protocol.HTTP_1_1
return
}

// Encrypted request: establish a secure connection.
eventListener.secureConnectStart(call)
// Create and configure the TLS connection.
connectTls(connectionSpecSelector)
eventListener.secureConnectEnd(call, handshake)

if (protocol === Protocol.HTTP_2) {
startHttp2(pingIntervalMillis)
}
}

// Connects TLS: performs the SSL/TLS handshake, host name verification, certificate checks,
// and so on while establishing the connection.
@Throws(IOException::class)
private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
val address = route.address
val sslSocketFactory = address.sslSocketFactory
var success = false // whether the connection succeeded
var sslSocket: SSLSocket? = null // holds the SSL socket

try {
// Layer an SSL socket over the already-connected raw socket.
sslSocket = sslSocketFactory!!.createSocket(
rawSocket, address.url.host, address.url.port, true /* autoClose */
) as SSLSocket

// Configure the socket's connection spec (per the original note: ciphers and TLS versions).
val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
if (connectionSpec.supportsTlsExtensions) {
// Configure TLS extensions.
Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
}

// Start the TLS handshake (key negotiation between client and server, etc.).
sslSocket.startHandshake()
val sslSocketSession = sslSocket.session
// Handshake information: TLS version, cipher suite and certificates.
val unverifiedHandshake = sslSocketSession.handshake()

// ... certificate verification and related steps

// Used to validate the certificate chain.
val certificatePinner = address.certificatePinner!!

// Create a Handshake recording the TLS version, cipher suite and local certificates; the
// trailing lambda supplies the cleaned peer certificate chain.
handshake = Handshake(
unverifiedHandshake.tlsVersion,
unverifiedHandshake.cipherSuite,
unverifiedHandshake.localCertificates
) {
certificatePinner.certificateChainCleaner!!.clean(
unverifiedHandshake.peerCertificates,
address.url.host
)
}

// Check that the certificate chain satisfies the certificatePinner.
certificatePinner.check(address.url.host) {
handshake!!.peerCertificates.map { it as X509Certificate }
}

// Once every check has passed, the SSL socket becomes `socket` and source / sink are
// re-created as buffered streams over it.
// `protocol` is the one selected through the TLS extension (ALPN, per the original note),
// or Protocol.HTTP_1_1 when none was selected.
val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
Platform.get().getSelectedProtocol(sslSocket)
} else {
null
}

socket = sslSocket
source = sslSocket.source().buffer()
sink = sslSocket.sink().buffer()
protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
success = true
} finally {
// Always give the platform its post-handshake hook; close the socket on failure.
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket)
}
if (!success) {
sslSocket?.closeQuietly()
}
}
}
}

CallServerInterceptor

CallServerInterceptor 会将请求头与请求体发送给服务器,以及解析服务器返回的response

/**
 * Excerpt of CallServerInterceptor: writes the request headers (and body, when there is one) to
 * the exchange, then reads the response headers and builds the response.
 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    // True until exchange.responseHeadersStart() has been called once.
    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    // Failure recorded while sending the request; assigned in the omitted catch body below.
    var sendRequestException: IOException? = null
    try {

      // Write the request headers.
      exchange.writeRequestHeaders(request)

      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        // With an "Expect: 100-continue" request header, wait for the server's
        // "HTTP/1.1 100 Continue" response before sending the request body. If that response
        // does not arrive, the request body is not sent.
        // E.g. a POST sends the headers first and continues with the body after the 100 status.
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
          // Flush the request, i.e. send the request headers.
          exchange.flushRequest()
          // Parse the response headers.
          responseBuilder = exchange.readResponseHeaders(expectContinue = true)
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }

        if (responseBuilder == null) {
          if (requestBody.isDuplex()) {
            // Duplex request body: flush the headers first; the body can be sent later.
            exchange.flushRequest()
            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
            // Write the request body.
            requestBody.writeTo(bufferedRequestBody)
          } else {
            // The "Expect: 100-continue" expectation was met (or never set): write the body.
            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
            requestBody.writeTo(bufferedRequestBody)
            bufferedRequestBody.close()
          }
        }
        // ... (remaining branches omitted)
      }

      if (requestBody == null || !requestBody.isDuplex()) {

        // Finish the request.
        exchange.finishRequest()
      }
    } catch (e: IOException) {
      // ...
    }

    try {
      if (responseBuilder == null) {
        // Read the response headers.
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
      }

      // Build the response from the parsed headers plus request, handshake and timing data.
      var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()

      //...
      return response
    } catch (e: IOException) {
      // Prefer reporting the earlier send failure, with the read failure attached as suppressed.
      if (sendRequestException != null) {
        sendRequestException.addSuppressed(e)
        throw sendRequestException
      }
      throw e
    }
  }
}

CallServerInterceptor 会先写入并发送请求头,然后根据条件决定是否写入并发送请求体,最后结束请求。接着解析服务器返回的响应头,构建一个新的 response 并返回。CallServerInterceptor 是拦截器责任链中的最后一个拦截器,所以它不会再调用 chain.proceed() 方法往下执行,而是将构建好的 response 往上传递给责任链中的每个拦截器。

拦截器顺序

总结

OkHttp 通过 OkHttpClient.newCall 方法生成一个 RealCall 对象去执行请求。

同步请求 execute 和异步请求 enqueue 通过 Dispatcher 调度器,将其添加到请求队列中,然后通过 RealCall 的 getResponseWithInterceptorChain() 方法去请求接口信息。

getResponseWithInterceptorChain() 方法中,会添加一系列的拦截器,最终生成 RealInterceptorChain 对象去获取服务器的 Response

在 RealInterceptorChain 的 proceed() 方法中,拦截器不断通过 chain.proceed() 方法向下传递请求信息,到最后 CallServerInterceptor 拦截器则不会继续执行,而是将 Response 一层层往上传递给责任链中的每个拦截器。

整体的流程图如下

OkHttp process