首页 > java > 用JAVA实现无等待数据库连接池

用JAVA实现无等待数据库连接池

2010年1月9日
208 views 评论 发表评论

我们都知道数据库连接是一种有限和非常昂贵的应用资源,怎样对这些资源进行高效的管理,能有效的改善整个系统的性能和健壮性。数据库连接池正是针对这个问题而提出来的。

数据库连接负责分配、释放和管理数据库连接。使数据库连接可以重复利用,而不是用一次建立一次数据库连接。

基本思路

建立一个容器

每次到这个容器里得到连接,如果为空则建立一个新连接。

当连接使用完后归还给这个容器

这里就有二个难点

1.  容器必需是同步的,线程安全的。

2.  连接怎归还连接池

方案:

针对这二个难点,我们分别提出了二个解决方法

1.使用ConcurrentLinkedQueue实现先进先出队列

ConcurrentLinkedQueue无界线程安全队列介绍

这个类在java.util.concurrent包中,我们来看看官方是怎描述这个类的
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素.此实现采用了有效的“无等待 (wait-free)”算法

2.动态代理实现连接归还连接池

大家也可以参考刘冬在IBM发表的文章

http://www.ibm.com/developerworks/cn/java/l-connpoolproxy/

接下来我们来看看整体代码

  1.  
  2. import java.io.PrintWriter;
  3.  
  4. import java.lang.reflect.InvocationHandler;
  5.  
  6. import java.lang.reflect.Method;
  7.  
  8. import java.lang.reflect.Proxy;
  9.  
  10. import java.sql.Connection;
  11.  
  12. import java.sql.Driver;
  13.  
  14. import java.sql.SQLException;
  15.  
  16. import java.util.ArrayList;
  17.  
  18. import java.util.List;
  19.  
  20. import java.util.Properties;
  21.  
  22. import java.util.concurrent.ConcurrentLinkedQueue;
  23.  
  24. import java.util.concurrent.atomic.AtomicInteger;
  25.  
  26. import java.util.concurrent.atomic.AtomicLong;
  27.  
  28. import java.util.concurrent.locks.ReentrantLock;
  29.  
  30. import javax.sql.DataSource;
  31.  
  32. public class JavaGGDataSource implements DataSource {
  33.  
  34. //连接队列
  35.  
  36. private ConcurrentLinkedQueue<_Connection> connQueue = new ConcurrentLinkedQueue<_Connection>();
  37.  
  38. //存放所有连接容器
  39.  
  40. private List<_Connection> conns = new ArrayList<_Connection>();
  41.  
  42. private Driver driver = null;
  43.  
  44. private String jdbcUrl = null;
  45.  
  46. private String user = null;
  47.  
  48. private String password = null;
  49.  
  50. private int maxActive = -1;// -1为不限制连接数
  51.  
  52. private String driverClass = null;
  53.  
  54. private int timeout = 1000 * 60 * 60 * 4;// 默认为4小时,即4小时没有任何sql操作就把所有连接重新建立连接
  55.  
  56. private AtomicLong lastCheckout = new AtomicLong(System.currentTimeMillis());
  57.  
  58. private AtomicInteger connCount = new AtomicInteger();
  59.  
  60. //线程锁,主要用于新建连接和清空连接时
  61.  
  62. private ReentrantLock lock = new ReentrantLock();
  63.  
  64. public void closeAllConnection() {
  65.  
  66. }
  67.  
  68. /**
  69.  
  70. * 归还连接给连接池
  71.  
  72. *
  73.  
  74. * @param conn
  75.  
  76. *@date 2009-8-13
  77.  
  78. *@author eric.chan
  79.  
  80. */
  81.  
  82. public void offerConnection(_Connection conn) {
  83.  
  84. connQueue.offer(conn);
  85.  
  86. }
  87.  
  88. @Override
  89.  
  90. public Connection getConnection() throws SQLException {
  91.  
  92. return getConnection(user, password);
  93.  
  94. }
  95.  
  96. /**
  97.  
  98. * 从池中得到连接,如果池中没有连接,则建立新的sql连接
  99.  
  100. *
  101.  
  102. * @param username
  103.  
  104. * @param password
  105.  
  106. * @author eric.chan
  107.  
  108. */
  109.  
  110. @Override
  111.  
  112. public Connection getConnection(String username, String password)
  113.  
  114. throws SQLException {
  115.  
  116. checkTimeout();
  117.  
  118. _Connection conn = connQueue.poll();
  119.  
  120. if (conn == null) {
  121.  
  122. if (maxActive > 0 && connCount.get() >= maxActive) {
  123.  
  124. for (;;) {// 采用自旋方法 从已满的池中得到一个连接
  125.  
  126. conn = connQueue.poll();
  127.  
  128. if (conn != null)
  129.  
  130. break;
  131.  
  132. else
  133.  
  134. continue;
  135.  
  136. }
  137.  
  138. }
  139.  
  140. lock.lock();
  141.  
  142. try {
  143.  
  144. if (maxActive > 0 && connCount.get() >= maxActive) {
  145.  
  146. // 处理并发问题
  147.  
  148. return getConnection(username, password);
  149.  
  150. }
  151.  
  152. Properties info = new Properties();
  153.  
  154. info.put(“user”, username);
  155.  
  156. info.put(“password”, password);
  157.  
  158. Connection conn1 = loadDriver().connect(jdbcUrl, info);
  159.  
  160. conn = new _Connection(conn1, this);
  161.  
  162. int c = connCount.incrementAndGet();// 当前连接数加1
  163.  
  164. conns.add(conn);
  165.  
  166. System.out.println(“info : init no. ” + c + ” connectioned”);
  167.  
  168. } finally {
  169.  
  170. lock.unlock();
  171.  
  172. }
  173.  
  174. }
  175.  
  176. lastCheckout.getAndSet(System.currentTimeMillis());
  177.  
  178. return conn.getConnection();
  179.  
  180. }
  181.  
  182. /**
  183.  
  184. * 检查最后一次的连接时间
  185.  
  186. *
  187.  
  188. * @throws SQLException
  189.  
  190. *@date 2009-8-13
  191.  
  192. *@author eric.chan
  193.  
  194. */
  195.  
  196. private void checkTimeout() throws SQLException {
  197.  
  198. long now = System.currentTimeMillis();
  199.  
  200. long lt = lastCheckout.get();
  201.  
  202. if ((now – lt) > timeout) {
  203.  
  204. _Connection conn = null;
  205.  
  206. lock.lock();
  207.  
  208. try {
  209.  
  210. if(connCount.get()==0)return;
  211.  
  212. while ((conn = connQueue.poll()) != null) {
  213.  
  214. System.out.println(“connection ” + conn + ” close “);
  215.  
  216. conn.close();
  217.  
  218. conn = null;
  219.  
  220. }
  221.  
  222. for(_Connection con:conns){
  223.  
  224. con.close();
  225.  
  226. }
  227.  
  228. conns.clear();
  229.  
  230. System.out.println(“info : reset all connections”);
  231.  
  232. connCount.getAndSet(0);// 重置连接数计数器
  233.  
  234. lastCheckout.getAndSet(System.currentTimeMillis());
  235.  
  236. } finally {
  237.  
  238. lock.unlock();
  239.  
  240. }
  241.  
  242. }
  243.  
  244. }
  245.  
  246. /**
  247.  
  248. *
  249.  
  250. * @return
  251.  
  252. *@date 2009-8-13
  253.  
  254. *@author eric.chan
  255.  
  256. */
  257.  
  258. private Driver loadDriver() {
  259.  
  260. if (driver == null) {
  261.  
  262. try {
  263.  
  264. driver = (Driver) Class.forName(driverClass).newInstance();
  265.  
  266.  
  267. System.out.println(“error : can not find driver class ” + driverClass);
  268.  
  269. } catch (Exception e) {
  270.  
  271. e.printStackTrace();
  272.  
  273. }
  274.  
  275. }
  276.  
  277. return driver;
  278.  
  279. }
  280.  
  281. @Override
  282.  
  283. public PrintWriter getLogWriter() throws SQLException {
  284.  
  285. return null;
  286.  
  287. }
  288.  
  289. @Override
  290.  
  291. public int getLoginTimeout() throws SQLException {
  292.  
  293. return 0;
  294.  
  295. }
  296.  
  297. @Override
  298.  
  299. public void setLogWriter(PrintWriter out) throws SQLException {
  300.  
  301. }
  302.  
  303. @Override
  304.  
  305. public void setLoginTimeout(int seconds) throws SQLException {
  306.  
  307. }
  308.  
  309. @Override
  310.  
  311. public boolean isWrapperFor(Class iface) throws SQLException {
  312.  
  313. throw new SQLException(“no Implemented isWrapperFor method”);
  314.  
  315. }
  316.  
  317. @Override
  318.  
  319. public T unwrap(Class iface) throws SQLException {
  320.  
  321. throw new SQLException(“no Implemented unwrap method”);
  322.  
  323. }
  324.  
  325. public String getJdbcUrl() {
  326.  
  327. return jdbcUrl;
  328.  
  329. }
  330.  
  331. public void setJdbcUrl(String jdbcUrl) {
  332.  
  333. this.jdbcUrl = jdbcUrl;
  334.  
  335. }
  336.  
  337. public String getUsername() {
  338.  
  339. return user;
  340.  
  341. }
  342.  
  343. public void setUsername(String user) {
  344.  
  345. this.user = user;
  346.  
  347. }
  348.  
  349. public String getPassword() {
  350.  
  351. return password;
  352.  
  353. }
  354.  
  355. public void setPassword(String password) {
  356.  
  357. this.password = password;
  358.  
  359. }
  360.  
  361. public String getDriverClass() {
  362.  
  363. return driverClass;
  364.  
  365. }
  366.  
  367. public void setDriverClass(String driverClass) {
  368.  
  369. this.driverClass = driverClass;
  370.  
  371. }
  372.  
  373. public int getTimeout() {
  374.  
  375. return timeout;
  376.  
  377. }
  378.  
  379. public void setTimeout(int timeout) {
  380.  
  381. this.timeout = timeout * 1000;
  382.  
  383. }
  384.  
  385. public void setMaxActive(int maxActive) {
  386.  
  387. this.maxActive = maxActive;
  388.  
  389. }
  390.  
  391. public int getMaxActive() {
  392.  
  393. return maxActive;
  394.  
  395. }
  396.  
  397. }
  398.  
  399. /**
  400.  
  401. * 数据连接的自封装 ,是java.sql.Connection的一个钩子,主要是处理close方法
  402.  
  403. *
  404.  
  405. * @author eric
  406.  
  407. *
  408.  
  409. */
  410.  
  411. class _Connection implements InvocationHandler {
  412.  
  413. private final static String CLOSE_METHOD_NAME = “close”;
  414.  
  415. private final Connection conn;
  416.  
  417. private final JavaGGDataSource ds;
  418.  
  419. _Connection(Connection conn, JavaGGDataSource ds) {
  420.  
  421. this.conn = conn;
  422.  
  423. this.ds = ds;
  424.  
  425. }
  426.  
  427. @Override
  428.  
  429. public Object invoke(Object proxy, Method method, Object[] args)
  430.  
  431. throws Throwable {
  432.  
  433. Object obj = null;
  434.  
  435. // 判断是否调用了close的方法,如果调用close方法则把连接置为无用状态
  436.  
  437. if (CLOSE_METHOD_NAME.equals(method.getName())) {
  438.  
  439. // 归还连接给连接池
  440.  
  441. ds.offerConnection(this);
  442.  
  443. } else {
  444.  
  445. // 运行非close的方法
  446.  
  447. obj = method.invoke(conn, args);
  448.  
  449. }
  450.  
  451. return obj;
  452.  
  453. }
  454.  
  455. public Connection getConnection() {
  456.  
  457. // 返回数据库连接conn的接管类,以便截住close方法
  458.  
  459. Connection conn2 = (Connection) Proxy.newProxyInstance(conn.getClass().getClassLoader(), new Class[] { Connection.class }, this);
  460.  
  461. return conn2;
  462.  
  463. }
  464.  
  465. public void close() throws SQLException {
  466.  
  467. // 调用真正的close方法,一但调用此方法就直接关闭连接
  468.  
  469. if(conn!=null&&!conn.isClosed())
  470.  
  471. conn.close();
  472.  
  473. }
  474.  
  475. }
  476.  
  477. _Connection类是一个私有类,主要实现一个对Connection动态代理的功能(有点象windows的钩子)
  478.  
  479. 说白了就是实现调用connection.close方法时我们映射到另一个方法上.
  480.  
  481. 呵呵,是不是好简单呢,代码没有多复杂。
  482.  
  483. 这里有一个问题要说明一吓:如果设置的maxActive值小于当前线程总数,那么当并发非常大时会出现资源争夺情况,一吓子cpu就会提高不小,所以建议设为无限制,或大于线程总数的值。
  484.  
  485. 接下来我们测试测试
  486.  
  487. 开五十个线程,对同一个表进行select/insert 10000次操作,每次select/insert一条记录
  488.  
  489. 代码如下:
  490.  
  491. public static void main(String[] args) {
  492.  
  493. JavaGGDataSource ds = new JavaGGDataSource();
  494.  
  495. ds.setDriverClass(“com.mysql.jdbc.Driver);
  496.  
  497. ds.setJdbcUrl(“jdbc:mysql://192.168.1.6:3306/test”);
  498.  
  499. ds.setPassword(“ps”);
  500.  
  501. ds.setUsername(“name”);
  502.  
  503. ds.setTimeout(300);
  504.  
  505. // ds.setMaxActive(60);
  506.  
  507. for (int i = 0; i < 50; i++) {
  508.  
  509. new GG(ds).start();
  510.  
  511. }
  512.  
  513. }
  514.  
  515. class GG extends Thread {
  516.  
  517. JavaGGDataSource ds = null;
  518.  
  519. long l = System.currentTimeMillis();
  520.  
  521. public GG(JavaGGDataSource ds) {
  522.  
  523. this.ds = ds;
  524.  
  525. }
  526.  
  527. static String sql = “insert into testgg(col1,cols) values (?,?)”;
  528.  
  529. static String selectsql = “select * from testgg where id=?”;
  530.  
  531.  
  532. public void run() {
  533.  
  534. for (int t = 0; t < 10000; t++) {
  535.  
  536. Connection conn = null;
  537.  
  538. try {
  539.  
  540. conn = ds.getConnection();
  541.  
  542. PreparedStatement ps = conn.prepareStatement(sql);
  543.  
  544. //以下为insert
  545.  
  546. ps.setInt(1, 133664);
  547.  
  548. ps.setString(2, “ddd”);
  549.  
  550. ps.executeUpdate();
  551.  
  552. //以下为select
  553.  
  554. ResultSet rs=ps.executeQuery();
  555.  
  556. while(rs.next()){
  557.  
  558. rs.getInt(“id”);
  559.  
  560. rs.getInt(“col1″);
  561.  
  562. }
  563.  
  564. rs.close();
  565.  
  566. ps.close();
  567.  
  568. } catch (SQLException e) {
  569.  
  570. // TODO Auto-generated catch block
  571.  
  572. e.printStackTrace();
  573.  
  574. } finally {
  575.  
  576. try {
  577.  
  578. if (conn != null) {
  579.  
  580. //                   ds.offerConnection(conn);
  581.  
  582. conn.close();
  583.  
  584. }
  585.  
  586. } catch (Exception e) {
  587.  
  588. e.printStackTrace();
  589.  
  590. }
  591.  
  592. }
  593.  
  594. }
  595.  
  596. System.out.println(System.currentTimeMillis() – l);
  597.  
  598. }
  599.  

测试结果

50个线程select 10000*50次结果

Javaggds  406à2156毫秒    连接池有50个连接(和线程数一样)

C3p0              1235à1657毫秒   连接池有500个连接(和设置的最大连接数一样 )

50个线程insert 10000*50次结果

Javaggds  39125à52734  连接池有50个连接(和线程数一样)

C3p0           60000à65141毫秒  连接池有500个连接(和设置的最大连接数一样 )

测试分析:

c3p0是使用同锁或同步块对连接池进行同步的,所以它的时间会控制在一定范围之内

但带来的问题是线程竞争和线程等待。

Javaggds是使用了concurrent包中的无等待算法队列,这个同步是在cpu层面上做的,所以同步的块非常小,大家有兴趣可以看看CAS同步算法。

Hibernate结合

编辑hibernate 加入/修改配置为

<property name=“connection.provider_class”>

com.javagg.datasource.DataSourceConnectionProvider<property>

<property name=“db.driverClass”>com.mysql.jdbc.Driver<property>

<property name=“db.jdbcUrl”>jdbc:mysql://192.168.1.6:3306/test<property>

<property name=“db.username”>name<property>

<property name=“db.password”>password<property>

<property name=“db.datasource”>

com.javagg.datasource.JavaGGDataSource<property>

<property name=“db.maxActive”>-1<property>< 无限制连接数 >

<property name=“db.timeout”>3600<property>< 一小时timeout 单位为秒 >

DataSourceConnectonProvider代码如下:

import java.lang.reflect.Method;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.Iterator;

import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.beanutils.BeanUtils;

import org.hibernate.HibernateException;

import org.hibernate.connection.ConnectionProvider;

public class DataSourceConnectionProvider implements ConnectionProvider {

private final static String BASE_KEY = “db.”;

private final static String DATASOURCE_KEY = “db.datasource”;

protected DataSource dataSource;

/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#configure(java.util.Properties

* )

*/

public void configure(Properties props) throws HibernateException {

initDataSource(props);

}

/*

* (non-Javadoc)

*

* @see org.hibernate.connection.ConnectionProvider#getConnection()

*/

public Connection getConnection() throws SQLException {

return dataSource.getConnection();

}

/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#closeConnection(java.sql.

* Connection)

*/

public void closeConnection(Connection conn) throws SQLException {

if (conn != null)

conn.close();

}

/*

* (non-Javadoc)

*

* @see org.hibernate.connection.ConnectionProvider#close()

*/

public void close() throws HibernateException {

if (dataSource != null)

try {

Method mClose = dataSource.getClass().getMethod(“close”);

mClose.invoke(dataSource);

} catch (Exception e) {

throw new HibernateException(e);

}

dataSource = null;

}

/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#supportsAggressiveRelease()

*/

public boolean supportsAggressiveRelease() {

return false;

}

/**

* Initialize the datasource

*

* @param props

* @throws HibernateException

*/

protected void initDataSource(Properties props) throws HibernateException {

String dataSourceClass = null;

Properties new_props = new Properties();

Iterator keys = props.keySet().iterator();

while (keys.hasNext()) {

String key = (String) keys.next();

if (key.equals(DATASOURCE_KEY)) {

dataSourceClass=props.getProperty(key);

} else if (key.startsWith(BASE_KEY)) {

String value = props.getProperty(key);

new_props.setProperty(key.substring(BASE_KEY.length()), value);

}

}

if (dataSourceClass == null)

throw new HibernateException(“Property ‘db.datasource’ no defined.”);

try {

dataSource = (DataSource) Class.forName(dataSourceClass).newInstance();

BeanUtils.populate(dataSource, new_props);

} catch (Exception e) {

throw new HibernateException(e);

}

}

}

接下来我们测试配置有没有成功

代码如下:

public static void main(String args[]) {

Configuration cfg = new Configuration();

cfg.configure();

SessionFactory sf = cfg.buildSessionFactory();

for (int i = 0; i < 100; i++) {

Session sess = sf.openSession();

TestGGBean pc = new TestGGBean();

pc.setCol1(1111);

pc.setCols(“ddaaaa”);

sess.save(pc);

sess.flush();

sess.close();

}

}

纯净水 java , , , ,

  1. 2010年3月27日03:00 | #1

    原来如此啊。

  2. 2010年3月27日14:29 | #2

    有点意思。

  1. 目前还没有任何 trackbacks 和 pingbacks.
  • 粤ICP备09032914号