/** Finds a new or pooled connection to carry a forthcoming request and response. */ internalfuninitExchange(chain: RealInterceptorChain): Exchange { synchronized(this) { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) } val exchangeFinder = this.exchangeFinder!! //找到一个ExchangeCode val codec = exchangeFinder.find(client, chain) //把ExchangeCode用于构造一个Exchange val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result synchronized(this) { this.requestBodyOpen = true this.responseBodyOpen = true }
if (canceled) throw IOException("Canceled") //返回创建的Exchagne对象 return result }
/** * Transmits a single HTTP request and a response pair. This layers connection management and events * on [ExchangeCodec], which handles the actual I/O. */ classExchange(//...){ //... }
// Confirm that the connection is good. //检查连接是否健康,该连接是否已准备好 if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate }
// If it isn't, take it out of the pool. //如果该连接不健康,则标记该连接不在使用 candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. //确保有可尝试的路由 if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true if (routesSelectionLeft) continue //如果没有可用的路由抛出异常 throw IOException("exhausted all routes") } }
// Attempt to reuse the connection from the call. //1、尝试去重用call中当前的连接 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { //判断当前存在connection是否符合 if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { //connection不符合释放,则在call中把connection置为空 toClose = call.releaseConnectionNoEvents() } }
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here // because we already acquired it. //4、如果connection没有被释放,则返回该当前的连接 if (call.connection != null) { check(toClose == null) return callConnection }
// The call's connection was released. //关闭socket toClose?.closeQuietly() //connection被释放的监听 eventListener.connectionReleased(call, callConnection) }
// We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0
// Attempt to get a connection from the pool. //2、 第一次尝试从连接池中获取连接,注意此时的第三个参数routes为空,第三个参数requireMultiplexed为false // callAcquirePooledConnection内部判断是个同步方法 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) //返回连接 return result }
// Nothing in the pool. Figure out what route we'll try next. //如果第一次没从池子中获取到连接,那弄清楚我们接下来要尝试的路由 val routes: List<Route>? val route: Route //第一次进来nextRouteToTry if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. routes = null route = nextRouteToTry!! nextRouteToTry = null } elseif (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! //计算一个新的routeSelection,这是一个阻塞的操作 var localRouteSelector = routeSelector //如果localRouteSelector为则创建一个 if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } //从routeSelector中获取一个新的routeSelection val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection //获取从routeSelector中的route列表 routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing. //3、第二次从连接池中获取连接,此时是routes不为空,也就是有一组IP地址。 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) //返回连接 return result } //如果第二次从连接池没拿到,则从routeSelection获取下一个路由,用于创建新的连接。 route = localRouteSelection.next() }
// Connect. Tell the call about the connecting call so async cancels work. // 4、因为第二次没有从连接池获取到,则创建一个新的连接 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { //执行TCP或TCP+TLS握手。 newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3 // different lookups in the connection pool! //5、第三次从连接池中获取连接。这是因为如果已经创建了一个指向相同主机的连接,并且连接放到连接池里去了。 //注意此时requireMultiplexed为true,也就是只适用于HTTP2。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { //获取call当前的connection val result = call.connection!! //保存路由 nextRouteToTry = route //将当前的连接的socket关闭 newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } //如果第三次从连接池中没有获取到连接,那这次创建的连接放入到连接池中,然后把创建的连接赋值给call中的connection synchronized(newConnection) { connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) }