欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

HttpClient解读(1)-缓存池、借出

程序员文章站 2022-03-07 13:53:00
...

工作以来在使用java代码调用一个http协议的请求时都是使用的httpClient,但是工作四年多了一直没有好好研究一下httpClient的内部原理,只知道他和数据库的连接池一样也是有缓存池的,数据库缓存池也没有看原理,所以最近有时间看了一下httpClent的内部代码,受益匪浅,顺便记录一下。

httpClient里面使用的缓存池的具体实现类是:org.apache.http.impl.conn.CPool,他的父类是:AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> ,他是基于泛型的,先说一下这三个泛型的意思,泛型的第一个参数就是http请求的服务端的ip+端口的对象,比如www.baidu.com:80,用对象封装起来就是用的HttoRouter,第二个参数是真正的http连接(里面封装了一个socket和连接的具体信息,比如包的大小,等待时间、编码,错误处理机制),第三个参数CPoolEntry是对于第前两个的封装,即一个CPoolEntry包含一个HttpRouter+ManagedHttpClientConnection,这么做的目的是缓存池最后返回的对象是CPoolEntry,只要返回这一个对象就可以获得其他两个对象了。(我写博客一般用倒叙,因为我都是看了一遍代码已经知道了大概的意思,所以不是从最基础的类开始,而是从程序的调用开始的)。于是可以理解为缓存池缓存的是ManagedHttpClientConnection(也就是连接),但时使用PoolEntry包装的,最后返回的是PoolEntry(CpoolEntry是一个具体的实现类而已),获得的key是HttoRouter。

 

缓存池最重要的方法就是获得一个连接了,前面说了他返回的是一个CPoolEntry对象,获得对象的方法是:AbstractConnPool.lease(T, Object, FutureCallback<E>),即借出一个HttpClientConnection,他的key(也就是这里的T)就是一个HttpRoute,但是这里的Object我没有懂是什么意思(有一个httpClient传递的是userToken,我猜测是使用用户登录的时候使用的,有的没有传,我在看代码的时候都假设不传),还有最后的参数Callable程序里面也没有用到(在查看了这个方法的调用方法之后确定的,所有的调用方法都没有传一个callable)。代码如下:注意这里的泛型E就是PoolEntry的意思

public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
	Args.notNull(route, "Route");
	Asserts.check(!this.isShutDown, "Connection pool shut down");
	return new PoolEntryFuture<E>(this.lock, callback) {//这个方法返回的是一个Futrue对象,就是并发编程里面的那个Futrue,从名字上可以看出最终的返回结果应该就是PoolEntry,也就是缓存的对象。
		@Override
		public E getPoolEntry(//这里用了一个模板模式,由当前的CPool类确定最终返回的Entry的类型
				final long timeout,
				final TimeUnit tunit)
					throws InterruptedException, TimeoutException, IOException {
			final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
			onLease(entry);
			return entry;
		}
	};
}
 上面的方法没有返回EntryPool对象,而是返回了一个Future对象,Future对象都是要调用他的get方法(通过再次调用get方法获得最终的PoolEntry),我们看看PoolEntryFuture的代码,尤其是注意他的get方法(获得连接)和cancel(取消任务)方法
abstract class PoolEntryFuture<T> implements Future<T> {//由之前的Cpool确定了这里的泛型T就是CPoolEntry,

    private final Lock lock;//
    private final FutureCallback<T> callback;//这个属性之前说过了,没有调用方传过值,可以认为不存在
    private final Condition condition;//这里的condition要和上面的lock一起看,这里的lock是从Pool(也就是缓存池中)中传过来的,传过来的目的是为了生成一个condition,因为当前获得connection可能因为不能获得而阻塞,所以要生成一个condition
    private volatile boolean cancelled;
    private volatile boolean completed;
    private T result;

    PoolEntryFuture(final Lock lock, final FutureCallback<T> callback) {
        super();
        this.lock = lock;
        this.condition = lock.newCondition();//不能够获得条件
        this.callback = callback;
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        this.lock.lock();
        try {
            if (this.completed) {
                return false;
            }
            this.completed = true;
            this.cancelled = true;
            if (this.callback != null) {
                this.callback.cancelled();
            }
            this.condition.signalAll();
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    @Override
    public boolean isCancelled() {
        return this.cancelled;
    }

    @Override
    public boolean isDone() {
        return this.completed;
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {//获得最终的connection,不过下面调用的超时时间是0,即永远不超时。
        try {
            return get(0, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException ex) {
            throw new ExecutionException(ex);
        }
    }

    @Override
    public T get(
            final long timeout,
            final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Args.notNull(unit, "Time unit");
        this.lock.lock();//这个lock会导致性能有些低,因为即使是不同的router也会因为这个锁而阻塞,不过是必须的,因为不同的router也是需要从一个Hashmap中先获得对应的RouteSpecificPool,后面有
        try {
            if (this.completed) {//这里出现的原因是可能在进入后这个任务已经被取消了,在cancel方法中会标记this.completed=true
                return this.result;
            }
//在最终的实现里面这个方法可能会因为现在无法提供连接池而阻塞
            this.result = getPoolEntry(timeout, unit);这个就是一个模板,获得最终的缓存对象,在AbstractConnPool类中会有实现,AbstractConnPool中返回的Future是一个匿名内部类,实现了这个方法
            this.completed = true;
            if (this.callback != null) {
                this.callback.completed(this.result);
            }
            return result;
        } catch (final IOException ex) {
            this.completed = true;
            this.result = null;
            if (this.callback != null) {
                this.callback.failed(ex);
            }
            throw new ExecutionException(ex);
        } finally {
            this.lock.unlock();
        }
    }

    protected abstract T getPoolEntry(
            long timeout, TimeUnit unit) throws IOException, InterruptedException, TimeoutException;//模板方法,由缓存池实现,要从缓存池中获得一个具体的Entry

    public boolean await(final Date deadline) throws InterruptedException {//如果现在借不到连接,就要使用这个方法等待,参数是等待的最终时间,等待就是使用的condition的功能
        this.lock.lock();
        try {
            if (this.cancelled) {
                throw new InterruptedException("Operation interrupted");
            }
            final boolean success;
            if (deadline != null) {
                success = this.condition.awaitUntil(deadline);//等待到具体的时间
            } else {
                this.condition.await();//永远等待
                success = true;
            }
            if (this.cancelled) {
                throw new InterruptedException("Operation interrupted");
            }
            return success;
        } finally {
            this.lock.unlock();
        }

    }

    public void wakeup() {//如果现在条件满足了,比如其他的请求现在归还了一个连接,就要唤醒之前等待的自己
        this.lock.lock();
        try {
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

}
 上面的future并没有真正的看到借连接的代码,他的代码就是在上面的模板方法getPoolEntry()中,真正的实现在AbstractConnPool的lease方法中,在第一个段代码中已经可以看到了,里面会调用.AbstractConnPool.getPoolEntryBlocking(T, Object, long, TimeUnit, PoolEntryFuture<E>)方法,即获得一个PoolEntry,如果获得不了就阻塞。看一下这个方法:
private E getPoolEntryBlocking(
		final T route, final Object state,//这里的route也就是key,即是连接到哪里的,state假设都不传
		final long timeout, final TimeUnit tunit,//timeout即指定的超时时间,unit是时间的单位
		final PoolEntryFuture<E> future)//future即调用lease方法返回的额future
			throws IOException, InterruptedException, TimeoutException {

	Date deadline = null;//等待的最终时间,即当前时间+超时时间
	if (timeout > 0) {
		deadline = new Date(System.currentTimeMillis() + tunit.toMillis(timeout));
	}

	this.lock.lock();//防止并发,因为是多个线程从一个缓存中获得连接。
	try {
		final RouteSpecificPool<T, C, E> pool = getPool(route);//根据route先获得route对应的缓存,从这里可以发现缓存池是对应一个route的,这就是与一个PoolingHttpClientConnectionManager的一个配置:每个路由的最大连接数联系到一起了。这个方法下面还有详细的说明
		E entry = null;
		while (entry == null) {//这个是一个循环,因为有可能借不到这个时候就要进入下一个循环
			Asserts.check(!this.isShutDown, "Connection pool shut down");
			for (;;) {//这里也是一个循环,因为可能借到的entry已经过期了
				entry = pool.getFree(state);//借一个,如果借不到就返回null
				if (entry == null) {
					break;
				}
				if (entry.isExpired(System.currentTimeMillis())) {//如果已经过期了,则关闭
					entry.close();
				} else if (this.validateAfterInactivity > 0) {//虽然没有过期,但是可能底层出现了问题,这个时候就要校验一下这个entry是否是可用的,如果不是,则关闭
					if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
						if (!validate(entry)) {
							entry.close();
						}
					}
				}
				if (entry.isClosed()) {
					this.available.remove(entry);//this.available就是所有可用的entry集合
					pool.free(entry, false);//放到对应的pool里面,false表示不能再复用了
				} else {
					break;
				}
			}
			if (entry != null) {//如果借到了,就准备借出去,
				this.available.remove(entry);//表示不再使用了,从可用的里面去掉
				this.leased.add(entry);//添加到借出的集合里面
				onReuse(entry);//一个回调函数,没有任何实现。
				return entry;
			}

			// New connection is needed   如果进入到这里表示没有借到一个连接,此时就要产生一个新的连接了,此时可能受到限制,因为httpClient中有两个限制条件,一个是总的连接的数量,一个是对应于每一个route的连接的数量,如果超过了任何一个就要等待。
			final int maxPerRoute = getMax(route);//查看此route对应的的连接的最大数量
			// Shrink the pool prior to allocating a new connection
			final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);//这段代码我的感觉是为了防止bug出现才设置的,除了这个理由之外我没有想到他的其他目的,如果有人看到这里并且有其他的想法,请留言
			if (excess > 0) {//只有pool.getAllocatedCount() == maxPerRoute的时候才会>0,此时表示已经开辟的连接的数量达到了上限了。此时如果不出意外应该是等待才对的,所以我不搞不懂这里为啥要这么做,我猜测是为了防止程序有bug,
				for (int i = 0; i < excess; i++) {
					final E lastUsed = pool.getLastUsed();
					if (lastUsed == null) {
						break;
					}
					lastUsed.close();
					this.available.remove(lastUsed);
					pool.remove(lastUsed);
				}
			}

			if (pool.getAllocatedCount() < maxPerRoute) {//开辟的数量小于上限,此时要检查总的connection的数量的上限了,如果那个限制没有达到,就可以继续创建一个,如果达到了就要等待,进入condition.await方法
				final int totalUsed = this.leased.size();//已经借出去的
				final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);//maxTotal表示整个连接池的最大的上限,减去借出去的,表示剩下的可以借出去的,在这些里面有的已经被占用了,比如其他的route的连接(不可能是本route的连接,不然早就借走了)
				if (freeCapacity > 0) {//存在剩下的可以借出去的,别忘了不是直接再生成就行,因为这些数量可能被别的route的连接占用了。所以只有在freeCapacity大于0的时候才可以创建
					final int totalAvailable = this.available.size();//available表示可以使用的连接,即已经归还的连接,注意这里一定是其他的route对应的连接,本route如果有的话早就可以借出去了。
					if (totalAvailable > freeCapacity - 1) {//如果totalAvailable,也就是其他的可用的连接的总数量,如果等于总的可以借出去的数量,此时不能再继续生成了,必须要关掉一个其他的route的连接了。
						if (!this.available.isEmpty()) {//此时available一定不是empty,因为freeCapacity至少是1,即totalAvailable大于0,所以一定不是empty
							final E lastUsed = this.available.removeLast();//关掉一个connection<,腾出空间来,用于生成一个新的connection
							lastUsed.close();
							final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());//刚关闭的connection对应的pool中也要移出。
							otherpool.remove(lastUsed);
						}
					}
					final C conn = this.connFactory.create(route);//开辟一个新的连接
					entry = pool.add(conn);//这个方法会将conn变为一个entry,下面有详细的介绍。
					this.leased.add(entry);
					return entry;
				}
			}

			boolean success = false;//进入到这个地方说明既没有借到任何的connection,也没有能够开辟一个新的connection,此时就要等待,将这个任务放入到pool的等待队列并且放入到整体的等待对垒
			try {
				pool.queue(future);
				this.pending.add(future);
				success = future.await(deadline);//等待,并指定最大的等待时间
			} finally {
				// In case of 'success', we were woken up by the connection pool and should now have a connection waiting for us, or else we're shutting down.
				// Just continue in the loop, both cases are checked.
				pool.unqueue(future);
				this.pending.remove(future);
			}
			// check for spurious wakeup vs. timeout
			if (!success && (deadline != null) && (deadline.getTime() <= System.currentTimeMillis())) {//等待超时了,退出循环,报错。
				break;
			}
		}
		throw new TimeoutException("Timeout waiting for connection");//报错。
	} finally {
		this.lock.unlock();
	}
}

 从上面的代码中可以看出缓存池是根据route进行缓存的,在ConnectionPool中有个属性是:private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;,这个就是整体的缓存,他的key是route,value是一个RouteSpecificPool,他就是对应于某个Route的缓存,里面保存了对应route的所有的连接。具体的代码不看了,这个类还是很简单的。在获得一个connection的时候,会优先从route对应的RouteSpecificPool中查看有没有可以使用的,即已经开辟的其他的请求归还的连接,如果有的话判断一下是否已经过期了,如果没有过期还要检验一下是否可是使用,校验的时间默认是5秒(思路是如果connection的最后更新时间+五秒 < 当前的时间,就要进行校验,修改校验时间的方法是:public void setValidateAfterInactivity(final int ms) {}),如果没有借到connection,就要看看能否生成一个新的,如果既没有达到单个route的上线且没有达到总的数量的上线,就可以生成一个新的,否则就等待,等待过程中其他的线程可能会归还一个connection,此时当前的线程就会被唤醒,然后继续进入while循环。   注意上面最终返回的不是直接的HttpClientConnection,而是用一个entry包装的。entry的具体产生,是在上面的 entry = pool.add(conn); 其中pool指的是对应某个route的缓存池,因为最终借出去的是Entry,所以有必要看看这个entry到底是如何产生的,所以我们从这个pool是如何产生的看起:即上面的 getPool(route)方法,

private RouteSpecificPool<T, C, E> getPool(final T route) {
	RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);//从一个map中根据route获取对应的RouteSpecificPool
	if (pool == null) {
		pool = new RouteSpecificPool<T, C, E>(route) {//没有则创建,创建的是加强过的
			@Override
			protected E createEntry(final C conn) {
				return AbstractConnPool.this.createEntry(route, conn);//返回的Entry是调用本类的createEntry方法产生
			}
		};
		this.routeToPool.put(route, pool);
	}
	return pool;
}

 最终调用的还是CPool的createEntry方法:

@Override
protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
    final String id = Long.toString(COUNTER.getAndIncrement());
    return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.tunit);
}

传入的参数是route和HttpClientConnection,然后用一个Entry包装起来,看一下CpoolEntry,尤其要关注里面的更新时间、过期时间。里面有这几个参数:

private final long created;//这个entry的创建时间
private final long validityDeadline;//这个entry的最终失效时间,这个时间在一个entry刚刚建立的时候确定的,由当前时间+构造方法中的timeToLive确定,这个值确定了一个entry的expiry的上限。
@GuardedBy("this")
private long updated;//上一次更新时间,当使用完这个entry再次归还到连接池的时候,更新这个时间。之前说的校验,就用到了这个时间,如果updated+校验间隔时间<当前的时间,那么就要校验这个entry,
@GuardedBy("this")
private long expiry;//这里是由http连接确定的时间,当使用完一个连接之后,如果这个连接是1.1版本,即keepalive的,那么要更新这个连接的剩余时间,也就是过期时间,当然这个过期时间是不能超过上面的validityDeadline的。这个

 上面的时间updated、expiry是可以更新的,更新的方法:

public synchronized void updateExpiry(final long time, final TimeUnit tunit) {//这里的time是指还剩余的过期时间,由http连接的服务端确定。(暂定,我时这么理解的,但是不确定)
	Args.notNull(tunit, "Time unit");
	this.updated = System.currentTimeMillis();//更新上一次的更新时间
	final long newExpiry;
	if (time > 0) {
		newExpiry = this.updated + tunit.toMillis(time);
	} else {
		newExpiry = Long.MAX_VALUE;
	}
	this.expiry = Math.min(newExpiry, this.validityDeadline);//过期时间,
}

 上面方法的调用是在归还一个connection的时候,参数time就是有tcp连接确定的剩余时间,如果不是1.1的keepliave的,连接马上就关掉了(代码后续会有的)。

 

这篇就写到这里了,主要说明了http连接是池化的、如何存储以及具体的借出一个连接的过程。还没有具体到说明一个httpConnection是什么东西,先不要进,先把整体的框架理清楚再往细了看。

 

相关标签: httpClient