基本使用
val client = OkHttpClient.Builder().build()
val request = Request.Builder().build()
// 同步请求
client.newCall(request).execute()
// 异步请求
client.newCall(request).enqueue(object: Callback{
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})
OkHttpClient
OkHttpClient 通过 Builder 建造者模式,进行一些参数配置
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var fastFallback = false
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
internal var taskRunner: TaskRunner? = null
Request
Request也是通过 Builder 建造者配置一些参数
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null
/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
OkhttpClient 通过 newCall 方法生成一个 RealCall 实例对象,RealCall 实现了 Call 接口
Call
请求调用接口,表示这个请求已经准备好可以执行,也可以取消,只能执行一次。
interface Call : Cloneable {
fun request(): Request
/**
* 同步请求会抛出 `IOException` 异常
*/
fun execute(): Response
fun enqueue(responseCallback: Callback)
fun cancel()
fun isExecuted(): Boolean
fun isCanceled(): Boolean
fun timeout(): Timeout
public override fun clone(): Call
fun interface Factory {
fun newCall(request: Request): Call
}
}
RealCall
class RealCall(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// .... 省略部分
override fun execute(): Response {
// ...
try {
// 通过 dispatcher 发送请求,
// 通过分析 Dispatcher 源码可以知道,就是将 call 添加到 runningSyncCalls 列表
client.dispatcher.executed(this)
// 获取到服务器的 response
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
internal fun getResponseWithInterceptorChain(): Response {
// 添加一些拦截器,用于生成 RealInterceptorChain 对象
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
// 最终的获取 response 操作通过 RealInterceptorChain.process 来执行
try {
// 获取到返回信息
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
override fun enqueue(responseCallback: Callback) {
// ...
// 通过 dispatcher 发送异步请求
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
}
call 发送请求将通过 Dispatcher 调度器实例进行处理,发送异步请求时,会涉及 AsyncCall
AsyncCall
AsyncCall 是 RealCall 的内部类,实现 Runnable 然后被调度器内部的线程池进行处理
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
// ... Dispatcher 最终将 AsyncCall 通过这边执行
fun executeOn(executorService: ExecutorService) {
var success = false
try {
// 调度器内部线程池执行操作
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
// 异常回调
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 结束 call 执行
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// 获取服务器返回的 response,同 excute
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 通过回调将 response 传递出去
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
// 请求失败
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
// 请求过程发生异常则取消请求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 将异常传递出去
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 结束 call 执行
client.dispatcher.finished(this)
}
}
}
}
Dispatcher
Dispatcher 用来调度 call 对象,内部包含线程池和请求队列,用来存放和执行 AsyncCall 对象
class Dispatcher() {
private var executorServiceOrNull: ExecutorService? = null
// 线程池对象
("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
//
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 将 asyncCall 添加到列表
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
// 根据域名去 readyAsyncCalls,runningAsyncCalls 列表查找是否有存在的请求
// 查找到了则复用
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 执行请求操作
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
// ...
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 通过线程池执行请求操作,回到 AsyncCall 进行操作
asyncCall.executeOn(executorService)
}
return isRunning
}
// 同步请求就是将 realCall 添加到 runningSyncCalls 列表
internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
//
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
}
RealInterceptorChain
通过上面的分析,最终去执行请求的是 RealInterceptorChain,通过 process 方法来返回 Response,RealInterceptorChain 是 Interceptor.Chain 的具体实现类
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
private var calls: Int = 0
internal fun copy(
index: Int = this.index,
exchange: Exchange? = this.exchange,
request: Request = this.request,
connectTimeoutMillis: Int = this.connectTimeoutMillis,
readTimeoutMillis: Int = this.readTimeoutMillis,
writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,readTimeoutMillis, writeTimeoutMillis)
// .... 省略部分代码
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
// ....
// 获取下个 interceptor
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
// 通过拦截器 interceptor 获取 response 信息
// 针对具体的 interceptor 处理不同的情况,返回 RealCall 的 getResponseWithInterceptorChain
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// ...
return response
}
}
Interceptor
在 RealCall 的 getResponseWithInterceptorChain 方法中,添加了一系列的 interceptor
fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
// 自定义 Interceptors,例如可以在请求的时候统一添加 token 信息
interceptors += client.interceptors
// 对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作
interceptors += RetryAndFollowUpInterceptor(client)
// 负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应
interceptors += BridgeInterceptor(client.cookieJar)
// 负责缓存的相关处理
interceptors += CacheInterceptor(client.cache)
// 负责建立连接,会建立TCP连接或者TLS连接
interceptors += ConnectInterceptor
if (!forWebSocket) {
// 自定义 Interceptors
interceptors += client.networkInterceptors
}
// 网络数据的请求和响应,即网络 IO 操作
interceptors += CallServerInterceptor(forWebSocket)
}
RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor 负责对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
// ...
while (true) {
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
// 通过 RealInterceptorChain 的 proceed 返回 response
response = realChain.proceed(request)
newRoutePlanner = true
} catch (e: IOException) {
// 尝试与服务器通信失败,请求可能已发送。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newRoutePlanner = false
continue
}
// 尝试关联上一个 response,注意:body 是为 null
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// 会根据 responseCode 来判断,构建一个新的 request并返回重试或者重定向
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// ...
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// ...
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
// ...
}
BridgeInterceptor
BridgeInterceptor 负责将用户构建的请求转换为服务器需要的请求,例如在请求中添加 Content-Type 等信息,移除响应头中的 Content-Encoding 等
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
// ...
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
// ...
}
CacheInterceptor
CacheInterceptor 用于配制一些缓存策略,开发者可通过 OkHttpClient.Builder().cache 进行配置
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
// ...
// 缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 为空表示不使用网络,反之,则表示使用网络
val networkRequest = strategy.networkRequest
// 为空表示不使用缓存,反之,则表示使用缓存
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// 如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT) // GATEWAY_TIMEOUT
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// 如果禁用了网络,且有缓存,直接根据缓存内容构建并返回response
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
// 责任链往下处理,从服务器返回response 赋值给 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
if (cacheResponse != null) {
// 网络返回response code为304的时候,使用缓存内容新构建一个Response返回
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
// 如果cache不为null,即用户在OkHttpClient中配置了缓存,
// 则将上一步新构建的网络请求response存到cache中
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
// 根据请求方法来判断缓存是否有效,只对 GET 请求进行缓存,其它方法的请求则移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
// ....
}
ConnectInterceptor
ConnectInterceptor 负责实现与服务器真正建立起连接
object ConnectInterceptor : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(realChain) // 初始化一个exchange对象
// 根据 exchange 对象重新生成一个责任链
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
// RealCall initExchange
internal fun initExchange(chain: RealInterceptorChain): Exchange {
// ...
val routePlanner = this.routePlanner!!
// 根据情况获取连接
val connection = when {
// 推测性地连接到目标地址的每个 IP 地址,一旦其中一个连接成功就返回。
// 会 250 毫秒启动一次新的尝试,直到连接成功
client.fastFallback -> FastFallbackExchangeFinder(routePlanner, client.taskRunner).find()
else -> ExchangeFinder(routePlanner).find()
}
// 根据情况返回 Http1ExchangeCodec 或者 Http2ExchangeCodec
// 这边会通过 RealConnection 处理连接
val codec = connection.newCodec(client, chain)
// 根据 codec 重新创建 exchange 对象并返回
val result = Exchange(this, eventListener, routePlanner, codec)
// ...
return result
}
RealConnection
RealConnection 用来与远程服务器连接,承载1个或多个流
class RealConnection(
val taskRunner: TaskRunner,
val connectionPool: RealConnectionPool,
private val route: Route
) : Http2Connection.Listener(), Connection{
//...
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(isNew) { "already connected" }
var firstException: IOException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
// ...
while (true) {
try {
// 根据情况判断是创建 Socket 还是 Tunnel
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
break
}
} else {
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
// 建立对应的协议
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
// ...
}
}
// ...
}
private fun connectTunnel(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
call: Call,
eventListener: EventListener
) {
// 通常用于发送到代理服务器以建立隧道连接
var tunnelRequest: Request = createTunnelRequest()
val url = tunnelRequest.url
for (i in 0 until MAX_TUNNEL_ATTEMPTS) {
// 建立原始的底层连接,通常用于连接到代理服务器
connectSocket(connectTimeout, readTimeout, call, eventListener)
// 创建代理隧道,如果创建成功则返回 null,否则会返回新的 Request 对象,用于重新创建连接
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url)
?: break // Tunnel successfully created.
//
eventListener.connectEnd(call, route.socketAddress, route.proxy, null)
}
}
private fun connectSocket(
connectTimeout: Int,
readTimeout: Int,
call: Call,
eventListener: EventListener
) {
val proxy = route.proxy
val address = route.address
// 创建套接字
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}
// ...
try {
// 连接套接字
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
} catch (e: ConnectException) {
throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
initCause(e)
}
}
try {
// 通过 rawSocket 创建输入流 source 和输出流 sink,并分别用缓冲流包装它们
source = rawSocket.source().buffer()
sink = rawSocket.sink().buffer()
} catch (npe: NullPointerException) {
if (npe.message == NPE_THROW_WITH_NULL) {
throw IOException(npe)
}
}
}
// 建立协议,根据路由地址的属性来决定采用哪种协议来与服务器通信
private fun establishProtocol(
connectionSpecSelector: ConnectionSpecSelector,
pingIntervalMillis: Int,
call: Call,
eventListener: EventListener
) {
// 如果 sslSocketFactory 则不需要加密,执行普通的 HTTP 请求
if (route.address.sslSocketFactory == null) {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
socket = rawSocket
protocol = Protocol.H2_PRIOR_KNOWLEDGE
startHttp2(pingIntervalMillis)
return
}
socket = rawSocket
protocol = Protocol.HTTP_1_1
return
}
// 加密请求,建立安全连接
eventListener.secureConnectStart(call)
// 创建和配置 TLS 连接
connectTls(connectionSpecSelector)
eventListener.secureConnectEnd(call, handshake)
if (protocol === Protocol.HTTP_2) {
startHttp2(pingIntervalMillis)
}
}
// 连接 TLS,并在建立连接的过程中完成SSL/TLS握手,验证主机名,检查证书等操作
private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
val address = route.address
val sslSocketFactory = address.sslSocketFactory
var success = false // 连接是否成功
var sslSocket: SSLSocket? = null // 用于存储 SSL 套接字
try {
sslSocket = sslSocketFactory!!.createSocket(
rawSocket, address.url.host, address.url.port, true /* autoClose */
) as SSLSocket
// 配置套接字的连接规范,这包括了密码、TLS版本
val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
if (connectionSpec.supportsTlsExtensions) {
// 配置TLS扩展
Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
}
// 启动TLS握手过程,包括客户端和服务器之间的密钥协商等
sslSocket.startHandshake()
val sslSocketSession = sslSocket.session
// 获取了握手信息,包括TLS版本、加密套件和证书信息
val unverifiedHandshake = sslSocketSession.handshake()
// ... 验证证书信息等操作
// 用于验证证书链
val certificatePinner = address.certificatePinner!!
// 创建一个 Handshake 对象,记录TLS版本、加密套件和本地证书
handshake = Handshake(
unverifiedHandshake.tlsVersion,
unverifiedHandshake.cipherSuite,
unverifiedHandshake.localCertificates
) {
certificatePinner.certificateChainCleaner!!.clean(
unverifiedHandshake.peerCertificates,
address.url.host
)
}
// 检查证书链是否满足 certificatePinner 的要求
certificatePinner.check(address.url.host) {
handshake!!.peerCertificates.map { it as X509Certificate }
}
// 如果所有验证通过,将成功建立的SSL套接字赋给 socket,
// 并设置输入和输出流为已经缓冲的 source 和 sink。
// 根据应用层协议,设置 protocol 为 Protocol.HTTP_1_1 或者从TLS扩展中获取ALPN协商的协议
val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
Platform.get().getSelectedProtocol(sslSocket)
} else {
null
}
socket = sslSocket
source = sslSocket.source().buffer()
sink = sslSocket.sink().buffer()
protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
success = true
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket)
}
if (!success) {
sslSocket?.closeQuietly()
}
}
}
}
CallServerInterceptor
CallServerInterceptor 会将请求头与请求体发送给服务器,以及解析服务器返回的response了
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
var sendRequestException: IOException? = null
try {
// 写入请求头信息
exchange.writeRequestHeaders(request)
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回
// "HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
// POST请求,先发送请求头,在获取到100继续状态后继续发送请求体
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
// 刷新请求,即发送请求头
exchange.flushRequest()
// 解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// 如果请求体是双公体,就先发送请求头,稍后在发送请求体
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
// 写入请求体
requestBody.writeTo(bufferedRequestBody)
} else {
// 如果获取到了"Expect: 100-continue"响应,写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
}
/// ...
if (requestBody == null || !requestBody.isDuplex()) {
// 结束请求
exchange.finishRequest()
}
} catch (e: IOException) {
// ...
}
try {
if (responseBuilder == null) {
// 读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
// 写入响应信息
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
//...
return response
} catch (e: IOException) {
if (sendRequestException != null) {
sendRequestException.addSuppressed(e)
throw sendRequestException
}
throw e
}
}
}
CallServerInterceptor 会写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response,并返回。 这里CallServerInterceptor是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()方法往下执行,而是将这个构建的response往上传递给责任链中的每个拦截器。
总结
OkHttp 通过 OkHttpClient.newCall 方法生成一个 RealCall 对象去执行请求。
同步请求 excete 和异步请求 enqueue 通过 Dispatcher 调度器,将其添加到请求队列中,然后通过 RealCall 的 getResponseWithInterceptorChain() 方法去请求接口信息。
在getResponseWithInterceptorChain() 方法中,会添加一系列的拦截器,最终生成 RealInterceptorChain 对象去获取服务器的 Response
RealInterceptorChain 的 proceed() 方法中,拦截器不断通过 chain.proceed() 方法向下传递请求信息,到最后 CallServerInterceptor 拦截器则不会继续执行,而是将 Response 一层层往上传递给责任链中的每个拦截器。
整体的流程图如下