用JAVA实现无等待数据库连接池
我们都知道数据库连接是一种有限和非常昂贵的应用资源,怎样对这些资源进行高效的管理,能有效的改善整个系统的性能和健壮性。数据库连接池正是针对这个问题而提出来的。
数据库连接负责分配、释放和管理数据库连接。使数据库连接可以重复利用,而不是用一次建立一次数据库连接。
基本思路
建立一个容器
每次到这个容器里得到连接,如果为空则建立一个新连接。
当连接使用完后归还给这个容器
这里就有二个难点
1. 容器必需是同步的,线程安全的。
2. 连接怎归还连接池
方案:
针对这二个难点,我们分别提出了二个解决方法
1.使用ConcurrentLinkedQueue实现先进先出队列
ConcurrentLinkedQueue无界线程安全队列介绍
这个类在java.util.concurrent包中,我们来看看官方是怎描述这个类的
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素.此实现采用了有效的“无等待 (wait-free)”算法
2.动态代理实现连接归还连接池
大家也可以参考刘冬在IBM发表的文章
http://www.ibm.com/developerworks/cn/java/l-connpoolproxy/
接下来我们来看看整体代码
-
-
import java.io.PrintWriter;
-
-
import java.lang.reflect.InvocationHandler;
-
-
import java.lang.reflect.Method;
-
-
import java.lang.reflect.Proxy;
-
-
import java.sql.Connection;
-
-
import java.sql.Driver;
-
-
import java.sql.SQLException;
-
-
import java.util.ArrayList;
-
-
import java.util.List;
-
-
import java.util.Properties;
-
-
import java.util.concurrent.ConcurrentLinkedQueue;
-
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
import java.util.concurrent.atomic.AtomicLong;
-
-
import java.util.concurrent.locks.ReentrantLock;
-
-
import javax.sql.DataSource;
-
-
public class JavaGGDataSource implements DataSource {
-
-
//连接队列
-
-
private ConcurrentLinkedQueue<_Connection> connQueue = new ConcurrentLinkedQueue<_Connection>();
-
-
//存放所有连接容器
-
-
private List<_Connection> conns = new ArrayList<_Connection>();
-
-
-
-
-
-
private int maxActive = -1;// -1为不限制连接数
-
-
-
private int timeout = 1000 * 60 * 60 * 4;// 默认为4小时,即4小时没有任何sql操作就把所有连接重新建立连接
-
-
-
private AtomicInteger connCount = new AtomicInteger();
-
-
//线程锁,主要用于新建连接和清空连接时
-
-
private ReentrantLock lock = new ReentrantLock();
-
-
public void closeAllConnection() {
-
-
}
-
-
/**
-
-
* 归还连接给连接池
-
-
*
-
-
* @param conn
-
-
*@date 2009-8-13
-
-
*@author eric.chan
-
-
*/
-
-
public void offerConnection(_Connection conn) {
-
-
connQueue.offer(conn);
-
-
}
-
-
@Override
-
-
-
return getConnection(user, password);
-
-
}
-
-
/**
-
-
* 从池中得到连接,如果池中没有连接,则建立新的sql连接
-
-
*
-
-
* @param username
-
-
* @param password
-
-
* @author eric.chan
-
-
*/
-
-
@Override
-
-
-
-
checkTimeout();
-
-
_Connection conn = connQueue.poll();
-
-
if (conn == null) {
-
-
if (maxActive > 0 && connCount.get() >= maxActive) {
-
-
for (;;) {// 采用自旋方法 从已满的池中得到一个连接
-
-
conn = connQueue.poll();
-
-
if (conn != null)
-
-
break;
-
-
else
-
-
continue;
-
-
}
-
-
}
-
-
lock.lock();
-
-
try {
-
-
if (maxActive > 0 && connCount.get() >= maxActive) {
-
-
// 处理并发问题
-
-
return getConnection(username, password);
-
-
}
-
-
-
info.put(“user”, username);
-
-
info.put(“password”, password);
-
-
-
conn = new _Connection(conn1, this);
-
-
int c = connCount.incrementAndGet();// 当前连接数加1
-
-
conns.add(conn);
-
-
-
} finally {
-
-
lock.unlock();
-
-
}
-
-
}
-
-
-
return conn.getConnection();
-
-
}
-
-
/**
-
-
* 检查最后一次的连接时间
-
-
*
-
-
* @throws SQLException
-
-
*@date 2009-8-13
-
-
*@author eric.chan
-
-
*/
-
-
-
-
long lt = lastCheckout.get();
-
-
if ((now – lt) > timeout) {
-
-
_Connection conn = null;
-
-
lock.lock();
-
-
try {
-
-
if(connCount.get()==0)return;
-
-
while ((conn = connQueue.poll()) != null) {
-
-
-
conn.close();
-
-
conn = null;
-
-
}
-
-
for(_Connection con:conns){
-
-
con.close();
-
-
}
-
-
conns.clear();
-
-
-
connCount.getAndSet(0);// 重置连接数计数器
-
-
-
} finally {
-
-
lock.unlock();
-
-
}
-
-
}
-
-
}
-
-
/**
-
-
*
-
-
* @return
-
-
*@date 2009-8-13
-
-
*@author eric.chan
-
-
*/
-
-
-
if (driver == null) {
-
-
try {
-
-
-
-
-
-
e.printStackTrace();
-
-
}
-
-
}
-
-
return driver;
-
-
}
-
-
@Override
-
-
-
return null;
-
-
}
-
-
@Override
-
-
-
return 0;
-
-
}
-
-
@Override
-
-
-
}
-
-
@Override
-
-
-
}
-
-
@Override
-
-
-
-
}
-
-
@Override
-
-
-
-
}
-
-
-
return jdbcUrl;
-
-
}
-
-
-
this.jdbcUrl = jdbcUrl;
-
-
}
-
-
-
return user;
-
-
}
-
-
-
this.user = user;
-
-
}
-
-
-
return password;
-
-
}
-
-
-
this.password = password;
-
-
}
-
-
-
return driverClass;
-
-
}
-
-
-
this.driverClass = driverClass;
-
-
}
-
-
public int getTimeout() {
-
-
return timeout;
-
-
}
-
-
public void setTimeout(int timeout) {
-
-
this.timeout = timeout * 1000;
-
-
}
-
-
public void setMaxActive(int maxActive) {
-
-
this.maxActive = maxActive;
-
-
}
-
-
public int getMaxActive() {
-
-
return maxActive;
-
-
}
-
-
}
-
-
/**
-
-
* 数据连接的自封装 ,是java.sql.Connection的一个钩子,主要是处理close方法
-
-
*
-
-
* @author eric
-
-
*
-
-
*/
-
-
-
-
-
private final JavaGGDataSource ds;
-
-
-
this.conn = conn;
-
-
this.ds = ds;
-
-
}
-
-
@Override
-
-
-
-
Object obj = null;
-
-
// 判断是否调用了close的方法,如果调用close方法则把连接置为无用状态
-
-
if (CLOSE_METHOD_NAME.equals(method.getName())) {
-
-
// 归还连接给连接池
-
-
ds.offerConnection(this);
-
-
} else {
-
-
// 运行非close的方法
-
-
obj = method.invoke(conn, args);
-
-
}
-
-
return obj;
-
-
}
-
-
-
// 返回数据库连接conn的接管类,以便截住close方法
-
-
Connection conn2 = (Connection) Proxy.newProxyInstance(conn.getClass().getClassLoader(), new Class[] { Connection.class }, this);
-
-
return conn2;
-
-
}
-
-
-
// 调用真正的close方法,一但调用此方法就直接关闭连接
-
-
if(conn!=null&&!conn.isClosed())
-
-
conn.close();
-
-
}
-
-
}
-
-
-
说白了就是实现调用connection.close方法时我们映射到另一个方法上.
-
-
呵呵,是不是好简单呢,代码没有多复杂。
-
-
这里有一个问题要说明一吓:如果设置的maxActive值小于当前线程总数,那么当并发非常大时会出现资源争夺情况,一吓子cpu就会提高不小,所以建议设为无限制,或大于线程总数的值。
-
-
接下来我们测试测试
-
-
开五十个线程,对同一个表进行select/insert 10000次操作,每次select/insert一条记录
-
-
代码如下:
-
-
-
JavaGGDataSource ds = new JavaGGDataSource();
-
-
-
ds.setJdbcUrl(“jdbc:mysql://192.168.1.6:3306/test”);
-
-
ds.setPassword(“ps”);
-
-
ds.setUsername(“name”);
-
-
ds.setTimeout(300);
-
-
// ds.setMaxActive(60);
-
-
for (int i = 0; i < 50; i++) {
-
-
new GG(ds).start();
-
-
}
-
-
}
-
-
-
JavaGGDataSource ds = null;
-
-
-
public GG(JavaGGDataSource ds) {
-
-
this.ds = ds;
-
-
}
-
-
-
static String selectsql = “select * from testgg where id=?”;
-
-
-
public void run() {
-
-
for (int t = 0; t < 10000; t++) {
-
-
Connection conn = null;
-
-
try {
-
-
conn = ds.getConnection();
-
-
-
//以下为insert
-
-
ps.setInt(1, 133664);
-
-
ps.setString(2, “ddd”);
-
-
ps.executeUpdate();
-
-
//以下为select
-
-
-
while(rs.next()){
-
-
rs.getInt(“id”);
-
-
rs.getInt(“col1″);
-
-
}
-
-
rs.close();
-
-
ps.close();
-
-
-
// TODO Auto-generated catch block
-
-
e.printStackTrace();
-
-
} finally {
-
-
try {
-
-
if (conn != null) {
-
-
// ds.offerConnection(conn);
-
-
conn.close();
-
-
}
-
-
-
e.printStackTrace();
-
-
}
-
-
}
-
-
}
-
-
-
}
-
测试结果
50个线程select 10000*50次结果
Javaggds 406à2156毫秒 连接池有50个连接(和线程数一样)
C3p0 1235à1657毫秒 连接池有500个连接(和设置的最大连接数一样 )
50个线程insert 10000*50次结果
Javaggds 39125à52734 连接池有50个连接(和线程数一样)
C3p0 60000à65141毫秒 连接池有500个连接(和设置的最大连接数一样 )
测试分析:
c3p0是使用同锁或同步块对连接池进行同步的,所以它的时间会控制在一定范围之内
但带来的问题是线程竞争和线程等待。
Javaggds是使用了concurrent包中的无等待算法队列,这个同步是在cpu层面上做的,所以同步的块非常小,大家有兴趣可以看看CAS同步算法。
Hibernate结合
编辑hibernate 加入/修改配置为
<property name=“connection.provider_class”>
com.javagg.datasource.DataSourceConnectionProvider<property>
<property name=“db.driverClass”>com.mysql.jdbc.Driver<property>
<property name=“db.jdbcUrl”>jdbc:mysql://192.168.1.6:3306/test<property>
<property name=“db.username”>name<property>
<property name=“db.password”>password<property>
<property name=“db.datasource”>
com.javagg.datasource.JavaGGDataSource<property>
<property name=“db.maxActive”>-1<property>< 无限制连接数 >
<property name=“db.timeout”>3600<property>< 一小时timeout 单位为秒 >
DataSourceConnectonProvider代码如下:
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.beanutils.BeanUtils;
import org.hibernate.HibernateException;
import org.hibernate.connection.ConnectionProvider;
public class DataSourceConnectionProvider implements ConnectionProvider {
private final static String BASE_KEY = “db.”;
private final static String DATASOURCE_KEY = “db.datasource”;
protected DataSource dataSource;
/*
* (non-Javadoc)
*
* @see
* org.hibernate.connection.ConnectionProvider#configure(java.util.Properties
* )
*/
public void configure(Properties props) throws HibernateException {
initDataSource(props);
}
/*
* (non-Javadoc)
*
* @see org.hibernate.connection.ConnectionProvider#getConnection()
*/
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
/*
* (non-Javadoc)
*
* @see
* org.hibernate.connection.ConnectionProvider#closeConnection(java.sql.
* Connection)
*/
public void closeConnection(Connection conn) throws SQLException {
if (conn != null)
conn.close();
}
/*
* (non-Javadoc)
*
* @see org.hibernate.connection.ConnectionProvider#close()
*/
public void close() throws HibernateException {
if (dataSource != null)
try {
Method mClose = dataSource.getClass().getMethod(“close”);
mClose.invoke(dataSource);
} catch (Exception e) {
throw new HibernateException(e);
}
dataSource = null;
}
/*
* (non-Javadoc)
*
* @see
* org.hibernate.connection.ConnectionProvider#supportsAggressiveRelease()
*/
public boolean supportsAggressiveRelease() {
return false;
}
/**
* Initialize the datasource
*
* @param props
* @throws HibernateException
*/
protected void initDataSource(Properties props) throws HibernateException {
String dataSourceClass = null;
Properties new_props = new Properties();
Iterator keys = props.keySet().iterator();
while (keys.hasNext()) {
String key = (String) keys.next();
if (key.equals(DATASOURCE_KEY)) {
dataSourceClass=props.getProperty(key);
} else if (key.startsWith(BASE_KEY)) {
String value = props.getProperty(key);
new_props.setProperty(key.substring(BASE_KEY.length()), value);
}
}
if (dataSourceClass == null)
throw new HibernateException(“Property ‘db.datasource’ no defined.”);
try {
dataSource = (DataSource) Class.forName(dataSourceClass).newInstance();
BeanUtils.populate(dataSource, new_props);
} catch (Exception e) {
throw new HibernateException(e);
}
}
}
接下来我们测试配置有没有成功
代码如下:
public static void main(String args[]) {
Configuration cfg = new Configuration();
cfg.configure();
SessionFactory sf = cfg.buildSessionFactory();
for (int i = 0; i < 100; i++) {
Session sess = sf.openSession();
TestGGBean pc = new TestGGBean();
pc.setCol1(1111);
pc.setCols(“ddaaaa”);
sess.save(pc);
sess.flush();
sess.close();
}
}
原来如此啊。
有点意思。