
A Simple Tutorial on Implementing a Database Connection Pool in Java

程序员文章站 2024-03-08 12:00:10

1. Introduction

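  Pooling is widely used in Java. In short, an object pool holds a bounded number of instances of some type; callers borrow an instance from the pool and return it when they are done, which reduces the overhead of repeatedly creating and destroying objects. Java thread pools and database connection pools are typical applications. Not every object is worth pooling, though: maintaining the pool has a cost of its own, so pooling objects that are cheap to create can actually hurt performance. For objects that are expensive to create and are created and used frequently, pooling can improve performance considerably.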
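
To make the borrow-and-return cycle concrete, here is a minimal sketch of a generic object pool. The names SimplePool, borrow, giveBack and idleCount are hypothetical and chosen only for illustration; this is not the pool built later in this article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical minimal pool: a fixed number of instances is created up front.
class SimplePool<T> {

    private final BlockingQueue<T> idle;

    SimplePool(int capacity, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            idle.add(factory.get()); // pay the creation cost once
        }
    }

    // Waits up to timeoutMillis for an idle instance; returns null on timeout or interruption.
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }

    // Puts the instance back so that the next caller can reuse it.
    void giveBack(T instance) {
        idle.offer(instance);
    }

    int idleCount() {
        return idle.size();
    }
}
```

A caller would borrow an instance, use it, and hand it back in a finally block so that the instance is returned even when the work fails.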

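  There are many mature database connection pools, such as c3p0, DBCP, Proxool and Alibaba's Druid. Many of them are open source, with source code available on GitHub, so developers can choose according to their own needs and the features and performance of each pool. This article implements a simple database connection pool purely as a way to learn about pooling; corrections are welcome.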

2. Design

Main classes and interfaces

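.ConnectionParam - the connection pool parameter class. It holds the settings for the database connection and for the pool itself, and is constructed with a builder.

    driver, url, user, password - required to connect to the database

    minConnection - minimum number of connections

    maxConnection - maximum number of connections

    minIdle - minimum number of idle connections

    maxWait - maximum time to wait for a connection, in milliseconds

 private final String driver;
 private final String url;
 private final String user;
 private final String password;
 private final int minConnection;
 private final int maxConnection;
 private final int minIdle;
 private final long maxWait;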
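
The split between required and optional settings can be sketched with a trimmed-down builder. PoolSettings and its Builder are hypothetical names used only for this illustration; the default values 5 and 50 mirror those in ParamConfiguration, and the check in build() is an addition of this sketch, not something the article's ConnectionParam does.

```java
// Hypothetical, trimmed-down parameter class showing required vs. optional settings.
final class PoolSettings {

    final String url;        // required
    final int minConnection; // optional
    final int maxConnection; // optional

    private PoolSettings(Builder b) {
        this.url = b.url;
        this.minConnection = b.minConnection;
        this.maxConnection = b.maxConnection;
    }

    static final class Builder {

        private final String url;
        private int minConnection = 5;  // same default as ParamConfiguration
        private int maxConnection = 50; // same default as ParamConfiguration

        // Required settings go through the constructor.
        Builder(String url) {
            this.url = url;
        }

        // Optional settings are chained setters that return the builder.
        Builder minConnection(int value) {
            this.minConnection = value;
            return this;
        }

        Builder maxConnection(int value) {
            this.maxConnection = value;
            return this;
        }

        // Assumption of this sketch: reject inconsistent settings before the object exists.
        PoolSettings build() {
            if (url == null || minConnection < 0 || minConnection > maxConnection) {
                throw new IllegalArgumentException("invalid pool settings");
            }
            return new PoolSettings(this);
        }
    }
}
```

Because the resulting object is immutable and is only created inside build(), a half-configured instance can never reach the pool.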

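.ConnectionPool - the database connection pool

    The ConnectionPool constructor is declared protected so that it cannot be created from outside the package; instances are managed by ConnectionPoolFactory.

    ConnectionPool implements the DataSource interface and overrides getConnection().

    ConnectionPool holds two containers: a queue that stores the idle Connections and a Vector (chosen because it is synchronized) that stores the Connections currently in use.

    When a caller asks for a connection, it is taken from the queue and recorded in the Vector; if no connection becomes idle within maxWait, null is returned. When the caller calls close(), the connection is removed from the Vector and put back into the queue.

    ConnectionPool provides a simple dynamic growth mechanism based on minIdle and maxConnection.

 private static final int INITIAL_SIZE = 5;
 private static final String CLOSE_METHOD = "close";
 private static Logger logger;
 private int size;
 private ConnectionParam connectionParam;
 private ArrayBlockingQueue<Connection> idleConnectionQueue;
 private Vector<Connection> busyConnectionVector;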
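
The growth mechanism mentioned above comes down to a small piece of arithmetic: grow by minIdle, but never beyond maxConnection. The helper below is a hypothetical sketch that isolates that calculation; GrowthPolicy and growCount are names invented for this example and do not appear in the pool itself.

```java
// Hypothetical helper that isolates the arithmetic used when the pool runs dry.
final class GrowthPolicy {

    private GrowthPolicy() {}

    // Returns how many connections to add: minIdle more, capped so the total never exceeds maxConnection.
    static int growCount(int size, int minIdle, int maxConnection) {
        int newCapacity = Math.min(size + minIdle, maxConnection);
        return Math.max(newCapacity - size, 0);
    }
}
```

With the article's defaults (minIdle = 5, maxConnection = 50), a pool of 10 connections would grow by 5, a pool of 48 by only 2, and a pool of 50 not at all.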

.ConnectionPoolFactory - the connection pool manager

  ConnectionPoolFactory holds a static ConcurrentHashMap that stores the connection pool objects.

  ConnectionPoolFactory allows several pools to be created, each with its own configuration and database.

  A pool must first be registered (bound) under a specific name; after that, connections are obtained from the pool with that name.

  When a pool is no longer needed, it can be unregistered (unbound).

 private static Map<String, ConnectionPool> poolMap = new ConcurrentHashMap<>();

 public static Connection getConnection(String poolName) throws SQLException {
  nameCheck(poolName);
  ConnectionPool connectionPool = poolMap.get(poolName);
  return connectionPool.getConnection();
 }

 public static void registerConnectionPool(String name, ConnectionParam connectionParam) {
  registerCheck(name);
  poolMap.put(name, new ConnectionPool(connectionParam));
 }

 // remove the pool from the map and close its connections on a background thread
 public static void unregisterConnectionPool(String name) {
  nameCheck(name);
  final ConnectionPool connectionPool = poolMap.remove(name);
  new Thread(new Runnable() {
   @Override
   public void run() {
    connectionPool.clear();
   }
  }).start();
 }
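
One thing the registration code above does not guard against is registering the same name twice, which would silently replace the earlier pool. The following sketch shows one possible way to handle that with ConcurrentHashMap.putIfAbsent. PoolRegistry is a hypothetical class for illustration, and the type parameter T stands in for the pool type so that the example runs on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical name-to-pool registry; T stands in for the pool type.
final class PoolRegistry<T> {

    private final Map<String, T> pools = new ConcurrentHashMap<>();

    // Registers a pool under a name and refuses to overwrite an existing one.
    void register(String name, T pool) {
        if (name == null || pool == null) {
            throw new IllegalArgumentException("name and pool must not be null");
        }
        if (pools.putIfAbsent(name, pool) != null) {
            throw new IllegalArgumentException("pool already registered: " + name);
        }
    }

    // Looks up a pool; ConcurrentHashMap rejects null keys, so null is checked first.
    T lookup(String name) {
        T pool = (name == null) ? null : pools.get(name);
        if (pool == null) {
            throw new IllegalArgumentException("no pool named " + name);
        }
        return pool;
    }

    // remove() hands back the removed value atomically, so the caller can release its resources.
    T unregister(String name) {
        T pool = (name == null) ? null : pools.remove(name);
        if (pool == null) {
            throw new IllegalArgumentException("no pool named " + name);
        }
        return pool;
    }
}
```

Doing the check and the insertion in a single putIfAbsent call avoids the gap between a separate containsKey test and a later put.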

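Core code

   The heart of the connection pool is the getConnection() method. Normally, once the database work is done, the caller calls close(), at which point the Connection is closed and its resources are released. With a connection pool, close() should not actually close the Connection; it should return it to the pool for reuse. This is where Java's dynamic proxy mechanism comes in: what getConnection() returns is not the "real" Connection but a proxy (its InvocationHandler is written as an anonymous class here). When the caller invokes close(), the call is intercepted and the connection is put back into the pool. For more on dynamic proxies, see the author's other blog post 《java动态代理简单应用》 (Simple Applications of Java Dynamic Proxies).

 @Override
 public Connection getConnection() throws SQLException {
  try {
   // wait up to maxWait milliseconds for an idle connection
   final Connection connection = idleConnectionQueue.poll(connectionParam.getMaxWait(), TimeUnit.MILLISECONDS);
   if (connection == null) {
    // nothing became idle in time: grow the pool for later callers and give up
    logger.info(emptyMsg());
    ensureCapacity();
    return null;
   }
   busyConnectionVector.add(connection);
   return (Connection) Proxy.newProxyInstance(this.getClass().getClassLoader(),
     new Class<?>[]{Connection.class}, new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       if (!method.getName().equals(CLOSE_METHOD)) {
        try {
         return method.invoke(connection, args);
        } catch (InvocationTargetException e) {
         // rethrow the underlying exception (for example SQLException) unwrapped
         throw e.getCause();
        }
       } else {
        // close() is intercepted; a repeated close() is ignored so the connection is queued only once
        if (busyConnectionVector.remove(connection)) {
         idleConnectionQueue.offer(connection);
        }
        return null;
       }
      }
     });
  } catch (InterruptedException e) {
   // restore the interrupt flag so that the caller can see the interruption
   Thread.currentThread().interrupt();
  }
  return null;
 }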
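
The interception of close() can be tried out without a database. The sketch below applies the same dynamic proxy technique to a small hypothetical interface, Handle, which stands in for java.sql.Connection; CloseInterceptDemo, wrap and IDLE are likewise names invented for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for java.sql.Connection so the sketch runs without a database.
interface Handle {
    String run(String sql);
    void close();
}

final class CloseInterceptDemo {

    // Stand-in for the idle queue of the pool.
    static final Deque<Handle> IDLE = new ArrayDeque<>();

    private CloseInterceptDemo() {}

    // Wraps a real handle so that close() returns it to IDLE instead of closing it.
    static Handle wrap(final Handle real) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                IDLE.push(real); // intercepted: hand the real object back to the pool
                return null;
            }
            try {
                return method.invoke(real, args); // every other call is delegated
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the real exception, not the reflective wrapper
            }
        };
        return (Handle) Proxy.newProxyInstance(
                Handle.class.getClassLoader(), new Class<?>[]{Handle.class}, handler);
    }
}
```

The caller only ever sees the Handle interface, so it cannot tell that close() has been redirected; that is what lets existing JDBC-style code use a pooled connection unchanged.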

3. Usage

  First, build the connection pool parameters (ConnectionParam). driver, url, user and password are required; minConnection, maxConnection and the other optional settings can be customized and fall back to the defaults if they are not set. This is the benefit of using a builder for a class with many attributes, some required and some optional. Then register the pool with ConnectionPoolFactory under a specific name, and finally obtain a Connection by calling the static factory method of ConnectionPoolFactory.

  String driver = "com.mysql.jdbc.Driver";
  String url = "jdbc:mysql://localhost:3306/test";
  String user = "root";
  String password = "root";

  ConnectionParam connectionParam = new ConnectionParam.ConnectionParamBuilder(driver, url, user, password).build();
  ConnectionPoolFactory.registerConnectionPool("test", connectionParam);
  Connection connection = ConnectionPoolFactory.getConnection("test");


4. Code

.ParamConfiguration

package database.config;

import java.io.Serializable;

/**
 * Default values for the database connection parameters
 * Created by Michael Wong on 2016/1/18.
 */
public class ParamConfiguration implements Serializable {

 public static final int MIN_CONNECTION = 5;

 public static final int MAX_CONNECTION = 50;

 public static final int MIN_IDLE = 5;

 // maximum wait for a connection, in milliseconds
 public static final long MAX_WAIT = 30000;

 private ParamConfiguration() {}

}

.Builder

package database;

/**
 * Builder
 * Created by Michael Wong on 2016/1/18.
 */
public interface Builder<T> {

 T build();

}

.ConnectionParam

package database;

import database.config.ParamConfiguration;

/**
 * Database connection parameters
 * Created by Michael Wong on 2016/1/18.
 */
public class ConnectionParam {

 private final String driver;

 private final String url;

 private final String user;

 private final String password;

 private final int minConnection;

 private final int maxConnection;

 private final int minIdle;

 private final long maxWait;

 private ConnectionParam(ConnectionParamBuilder builder) {
  this.driver = builder.driver;
  this.url = builder.url;
  this.user = builder.user;
  this.password = builder.password;
  this.minConnection = builder.minConnection;
  this.maxConnection = builder.maxConnection;
  this.minIdle = builder.minIdle;
  this.maxWait = builder.maxWait;
 }

 public String getDriver() {
  return this.driver;
 }

 public String getUrl() {
  return this.url;
 }

 public String getUser() {
  return this.user;
 }

 public String getPassword() {
  return this.password;
 }

 public int getMinConnection() {
  return this.minConnection;
 }

 public int getMaxConnection() {
  return this.maxConnection;
 }

 public int getMinIdle() {
  return this.minIdle;
 }

 public long getMaxWait() {
  return this.maxWait;
 }

 public static class ConnectionParamBuilder implements Builder<ConnectionParam> {

  // required parameters
  private final String driver;

  private final String url;

  private final String user;

  private final String password;

  // optional parameters - initialized to default values
  private int minConnection = ParamConfiguration.MIN_CONNECTION;

  private int maxConnection = ParamConfiguration.MAX_CONNECTION;

  private int minIdle = ParamConfiguration.MIN_IDLE;

  // maximum time to wait for a connection, in milliseconds
  private long maxWait = ParamConfiguration.MAX_WAIT;

  public ConnectionParamBuilder(String driver, String url, String user, String password) {
   this.driver = driver;
   this.url = url;
   this.user = user;
   this.password = password;
  }

  public ConnectionParamBuilder minConnection(int minConnection) {
   this.minConnection = minConnection;
   return this;
  }

  public ConnectionParamBuilder maxConnection(int maxConnection) {
   this.maxConnection = maxConnection;
   return this;
  }

  public ConnectionParamBuilder minIdle(int minIdle) {
   this.minIdle = minIdle;
   return this;
  }

  // takes a long so that the parameter matches the type of the maxWait field
  public ConnectionParamBuilder maxWait(long maxWait) {
   this.maxWait = maxWait;
   return this;
  }

  @Override
  public ConnectionParam build() {
   return new ConnectionParam(this);
  }

 }

}

.ConnectionPool

package database.factory;

import database.ConnectionParam;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * Connection pool
 * Created by Michael Wong on 2016/1/18.
 */
public class ConnectionPool implements DataSource {

 private static final int INITIAL_SIZE = 5;

 private static final String CLOSE_METHOD = "close";

 private static Logger logger;

 // total number of connections created by this pool (idle + busy)
 private int size;

 private ConnectionParam connectionParam;

 private ArrayBlockingQueue<Connection> idleConnectionQueue;

 private Vector<Connection> busyConnectionVector;

 protected ConnectionPool(ConnectionParam connectionParam) {
  this.connectionParam = connectionParam;
  int maxConnection = connectionParam.getMaxConnection();
  idleConnectionQueue = new ArrayBlockingQueue<>(maxConnection);
  busyConnectionVector = new Vector<>();
  logger = Logger.getLogger(this.getClass().getName());
  initConnection();
 }

 private void initConnection() {
  int minConnection = connectionParam.getMinConnection();
  int initialSize = Math.max(INITIAL_SIZE, minConnection);
  // never create more connections than the queue can hold, otherwise put() would block forever
  int count = Math.min(initialSize + minConnection, connectionParam.getMaxConnection());
  try {
   Class.forName(connectionParam.getDriver());
   for (int i = 0; i < count; i++) {
    idleConnectionQueue.put(newConnection());
    size++;
   }
  } catch (Exception e) {
   throw new ExceptionInInitializerError(e);
  }
 }

 @Override
 public Connection getConnection() throws SQLException {
  try {
   // wait up to maxWait milliseconds for an idle connection
   final Connection connection = idleConnectionQueue.poll(connectionParam.getMaxWait(), TimeUnit.MILLISECONDS);
   if (connection == null) {
    // nothing became idle in time: grow the pool for later callers and give up
    logger.info(emptyMsg());
    ensureCapacity();
    return null;
   }
   busyConnectionVector.add(connection);
   return (Connection) Proxy.newProxyInstance(this.getClass().getClassLoader(),
     new Class<?>[]{Connection.class}, new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       if (!method.getName().equals(CLOSE_METHOD)) {
        try {
         return method.invoke(connection, args);
        } catch (InvocationTargetException e) {
         // rethrow the underlying exception (for example SQLException) unwrapped
         throw e.getCause();
        }
       } else {
        // close() is intercepted; a repeated close() is ignored so the connection is queued only once
        if (busyConnectionVector.remove(connection)) {
         idleConnectionQueue.offer(connection);
        }
        return null;
       }
      }
     });
  } catch (InterruptedException e) {
   // restore the interrupt flag so that the caller can see the interruption
   Thread.currentThread().interrupt();
  }
  return null;
 }

 private Connection newConnection() throws SQLException {
  String url = connectionParam.getUrl();
  String user = connectionParam.getUser();
  String password = connectionParam.getPassword();
  return DriverManager.getConnection(url, user, password);
 }

 protected int size() {
  return size;
 }

 protected int idleConnectionQuantity() {
  return idleConnectionQueue.size();
 }

 protected int busyConnectionQuantity() {
  return busyConnectionVector.size();
 }

 // grow by minIdle connections without exceeding maxConnection;
 // synchronized so that concurrent callers do not grow the pool twice
 private synchronized void ensureCapacity() throws SQLException {
  int minIdle = connectionParam.getMinIdle();
  int maxConnection = connectionParam.getMaxConnection();
  int newCapacity = Math.min(size + minIdle, maxConnection);
  int growCount = 0;
  if (size < newCapacity) {
   try {
    for (int i = 0; i < newCapacity - size; i++) {
     idleConnectionQueue.put(newConnection());
     growCount++;
    }
   } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
   }
  }
  size = size + growCount;
 }

 protected void clear() {
  try {
   while (size-- > 0) {
    // take() blocks until a connection is idle, so busy connections are closed once they are returned
    Connection connection = idleConnectionQueue.take();
    connection.close();
   }
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  } catch (SQLException e) {
   e.printStackTrace();
  }
 }

 private String emptyMsg() {
  return "database is busy, please wait...";
 }

 // the remaining DataSource methods are not supported by this simple pool and are left as stubs

 @Override
 public Connection getConnection(String username, String password) throws SQLException {
  return null;
 }

 @Override
 public PrintWriter getLogWriter() throws SQLException {
  return null;
 }

 @Override
 public void setLogWriter(PrintWriter out) throws SQLException {

 }

 @Override
 public void setLoginTimeout(int seconds) throws SQLException {

 }

 @Override
 public int getLoginTimeout() throws SQLException {
  return 0;
 }

 @Override
 public Logger getParentLogger() throws SQLFeatureNotSupportedException {
  return null;
 }

 @Override
 public <T> T unwrap(Class<T> iface) throws SQLException {
  return null;
 }

 @Override
 public boolean isWrapperFor(Class<?> iface) throws SQLException {
  return false;
 }

}

.ConnectionPoolFactory

package database.factory;

import database.ConnectionParam;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Connection pool factory
 * Created by Michael Wong on 2016/1/18.
 */
public class ConnectionPoolFactory {

 private ConnectionPoolFactory() {}

 private static Map<String, ConnectionPool> poolMap = new ConcurrentHashMap<>();

 public static Connection getConnection(String poolName) throws SQLException {
  nameCheck(poolName);
  ConnectionPool connectionPool = poolMap.get(poolName);
  return connectionPool.getConnection();
 }

 public static void registerConnectionPool(String name, ConnectionParam connectionParam) {
  registerCheck(name);
  poolMap.put(name, new ConnectionPool(connectionParam));
 }

 // remove the pool from the map and close its connections on a background thread
 public static void unregisterConnectionPool(String name) {
  nameCheck(name);
  final ConnectionPool connectionPool = poolMap.remove(name);
  new Thread(new Runnable() {
   @Override
   public void run() {
    connectionPool.clear();
   }
  }).start();
 }

 public static int size(String poolName) {
  nameCheck(poolName);
  return poolMap.get(poolName).size();
 }

 public static int getIdleConnectionQuantity(String poolName) {
  nameCheck(poolName);
  return poolMap.get(poolName).idleConnectionQuantity();
 }

 public static int getBusyConnectionQuantity(String poolName) {
  nameCheck(poolName);
  return poolMap.get(poolName).busyConnectionQuantity();
 }

 private static void registerCheck(String name) {
  if (name == null) {
   throw new IllegalArgumentException(nullName());
  }
 }

 private static void nameCheck(String name) {
  if (name == null) {
   throw new IllegalArgumentException(nullName());
  }
  if (!poolMap.containsKey(name)) {
   throw new IllegalArgumentException(notExists(name));
  }
 }

 private static String nullName() {
  return "pool name must not be null";
 }

 private static String notExists(String name) {
  return "connection pool named " + name + " does not exist";
 }

}

5. Test
JUnit unit test

package database.factory;

import database.ConnectionParam;
import org.junit.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.*;

/**
 * ConnectionPoolFactory test
 * Created by Michael Wong on 2016/1/20.
 */
public class ConnectionPoolFactoryTest {

 @Test
 public void testGetConnection() throws SQLException {

  String driver = "com.mysql.jdbc.Driver";
  String url = "jdbc:mysql://localhost:3306/test";
  String user = "root";
  String password = "root";

  ConnectionParam connectionParam = new ConnectionParam.ConnectionParamBuilder(driver, url, user, password).build();
  ConnectionPoolFactory.registerConnectionPool("test", connectionParam);

  List<Connection> connectionList = new ArrayList<>();

  // ask for more connections than the pool creates initially, to exercise the growth path
  for (int i = 0; i < 12; i++) {
   connectionList.add(ConnectionPoolFactory.getConnection("test"));
  }

  print();

  close(connectionList);

  print();

  connectionList.clear();

  for (int i = 0; i < 12; i++) {
   connectionList.add(ConnectionPoolFactory.getConnection("test"));
  }

  print();

  close(connectionList);

  ConnectionPoolFactory.unregisterConnectionPool("test");

 }

 // looking up a pool that is not registered must fail with IllegalArgumentException
 @Test(expected = IllegalArgumentException.class)
 public void testException() {
  try {
   ConnectionPoolFactory.getConnection("test");
  } catch (SQLException e) {
   e.printStackTrace();
  }
 }

 private void close(List<Connection> connectionList) throws SQLException {
  for (Connection conn : connectionList) {
   // getConnection() returns null when the wait times out
   if (conn != null) {
    conn.close();
   }
  }
 }

 private void print() {
  System.out.println("idle: " + ConnectionPoolFactory.getIdleConnectionQuantity("test"));
  System.out.println("busy: " + ConnectionPoolFactory.getBusyConnectionQuantity("test"));
  System.out.println("size: " + ConnectionPoolFactory.size("test"));
 }

}

That is all for this article; I hope it is helpful for your studies.