/**
 * Call timeout, initialised from `builder.callTimeout`.
 * The property name indicates the value is in milliseconds.
 * The getter is exposed to the JVM as `callTimeoutMillis()`.
 */
@get:JvmName("callTimeoutMillis")
val callTimeoutMillis: Int = builder.callTimeout

/**
 * Connect timeout, initialised from `builder.connectTimeout`.
 * The property name indicates the value is in milliseconds.
 * The getter is exposed to the JVM as `connectTimeoutMillis()`.
 */
@get:JvmName("connectTimeoutMillis")
val connectTimeoutMillis: Int = builder.connectTimeout

/**
 * Read timeout, initialised from `builder.readTimeout`.
 * The property name indicates the value is in milliseconds.
 * The getter is exposed to the JVM as `readTimeoutMillis()`.
 */
@get:JvmName("readTimeoutMillis")
val readTimeoutMillis: Int = builder.readTimeout

/**
 * Write timeout, initialised from `builder.writeTimeout`.
 * The property name indicates the value is in milliseconds.
 * The getter is exposed to the JVM as `writeTimeoutMillis()`.
 */
@get:JvmName("writeTimeoutMillis")
val writeTimeoutMillis: Int = builder.writeTimeout
publicsynchronizedvoidsetSoTimeout(int timeout)throws SocketException { if (isClosed()) thrownew SocketException("Socket is closed"); if (timeout < 0) thrownew IllegalArgumentException("timeout can't be negative");
getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout)); }
/**
 * Interceptor that asks the call for an exchange and passes it on to the rest of the chain.
 */
object ConnectInterceptor : Interceptor {
    /**
     * Initialises an exchange for the current call, then proceeds with a copy of the chain
     * that carries that exchange.
     *
     * @param chain the interceptor chain; it is cast to [RealInterceptorChain], so any other
     *   implementation fails with a `ClassCastException`.
     * @return the response produced by proceeding with the chain's original request.
     * @throws IOException declared for Java callers; may be raised while initialising the
     *   exchange or proceeding down the chain.
     */
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        // The key step: obtain the exchange from the call.
        val exchange = realChain.call.initExchange(chain)
        val connectedChain = realChain.copy(exchange = exchange)
        return connectedChain.proceed(realChain.request)
    }
}
关键的代码在 `realChain.call.initExchange(chain)` 中:
/**
 * Finds a new or pooled connection to carry a forthcoming request and response.
 *
 * This is an abridged excerpt: statements before and after the visible core are elided.
 *
 * @param chain the chain passed on to `exchangeFinder.find` together with `client`.
 * @return an [Exchange] built from this call, its event listener, the exchange finder and
 *   the codec the finder returned.
 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
    // ... (statements elided in this excerpt)

    // `!!` throws NullPointerException if no exchange finder is set on this call.
    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)

    // ... (statements elided in this excerpt)
    return result
}
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */ @Throws(IOException::class) privatefunfindHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled )
// Confirm that the connection is good. if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate }
// If it isn't, take it out of the pool. candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true if (routesSelectionLeft) continue
// Attempt to reuse the connection from the call. val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! // 第一步:先查查现有的连接,如果有的话就用(例如重定向) if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { // 同步检查是否可以用,不能用的话创建 toClose if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } }
// We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0
// 第二步:查找连接池 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
// 第三步:自己新建一个连接 // 以下的 if esle 逻辑的作用是建立重试路径 val routes: List<Route>? val route: Route if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. routes = null route = nextRouteToTry!! nextRouteToTry = null } elseif (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing. if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
funcleanup(now: Long): Long { var inUseConnectionCount = 0// 正在使用的连接数 var idleConnectionCount = 0// 空闲的连接数 var longestIdleConnection: RealConnection? = null// 空闲时间最长的连接 var longestIdleDurationNs = Long.MIN_VALUE // 最长的空闲时间
// 遍历池子中的所有链接,不断对上面的四个值进行赋值修改 for (connection in connections) { synchronized(connection) { // If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++ } else { idleConnectionCount++
// If the connection is ready to be evicted, we're done. val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection } else { Unit } } } }
when { // 如果最长连接时间已经超过了 5 分钟,或者说闲置的连接数大于 5 个了,就移除限制时间最长的连接 longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections -> { // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it. val connection = longestIdleConnection!! synchronized(connection) { if (connection.calls.isNotEmpty()) return0L// No longer idle. if (connection.idleAtNs + longestIdleDurationNs != now) return0L// No longer oldest. connection.noNewExchanges = true connections.remove(longestIdleConnection) }
connection.socket().closeQuietly() if (connections.isEmpty()) cleanupQueue.cancelAll()