001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.pool; 018 019import java.util.HashMap; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.Map; 023import java.util.concurrent.atomic.AtomicBoolean; 024import javax.jms.Connection; 025import javax.jms.ConnectionFactory; 026import javax.jms.JMSException; 027import org.apache.activemq.ActiveMQConnection; 028import org.apache.activemq.ActiveMQConnectionFactory; 029import org.apache.activemq.Service; 030import org.apache.activemq.util.IOExceptionSupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.commons.pool.ObjectPoolFactory; 034import org.apache.commons.pool.impl.GenericObjectPool; 035import org.apache.commons.pool.impl.GenericObjectPoolFactory; 036 037/** 038 * A JMS provider which pools Connection, Session and MessageProducer instances 039 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a 040 * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>. 041 * Connections, sessions and producers are returned to a pool after use so that they can be reused later 042 * without having to undergo the cost of creating them again. 043 * 044 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers, 045 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which 046 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually 047 * just created at startup and left active, handling incoming messages as they come. When a consumer is 048 * complete, it is best to close it rather than return it to a pool for later reuse: this is because, 049 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, 050 * where they'll get held until the consumer is active again. 051 * 052 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you 053 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that 054 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: 055 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html 056 * 057 * @org.apache.xbean.XBean element="pooledConnectionFactory" 058 * 059 * 060 */ 061public class PooledConnectionFactory implements ConnectionFactory, Service { 062 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); 063 private ConnectionFactory connectionFactory; 064 private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>(); 065 private ObjectPoolFactory poolFactory; 066 private int maximumActive = 500; 067 private int maxConnections = 1; 068 private int idleTimeout = 30 * 1000; 069 private boolean blockIfSessionPoolIsFull = true; 070 private AtomicBoolean stopped = new AtomicBoolean(false); 071 private long expiryTimeout = 0l; 072 073 public PooledConnectionFactory() { 074 this(new ActiveMQConnectionFactory()); 075 } 076 077 public PooledConnectionFactory(String brokerURL) { 078 this(new ActiveMQConnectionFactory(brokerURL)); 079 } 080 081 public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { 082 this.connectionFactory = connectionFactory; 083 } 084 085 public ConnectionFactory getConnectionFactory() { 086 return connectionFactory; 087 } 088 089 public void setConnectionFactory(ConnectionFactory connectionFactory) { 090 this.connectionFactory = connectionFactory; 091 } 092 093 public Connection createConnection() throws JMSException { 094 return createConnection(null, null); 095 } 096 097 public synchronized Connection createConnection(String userName, String password) throws JMSException { 098 if (stopped.get()) { 099 LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); 100 return null; 101 } 102 103 ConnectionKey key = new ConnectionKey(userName, password); 104 LinkedList<ConnectionPool> pools = cache.get(key); 105 106 if (pools == null) { 107 pools = new LinkedList<ConnectionPool>(); 108 cache.put(key, pools); 109 } 110 111 ConnectionPool connection = null; 112 if (pools.size() == maxConnections) { 113 connection = pools.removeFirst(); 114 } 115 116 // Now.. we might get a connection, but it might be that we need to 117 // dump it.. 118 if (connection != null && connection.expiredCheck()) { 119 connection = null; 120 } 121 122 if (connection == null) { 123 ActiveMQConnection delegate = createConnection(key); 124 connection = createConnectionPool(delegate); 125 } 126 pools.add(connection); 127 return new PooledConnection(connection); 128 } 129 130 protected ConnectionPool createConnectionPool(ActiveMQConnection connection) { 131 ConnectionPool result = new ConnectionPool(connection, getPoolFactory()); 132 result.setIdleTimeout(getIdleTimeout()); 133 result.setExpiryTimeout(getExpiryTimeout()); 134 return result; 135 } 136 137 protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException { 138 if (key.getUserName() == null && key.getPassword() == null) { 139 return (ActiveMQConnection)connectionFactory.createConnection(); 140 } else { 141 return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword()); 142 } 143 } 144 145 /** 146 * @see org.apache.activemq.service.Service#start() 147 */ 148 public void start() { 149 try { 150 stopped.set(false); 151 createConnection(); 152 } catch (JMSException e) { 153 LOG.warn("Create pooled connection during start failed.", e); 154 IOExceptionSupport.create(e); 155 } 156 } 157 158 public void stop() { 159 LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size()); 160 stopped.set(true); 161 for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) { 162 for (ConnectionPool connection : iter.next()) { 163 try { 164 connection.close(); 165 }catch(Exception e) { 166 LOG.warn("Close connection failed",e); 167 } 168 } 169 } 170 cache.clear(); 171 } 172 173 public ObjectPoolFactory getPoolFactory() { 174 if (poolFactory == null) { 175 poolFactory = createPoolFactory(); 176 } 177 return poolFactory; 178 } 179 180 /** 181 * Sets the object pool factory used to create individual session pools for 182 * each connection 183 */ 184 public void setPoolFactory(ObjectPoolFactory poolFactory) { 185 this.poolFactory = poolFactory; 186 } 187 188 public int getMaximumActive() { 189 return maximumActive; 190 } 191 192 /** 193 * Sets the maximum number of active sessions per connection 194 */ 195 public void setMaximumActive(int maximumActive) { 196 this.maximumActive = maximumActive; 197 } 198 199 /** 200 * Controls the behavior of the internal session pool. By default the call to 201 * Connection.getSession() will block if the session pool is full. If the 202 * argument false is given, it will change the default behavior and instead the 203 * call to getSession() will throw a JMSException. 204 * 205 * The size of the session pool is controlled by the @see #maximumActive 206 * property. 207 * 208 * @param block - if true, the call to getSession() blocks if the pool is full 209 * until a session object is available. defaults to true. 210 */ 211 public void setBlockIfSessionPoolIsFull(boolean block) { 212 this.blockIfSessionPoolIsFull = block; 213 } 214 215 /** 216 * @return the maxConnections 217 */ 218 public int getMaxConnections() { 219 return maxConnections; 220 } 221 222 /** 223 * @param maxConnections the maxConnections to set 224 */ 225 public void setMaxConnections(int maxConnections) { 226 this.maxConnections = maxConnections; 227 } 228 229 /** 230 * Creates an ObjectPoolFactory. Its behavior is controlled by the two 231 * properties @see #maximumActive and @see #blockIfSessionPoolIsFull. 232 * 233 * @return the newly created but empty ObjectPoolFactory 234 */ 235 protected ObjectPoolFactory createPoolFactory() { 236 if (blockIfSessionPoolIsFull) { 237 return new GenericObjectPoolFactory(null, maximumActive); 238 } else { 239 return new GenericObjectPoolFactory(null, 240 maximumActive, 241 GenericObjectPool.WHEN_EXHAUSTED_FAIL, 242 GenericObjectPool.DEFAULT_MAX_WAIT); 243 } 244 } 245 246 public int getIdleTimeout() { 247 return idleTimeout; 248 } 249 250 public void setIdleTimeout(int idleTimeout) { 251 this.idleTimeout = idleTimeout; 252 } 253 254 /** 255 * allow connections to expire, irrespective of load or idle time. This is useful with failover 256 * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery 257 * 258 * @param expiryTimeout non zero in milliseconds 259 */ 260 public void setExpiryTimeout(long expiryTimeout) { 261 this.expiryTimeout = expiryTimeout; 262 } 263 264 public long getExpiryTimeout() { 265 return expiryTimeout; 266 } 267}