A Simple Tutorial on Implementing a Database Connection Pool in Java
1. Introduction
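Pooling is widely used in Java. In short, an object pool holds a bounded number of instances of some type; a developer borrows an instance from the pool and returns it after use, which reduces the overhead of repeatedly creating and destroying objects. Java thread pools and database connection pools are typical applications. Not every object is worth pooling, though: maintaining a pool has its own resource cost, so pooling objects that are cheap to create can actually hurt performance. For objects that are expensive to create and are created and used frequently, pooling can improve performance considerably.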
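The borrow-and-return idea described above can be sketched in a few lines. This is only an illustration under simple assumptions: `SimplePool`, `borrow`, and `giveBack` are hypothetical names, not part of the pool built later in this article, and the pool is fixed-size with no growth.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** A minimal fixed-size object pool (illustrative; SimplePool is a hypothetical name). */
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int capacity, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(capacity);
        // Pay the creation cost once, up front.
        for (int i = 0; i < capacity; i++) {
            idle.add(factory.get());
        }
    }

    /** Borrows an instance, waiting up to timeoutMillis; returns null on timeout. */
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return null;
        }
    }

    /** Returns a borrowed instance to the pool; false if the pool is already full. */
    boolean giveBack(T instance) {
        return idle.offer(instance);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(2, StringBuilder::new);
        StringBuilder a = pool.borrow(100);
        StringBuilder b = pool.borrow(100);
        StringBuilder c = pool.borrow(100);        // pool exhausted: times out
        System.out.println(c == null);             // true
        pool.giveBack(a);
        System.out.println(pool.borrow(100) == a); // true: the same instance is reused
    }
}
```

The blocking queue does double duty here: it stores the idle instances and it makes a caller wait (up to a timeout) when none is available.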
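There are many mature database connection pools in the industry, such as c3p0, DBCP, Proxool, and Alibaba's Druid. Many are already open source, with source code available on GitHub, so developers can choose among them according to their needs and each pool's characteristics and performance. This article implements a simple database connection pool purely as a way to learn about pooling; if you find mistakes, corrections are welcome.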
2. Design
Main classes and interfaces
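.ConnectionParam - the connection pool parameter class. It configures the database connection and the pool-related parameters, and is built with a builder.
driver, url, user, password - required to connect to the database
minConnection - minimum number of connections
maxConnection - maximum number of connections
minIdle - minimum number of idle connections
maxWait - maximum time to wait for a connection, in milliseconds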
    private final String driver;
    private final String url;
    private final String user;
    private final String password;
    private final int minConnection;
    private final int maxConnection;
    private final int minIdle;
    private final long maxWait;
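.ConnectionPool - the database connection pool
The ConnectionPool constructor is declared protected so that it cannot be called from outside its package; creation is managed centrally by ConnectionPoolFactory.
ConnectionPool implements the DataSource interface and overrides getConnection().
ConnectionPool holds two containers: a queue that stores idle Connections, and a Vector (chosen because it is synchronized) that stores the Connections currently in use.
When a developer needs a database connection, one is taken from the queue and recorded in the Vector; if none becomes available, null is returned. When the developer closes the connection after use, it is removed from the Vector and put back into the queue.
ConnectionPool provides a simple dynamic expansion mechanism based on minIdle and maxConnection.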
    private static final int INITIAL_SIZE = 5;
    private static final String CLOSE_METHOD = "close";
    private static Logger logger;
    private int size;
    private ConnectionParam connectionParam;
    private ArrayBlockingQueue<Connection> idleConnectionQueue;
    private Vector<Connection> busyConnectionVector;
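The expansion rule used by the pool in this article (grow by minIdle, but never beyond maxConnection) can be written as a small pure function. This is a sketch for illustration: `PoolGrowth` and `growBy` are hypothetical names, and the real pool performs the same arithmetic inside its own ensureCapacity() method.

```java
/** Sketch of the growth rule: grow by minIdle, never past maxConnection. */
class PoolGrowth {
    /** Returns how many new connections to create given the current pool size. */
    static int growBy(int size, int minIdle, int maxConnection) {
        int newCapacity = Math.min(size + minIdle, maxConnection);
        return Math.max(newCapacity - size, 0);
    }

    public static void main(String[] args) {
        System.out.println(growBy(10, 5, 50)); // 5: plenty of room, grow by minIdle
        System.out.println(growBy(48, 5, 50)); // 2: clipped by maxConnection
        System.out.println(growBy(50, 5, 50)); // 0: already at the limit
    }
}
```

Keeping the rule in one function makes the boundary cases (nearly full, completely full) easy to check without a database.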
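.ConnectionPoolFactory - the connection pool manager
ConnectionPoolFactory holds a static ConcurrentHashMap that stores the connection pool objects.
ConnectionPoolFactory allows multiple pools to be created, with different configurations and for different databases.
Before first use, the developer registers (binds) a connection pool under a specific name; after that, each Connection is obtained from the named pool.
When a pool is no longer needed, the developer can unregister (unbind) it.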
    private static Map<String, ConnectionPool> poolMap = new ConcurrentHashMap<>();

    public static Connection getConnection(String poolName) throws SQLException {
        nameCheck(poolName);
        ConnectionPool connectionPool = poolMap.get(poolName);
        return connectionPool.getConnection();
    }

    public static void registerConnectionPool(String name, ConnectionParam connectionParam) {
        registerCheck(name);
        poolMap.put(name, new ConnectionPool(connectionParam));
    }

    // let GC reclaim the pool
    public static void unregisterConnectionPool(String name) {
        nameCheck(name);
        final ConnectionPool connectionPool = poolMap.get(name);
        poolMap.remove(name);
        new Thread(new Runnable() {
            @Override
            public void run() {
                connectionPool.clear();
            }
        }).start();
    }
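As a possible refinement of the register/unregister idea, the sketch below uses single atomic map operations (putIfAbsent and remove) so that a second registration under the same name does not silently replace an existing pool. It is an illustration only: `PoolRegistry` and its methods are hypothetical names, and `Object` stands in for the pool type so the example runs on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a name-to-pool registry; Object stands in for the pool type. */
class PoolRegistry {
    private static final Map<String, Object> POOLS = new ConcurrentHashMap<>();

    private static void requireName(String name) {
        // ConcurrentHashMap rejects null keys, so fail early with a clear message.
        if (name == null) {
            throw new IllegalArgumentException("pool name must not be null");
        }
    }

    /** Registers a pool; returns false (and keeps the old pool) if the name is taken. */
    static boolean register(String name, Object pool) {
        requireName(name);
        return POOLS.putIfAbsent(name, pool) == null;
    }

    /** Looks up a pool by name; throws if no pool is registered under it. */
    static Object lookup(String name) {
        requireName(name);
        Object pool = POOLS.get(name);
        if (pool == null) {
            throw new IllegalArgumentException("no connection pool named " + name);
        }
        return pool;
    }

    /** Removes and returns the pool in one step, or null if it was not registered. */
    static Object unregister(String name) {
        requireName(name);
        return POOLS.remove(name);
    }

    public static void main(String[] args) {
        Object first = new Object();
        System.out.println(register("test", first));        // true
        System.out.println(register("test", new Object())); // false: name already bound
        System.out.println(lookup("test") == first);        // true
        System.out.println(unregister("test") == first);    // true
    }
}
```

Because remove() returns the removed value, the caller that unregisters a pool also receives the exact instance it must clean up, with no separate get() beforehand.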
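Core code
The core of the connection pool is the getConnection() method. Normally, once a developer has finished a database operation, they call close(), at which point the Connection is closed and its resources are released. In a connection pool, however, a call to close() should not actually close the Connection; the connection should go back into the pool for reuse. This is where Java's dynamic proxy mechanism comes in: getConnection() does not return the "real" Connection but a proxy (its InvocationHandler is written here as an anonymous class). When the user calls close(), the proxy intercepts the call and returns the connection to the pool. For more on dynamic proxies, see another blog post, 《java动态代理简单应用》 (A Simple Application of Java Dynamic Proxies).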
    @Override
    public Connection getConnection() throws SQLException {
        try {
            // Wait up to maxWait milliseconds for an idle connection.
            final Connection connection = idleConnectionQueue.poll(connectionParam.getMaxWait(), TimeUnit.MILLISECONDS);
            if (connection == null) {
                // Timed out: grow the pool for later callers and return null.
                logger.info(emptyMsg());
                ensureCapacity();
                return null;
            }
            busyConnectionVector.add(connection);
            return (Connection) Proxy.newProxyInstance(this.getClass().getClassLoader(),
                    new Class[]{Connection.class}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (!method.getName().equals(CLOSE_METHOD)) {
                        return method.invoke(connection, args);
                    } else {
                        // Intercept close(): move the real connection from busy back to idle.
                        // Checking remove() keeps two close() calls in a row from
                        // enqueueing the same connection twice.
                        if (busyConnectionVector.remove(connection)) {
                            idleConnectionQueue.offer(connection);
                        }
                        return null;
                    }
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        return null;
    }
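The close() interception can be tried without a database. The sketch below applies the same dynamic proxy technique to a small hypothetical `Resource` interface standing in for java.sql.Connection; `CloseInterceptDemo`, `RealResource`, and `pooled` are names made up for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

class CloseInterceptDemo {
    /** Hypothetical stand-in for java.sql.Connection. */
    interface Resource {
        String query(String sql);
        void close();
    }

    static class RealResource implements Resource {
        boolean closed = false;
        public String query(String sql) { return "result of " + sql; }
        public void close() { closed = true; }
    }

    /** Wraps target so that close() returns it to idle instead of closing it. */
    static Resource pooled(Resource target, Deque<Resource> idle) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                idle.offer(target); // intercept: hand the real object back
                return null;
            }
            try {
                return method.invoke(target, args); // everything else is delegated
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (Resource) Proxy.newProxyInstance(
                Resource.class.getClassLoader(), new Class<?>[]{Resource.class}, handler);
    }

    public static void main(String[] args) {
        Deque<Resource> idle = new ArrayDeque<>();
        RealResource real = new RealResource();
        Resource r = pooled(real, idle);
        System.out.println(r.query("select 1")); // result of select 1
        r.close();
        System.out.println(real.closed);         // false: the real close() never ran
        System.out.println(idle.peek() == real); // true: the real object is back in the pool
    }
}
```

Unwrapping InvocationTargetException is a design choice in this sketch: it lets callers see the exception the target actually threw (for a real Connection, typically an SQLException) rather than a reflection wrapper.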
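3. Usage
First, the user builds the connection pool parameters (ConnectionParam). driver, url, user, and password are required; minConnection, maxConnection, and the other options are optional and fall back to the system defaults if not set. This is the advantage of using a builder for an object with many attributes, some required and some optional. Next, register the pool with ConnectionPoolFactory under a specific name. Finally, obtain a Connection by calling ConnectionPoolFactory's static factory method.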
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://localhost:3306/test";
    String user = "root";
    String password = "root";
    ConnectionParam connectionParam = new ConnectionParam.ConnectionParamBuilder(driver, url, user, password).build();
    ConnectionPoolFactory.registerConnectionPool("test", connectionParam);
    Connection connection = ConnectionPoolFactory.getConnection("test");
4. Code
.ParamConfiguration
    package database.config;

    import java.io.Serializable;

    /**
     * Database connection parameters
     * Created by Michael Wong on 2016/1/18.
     */
    public class ParamConfiguration implements Serializable {

        public static final int MIN_CONNECTION = 5;
        public static final int MAX_CONNECTION = 50;
        public static final int MIN_IDLE = 5;
        public static final long MAX_WAIT = 30000;

        private ParamConfiguration() {}
    }
.Builder
    package database;

    /**
     * Builder
     * Created by Michael Wong on 2016/1/18.
     */
    public interface Builder<T> {
        T build();
    }
.ConnectionParam
    package database;

    import database.config.ParamConfiguration;

    /**
     * Database connection parameters
     * Created by Michael Wong on 2016/1/18.
     */
    public class ConnectionParam {

        private final String driver;
        private final String url;
        private final String user;
        private final String password;
        private final int minConnection;
        private final int maxConnection;
        private final int minIdle;
        private final long maxWait;

        private ConnectionParam(ConnectionParamBuilder builder) {
            this.driver = builder.driver;
            this.url = builder.url;
            this.user = builder.user;
            this.password = builder.password;
            this.minConnection = builder.minConnection;
            this.maxConnection = builder.maxConnection;
            this.minIdle = builder.minIdle;
            this.maxWait = builder.maxWait;
        }

        public String getDriver() { return this.driver; }
        public String getUrl() { return this.url; }
        public String getUser() { return this.user; }
        public String getPassword() { return this.password; }
        public int getMinConnection() { return this.minConnection; }
        public int getMaxConnection() { return this.maxConnection; }
        public int getMinIdle() { return this.minIdle; }
        public long getMaxWait() { return this.maxWait; }

        public static class ConnectionParamBuilder implements Builder<ConnectionParam> {

            // Required parameters
            private final String driver;
            private final String url;
            private final String user;
            private final String password;

            // Optional parameters - initialized to default value
            private int minConnection = ParamConfiguration.MIN_CONNECTION;
            private int maxConnection = ParamConfiguration.MAX_CONNECTION;
            private int minIdle = ParamConfiguration.MIN_IDLE;
            // Getting connection wait time (milliseconds)
            private long maxWait = ParamConfiguration.MAX_WAIT;

            public ConnectionParamBuilder(String driver, String url, String user, String password) {
                this.driver = driver;
                this.url = url;
                this.user = user;
                this.password = password;
            }

            public ConnectionParamBuilder minConnection(int minConnection) {
                this.minConnection = minConnection;
                return this;
            }

            public ConnectionParamBuilder maxConnection(int maxConnection) {
                this.maxConnection = maxConnection;
                return this;
            }

            public ConnectionParamBuilder minIdle(int minIdle) {
                this.minIdle = minIdle;
                return this;
            }

            // Takes a long to match the type of the maxWait field.
            public ConnectionParamBuilder maxWait(long maxWait) {
                this.maxWait = maxWait;
                return this;
            }

            @Override
            public ConnectionParam build() {
                return new ConnectionParam(this);
            }
        }
    }
.ConnectionPool
    package database.factory;

    import database.ConnectionParam;

    import javax.sql.DataSource;
    import java.io.PrintWriter;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.SQLFeatureNotSupportedException;
    import java.util.Vector;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Logger;

    /**
     * Connection pool
     * Created by Michael Wong on 2016/1/18.
     */
    public class ConnectionPool implements DataSource {

        private static final int INITIAL_SIZE = 5;
        private static final String CLOSE_METHOD = "close";
        private static Logger logger;

        private int size;
        private ConnectionParam connectionParam;
        private ArrayBlockingQueue<Connection> idleConnectionQueue;
        private Vector<Connection> busyConnectionVector;

        protected ConnectionPool(ConnectionParam connectionParam) {
            this.connectionParam = connectionParam;
            int maxConnection = connectionParam.getMaxConnection();
            idleConnectionQueue = new ArrayBlockingQueue<>(maxConnection);
            busyConnectionVector = new Vector<>();
            logger = Logger.getLogger(this.getClass().getName());
            initConnection();
        }

        private void initConnection() {
            int minConnection = connectionParam.getMinConnection();
            int initialSize = INITIAL_SIZE < minConnection ? minConnection : INITIAL_SIZE;
            // Cap at maxConnection (the queue capacity) so put() cannot block on a full queue.
            int count = Math.min(initialSize + minConnection, connectionParam.getMaxConnection());
            try {
                Class.forName(connectionParam.getDriver());
                for (int i = 0; i < count; i++) {
                    idleConnectionQueue.put(newConnection());
                    size++;
                }
            } catch (Exception e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        @Override
        public Connection getConnection() throws SQLException {
            try {
                // Wait up to maxWait milliseconds for an idle connection.
                final Connection connection = idleConnectionQueue.poll(connectionParam.getMaxWait(), TimeUnit.MILLISECONDS);
                if (connection == null) {
                    // Timed out: grow the pool for later callers and return null.
                    logger.info(emptyMsg());
                    ensureCapacity();
                    return null;
                }
                busyConnectionVector.add(connection);
                return (Connection) Proxy.newProxyInstance(this.getClass().getClassLoader(),
                        new Class[]{Connection.class}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (!method.getName().equals(CLOSE_METHOD)) {
                            return method.invoke(connection, args);
                        } else {
                            // Intercept close(): move the real connection from busy back to idle.
                            // Checking remove() keeps two close() calls in a row from
                            // enqueueing the same connection twice.
                            if (busyConnectionVector.remove(connection)) {
                                idleConnectionQueue.offer(connection);
                            }
                            return null;
                        }
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
            return null;
        }

        private Connection newConnection() throws SQLException {
            String url = connectionParam.getUrl();
            String user = connectionParam.getUser();
            String password = connectionParam.getPassword();
            return DriverManager.getConnection(url, user, password);
        }

        protected int size() {
            return size;
        }

        protected int idleConnectionQuantity() {
            return idleConnectionQueue.size();
        }

        protected int busyConnectionQuantity() {
            return busyConnectionVector.size();
        }

        // Grows the pool by minIdle connections, never beyond maxConnection.
        private void ensureCapacity() throws SQLException {
            int minIdle = connectionParam.getMinIdle();
            int maxConnection = connectionParam.getMaxConnection();
            int newCapacity = size + minIdle;
            newCapacity = newCapacity > maxConnection ? maxConnection : newCapacity;
            int growCount = 0;
            if (size < newCapacity) {
                try {
                    for (int i = 0; i < newCapacity - size; i++) {
                        idleConnectionQueue.put(newConnection());
                        growCount++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                }
            }
            size = size + growCount;
        }

        // Closes every connection; take() blocks until busy connections are returned.
        protected void clear() {
            try {
                while (size-- > 0) {
                    Connection connection = idleConnectionQueue.take();
                    connection.close();
                }
            } catch (InterruptedException | SQLException e) {
                e.printStackTrace();
            }
        }

        private String emptyMsg() {
            return "Database is busy, please wait...";
        }

        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return null;
        }

        @Override
        public PrintWriter getLogWriter() throws SQLException {
            return null;
        }

        @Override
        public void setLogWriter(PrintWriter out) throws SQLException {
        }

        @Override
        public void setLoginTimeout(int seconds) throws SQLException {
        }

        @Override
        public int getLoginTimeout() throws SQLException {
            return 0;
        }

        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            return null;
        }

        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            return null;
        }

        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return false;
        }
    }
.ConnectionPoolFactory
    package database.factory;

    import database.ConnectionParam;

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * Connection pool factory
     * Created by Michael Wong on 2016/1/18.
     */
    public class ConnectionPoolFactory {

        private ConnectionPoolFactory() {}

        private static Map<String, ConnectionPool> poolMap = new ConcurrentHashMap<>();

        public static Connection getConnection(String poolName) throws SQLException {
            nameCheck(poolName);
            ConnectionPool connectionPool = poolMap.get(poolName);
            return connectionPool.getConnection();
        }

        public static void registerConnectionPool(String name, ConnectionParam connectionParam) {
            registerCheck(name);
            poolMap.put(name, new ConnectionPool(connectionParam));
        }

        // let GC reclaim the pool
        public static void unregisterConnectionPool(String name) {
            nameCheck(name);
            final ConnectionPool connectionPool = poolMap.get(name);
            poolMap.remove(name);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    connectionPool.clear();
                }
            }).start();
        }

        public static int size(String poolName) {
            nameCheck(poolName);
            return poolMap.get(poolName).size();
        }

        public static int getIdleConnectionQuantity(String poolName) {
            nameCheck(poolName);
            return poolMap.get(poolName).idleConnectionQuantity();
        }

        public static int getBusyConnectionQuantity(String poolName) {
            nameCheck(poolName);
            return poolMap.get(poolName).busyConnectionQuantity();
        }

        private static void registerCheck(String name) {
            if (name == null) {
                throw new IllegalArgumentException(nullName());
            }
        }

        private static void nameCheck(String name) {
            if (name == null) {
                throw new IllegalArgumentException(nullName());
            }
            if (!poolMap.containsKey(name)) {
                throw new IllegalArgumentException(notExists(name));
            }
        }

        private static String nullName() {
            return "Pool name must not be null";
        }

        private static String notExists(String name) {
            return "Connection pool named " + name + " does not exist";
        }
    }
5. Testing
JUnit unit test
    package database.factory;

    import database.ConnectionParam;
    import org.junit.Test;

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import static org.junit.Assert.*;

    /**
     * ConnectionPoolFactory test
     * Created by Michael Wong on 2016/1/20.
     */
    public class ConnectionPoolFactoryTest {

        @Test
        public void testGetConnection() throws SQLException {
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://localhost:3306/test";
            String user = "root";
            String password = "root";
            ConnectionParam connectionParam = new ConnectionParam.ConnectionParamBuilder(driver, url, user, password).build();
            ConnectionPoolFactory.registerConnectionPool("test", connectionParam);

            List<Connection> connectionList = new ArrayList<>();
            for (int i = 0; i < 12; i++) {
                connectionList.add(ConnectionPoolFactory.getConnection("test"));
            }
            print();
            close(connectionList);
            print();

            connectionList.clear();
            for (int i = 0; i < 12; i++) {
                connectionList.add(ConnectionPoolFactory.getConnection("test"));
            }
            print();
            close(connectionList);

            ConnectionPoolFactory.unregisterConnectionPool("test");
        }

        // No pool named "test" is registered here, so the lookup must fail.
        @Test(expected = IllegalArgumentException.class)
        public void testException() {
            try {
                ConnectionPoolFactory.getConnection("test");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        private void close(List<Connection> connectionList) throws SQLException {
            for (Connection conn : connectionList) {
                // getConnection() returns null on timeout, so skip those entries.
                if (conn != null) {
                    conn.close();
                }
            }
        }

        private void print() {
            System.out.println("idle: " + ConnectionPoolFactory.getIdleConnectionQuantity("test"));
            System.out.println("busy: " + ConnectionPoolFactory.getBusyConnectionQuantity("test"));
            System.out.println("size: " + ConnectionPoolFactory.size("test"));
        }
    }
That is all for this article; I hope it helps with your studies.