OkHttp 超时设置

1
2
3
4
@get:JvmName("callTimeoutMillis") val callTimeoutMillis: Int = builder.callTimeout
@get:JvmName("connectTimeoutMillis") val connectTimeoutMillis: Int = builder.connectTimeout
@get:JvmName("readTimeoutMillis") val readTimeoutMillis: Int = builder.readTimeout
@get:JvmName("writeTimeoutMillis") val writeTimeoutMillis: Int = builder.writeTimeout

这些存在于 OkHttpClient.Builder。

callTimeoutMillis 代表调用的超时时间,默认情况下不会超时。

connectTimeoutMillis、readTimeoutMillis、writeTimeoutMillis 分别代表建立连接、读操作、写操作的超时时间,默认都是 10 秒。

检查超时的类为 Timeout ,一般的调用实现为 AsyncTimeout,建立连接的实现稍有不同,为 SocketAsyncTimeout,Http2Stream.StreamTimeout ,均为 AsyncTimeout 的子类。该类会在方法调用的前后进行检查。

1
2
3
4
5
6
7
8
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
if (timeout < 0)
throw new IllegalArgumentException("timeout can't be negative");

getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
}

readTimeoutMillis 会被设置给 Socket,超时会抛出 java.net.SocketTimeoutException ,这个看起来是从网络上阻塞导致的超时。

这里的超时设置是按 HTTP1.1 和 HTTP2 分别来设置的,HTTP2 的超时是直接设置到流上面的。

1
2
3
4
5
6
7
8
9
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
...
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
...
}
}

如下为 timeout 在读取数据时的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Throws(IOException::class)
override fun read(sink: Buffer, byteCount: Long): Long {
while (true) {
synchronized(this@Http2Stream) {
readTimeout.enter() // 进入检查
try {
...
} finally {
readTimeout.exitAndThrowIfTimedOut() // 结束并判断是否超时
}
}
...
return -1L // This source is exhausted.
}
}

由代码可知这个 readTimeout 是作用在一次读取上的,只要这个操作没超过指定时间就不会报错,writeTimeout 也是这样。

连接复用逻辑

负责进行连接的拦截器是 ConnectInterceptor,从这里看起。

1
2
3
4
5
6
7
8
9
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}

关键的代码在 realChain.call.initExchange(chain)

1
2
3
4
5
6
7
8
9
/** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
...
return result
}

exchangeFinder 是负责查找链接的类,查看 exchangeFinder.find(client, chain)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
// 部分代码简化
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
}

一眼就能知道 findHealthyConnection 是负责查找可用链接的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)

// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}

// If it isn't, take it out of the pool.
candidate.noNewExchanges()

// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue

val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue

val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue

throw IOException("exhausted all routes")
}
}

这里的逻辑主要是调用 findConnection 来查找连接,如果连接可用就直接返回进行使用,如果不可用的话就从连接池中扔出去,查找下一个链接,重复找,直到试过所有路径,都失败了就报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* 查找顺序:
* 1.现有连接(如果存在)
* 2.连接池
* 3.建立一个新连接。
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")

// Attempt to reuse the connection from the call.
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
// 第一步:先查查现有的连接,如果有的话就用(例如重定向)
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
// 同步检查是否可以用,不能用的话创建 toClose
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}

// 每次阻塞操作后都再检查一下这个连接有没有被置为空
if (call.connection != null) {
// 检查一下是否需要关闭,不需要的话将其返回给外界
check(toClose == null)
return callConnection
}

// 走到这里说明链接可能被其它线程置为 null 了,这里将其安全退出
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}

// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0

// 第二步:查找连接池
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

// 第三步:自己新建一个连接
// 以下的 if esle 逻辑的作用是建立重试路径
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes

if (call.isCanceled()) throw IOException("Canceled")

// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

route = localRouteSelection.next()
}

// 用刚才建立的 route 来建立一个连接对象
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 真正进行连接的地方
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())

// 因为这里可能是并行的,所以再查一下连接池里有没有相同的链接,有的话就把现在的这个关了
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}

// 把新建立的连接放连接池里
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}

eventListener.connectionAcquired(call, newConnection)
return newConnection
}

连接池是如何清理连接的?

继续依照上面代码看,connectionPool 的真实身份是 RealConnectionPool ,打开这个类可以看到一个很明显的

1
2
3
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}

这个就是清理连接的 task ,它会在 fun put(connection: RealConnection) 放入新链接 和 fun connectionBecameIdle(connection: RealConnection): Boolean 被通知某个链接限制的时候被调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun put(connection: RealConnection) {
connections.add(connection)
cleanupQueue.schedule(cleanupTask)
}

fun connectionBecameIdle(connection: RealConnection): Boolean {
return if (connection.noNewExchanges || maxIdleConnections == 0) {
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true
} else {
cleanupQueue.schedule(cleanupTask)
false
}
}

查看一下 cleanup 的真实实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0 // 正在使用的连接数
var idleConnectionCount = 0 // 空闲的连接数
var longestIdleConnection: RealConnection? = null // 空闲时间最长的连接
var longestIdleDurationNs = Long.MIN_VALUE // 最长的空闲时间

// 遍历池子中的所有链接,不断对上面的四个值进行赋值修改
for (connection in connections) {
synchronized(connection) {
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++
} else {
idleConnectionCount++

// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
} else {
Unit
}
}
}
}

when {
// 如果最长连接时间已经超过了 5 分钟,或者说闲置的连接数大于 5 个了,就移除限制时间最长的连接
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
// We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
val connection = longestIdleConnection!!
synchronized(connection) {
if (connection.calls.isNotEmpty()) return 0L // No longer idle.
if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
connection.noNewExchanges = true
connections.remove(longestIdleConnection)
}

connection.socket().closeQuietly()
if (connections.isEmpty()) cleanupQueue.cancelAll()

// 返回 0 代表立刻再次清理
return 0L
}

idleConnectionCount > 0 -> {
// 返回一个数值,等待这么多时间后再清理一下
return keepAliveDurationNs - longestIdleDurationNs
}

inUseConnectionCount > 0 -> {
// 没有限制的连接,所有链接都在使用,5 分钟之后再运行此清理任务
return keepAliveDurationNs
}

else -> {
// 没有链接,返回 -1 说明不再需要再次运行此清理任务了
return -1
}
}
}