001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.pool; 018 019import java.util.concurrent.CopyOnWriteArrayList; 020 021import javax.jms.Connection; 022import javax.jms.ConnectionConsumer; 023import javax.jms.ConnectionMetaData; 024import javax.jms.Destination; 025import javax.jms.ExceptionListener; 026import javax.jms.JMSException; 027import javax.jms.Queue; 028import javax.jms.QueueConnection; 029import javax.jms.QueueSession; 030import javax.jms.ServerSessionPool; 031import javax.jms.Session; 032import javax.jms.TemporaryQueue; 033import javax.jms.TemporaryTopic; 034import javax.jms.Topic; 035import javax.jms.TopicConnection; 036import javax.jms.TopicSession; 037 038import org.apache.activemq.ActiveMQConnection; 039import org.apache.activemq.ActiveMQSession; 040import org.apache.activemq.AlreadyClosedException; 041import org.apache.activemq.EnhancedConnection; 042import org.apache.activemq.advisory.DestinationSource; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and 048 * {@link QueueConnection} which is pooled and on {@link #close()} will return 049 * itself to the sessionPool. 050 * 051 * <b>NOTE</b> this implementation is only intended for use when sending 052 * messages. It does not deal with pooling of consumers; for that look at a 053 * library like <a href="http://jencks.org/">Jencks</a> such as in <a 054 * href="http://jencks.org/Message+Driven+POJOs">this example</a> 055 * 056 */ 057public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection { 058 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class); 059 060 private ConnectionPool pool; 061 private boolean stopped; 062 private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>(); 063 private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>(); 064 065 public PooledConnection(ConnectionPool pool) { 066 this.pool = pool; 067 this.pool.incrementReferenceCount(); 068 } 069 070 /** 071 * Factory method to create a new instance. 072 */ 073 public PooledConnection newInstance() { 074 return new PooledConnection(pool); 075 } 076 077 public void close() throws JMSException { 078 this.cleanupConnectionTemporaryDestinations(); 079 if (this.pool != null) { 080 this.pool.decrementReferenceCount(); 081 this.pool = null; 082 } 083 } 084 085 public void start() throws JMSException { 086 assertNotClosed(); 087 pool.start(); 088 } 089 090 public void stop() throws JMSException { 091 stopped = true; 092 } 093 094 public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) 095 throws JMSException { 096 return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages); 097 } 098 099 public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException { 100 return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages); 101 } 102 103 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) 104 throws JMSException { 105 return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i); 106 } 107 108 public String getClientID() throws JMSException { 109 return getConnection().getClientID(); 110 } 111 112 public ExceptionListener getExceptionListener() throws JMSException { 113 return getConnection().getExceptionListener(); 114 } 115 116 public ConnectionMetaData getMetaData() throws JMSException { 117 return getConnection().getMetaData(); 118 } 119 120 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { 121 getConnection().setExceptionListener(exceptionListener); 122 } 123 124 public void setClientID(String clientID) throws JMSException { 125 126 // ignore repeated calls to setClientID() with the same client id 127 // this could happen when a JMS component such as Spring that uses a 128 // PooledConnectionFactory shuts down and reinitializes. 129 if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) { 130 getConnection().setClientID(clientID); 131 } 132 } 133 134 public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException { 135 return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages); 136 } 137 138 // Session factory methods 139 // ------------------------------------------------------------------------- 140 public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException { 141 return (QueueSession) createSession(transacted, ackMode); 142 } 143 144 public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException { 145 return (TopicSession) createSession(transacted, ackMode); 146 } 147 148 public Session createSession(boolean transacted, int ackMode) throws JMSException { 149 PooledSession result; 150 result = (PooledSession) pool.createSession(transacted, ackMode); 151 152 // Add a temporary destination event listener to the session that notifies us when 153 // the session creates temporary destinations. 154 result.addTempDestEventListener(new PooledSessionEventListener() { 155 156 @Override 157 public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { 158 connTempQueues.add(tempQueue); 159 } 160 161 @Override 162 public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { 163 connTempTopics.add(tempTopic); 164 } 165 }); 166 167 return (Session) result; 168 } 169 170 // EnhancedCollection API 171 // ------------------------------------------------------------------------- 172 173 public DestinationSource getDestinationSource() throws JMSException { 174 return getConnection().getDestinationSource(); 175 } 176 177 // Implementation methods 178 // ------------------------------------------------------------------------- 179 180 public ActiveMQConnection getConnection() throws JMSException { 181 assertNotClosed(); 182 return pool.getConnection(); 183 } 184 185 protected void assertNotClosed() throws AlreadyClosedException { 186 if (stopped || pool == null) { 187 throw new AlreadyClosedException(); 188 } 189 } 190 191 protected ActiveMQSession createSession(SessionKey key) throws JMSException { 192 return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode()); 193 } 194 195 public String toString() { 196 return "PooledConnection { " + pool + " }"; 197 } 198 199 /** 200 * Remove all of the temporary destinations created for this connection. 201 * This is important since the underlying connection may be reused over a 202 * long period of time, accumulating all of the temporary destinations from 203 * each use. However, from the perspective of the lifecycle from the 204 * client's view, close() closes the connection and, therefore, deletes all 205 * of the temporary destinations created. 206 */ 207 protected void cleanupConnectionTemporaryDestinations() { 208 209 for (TemporaryQueue tempQueue : connTempQueues) { 210 try { 211 tempQueue.delete(); 212 } catch (JMSException ex) { 213 LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage()); 214 } 215 } 216 connTempQueues.clear(); 217 218 for (TemporaryTopic tempTopic : connTempTopics) { 219 try { 220 tempTopic.delete(); 221 } catch (JMSException ex) { 222 LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage()); 223 } 224 } 225 connTempTopics.clear(); 226 } 227}