001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.util.concurrent.CopyOnWriteArrayList;
020
021import javax.jms.Connection;
022import javax.jms.ConnectionConsumer;
023import javax.jms.ConnectionMetaData;
024import javax.jms.Destination;
025import javax.jms.ExceptionListener;
026import javax.jms.JMSException;
027import javax.jms.Queue;
028import javax.jms.QueueConnection;
029import javax.jms.QueueSession;
030import javax.jms.ServerSessionPool;
031import javax.jms.Session;
032import javax.jms.TemporaryQueue;
033import javax.jms.TemporaryTopic;
034import javax.jms.Topic;
035import javax.jms.TopicConnection;
036import javax.jms.TopicSession;
037
038import org.apache.activemq.ActiveMQConnection;
039import org.apache.activemq.ActiveMQSession;
040import org.apache.activemq.AlreadyClosedException;
041import org.apache.activemq.EnhancedConnection;
042import org.apache.activemq.advisory.DestinationSource;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
048 * {@link QueueConnection} which is pooled and on {@link #close()} will return
049 * itself to the sessionPool.
050 *
051 * <b>NOTE</b> this implementation is only intended for use when sending
052 * messages. It does not deal with pooling of consumers; for that look at a
053 * library like <a href="http://jencks.org/">Jencks</a> such as in <a
054 * href="http://jencks.org/Message+Driven+POJOs">this example</a>
055 *
056 */
057public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
058    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
059
060    private ConnectionPool pool;
061    private boolean stopped;
062    private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
063    private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
064
065    public PooledConnection(ConnectionPool pool) {
066        this.pool = pool;
067        this.pool.incrementReferenceCount();
068    }
069
070    /**
071     * Factory method to create a new instance.
072     */
073    public PooledConnection newInstance() {
074        return new PooledConnection(pool);
075    }
076
077    public void close() throws JMSException {
078        this.cleanupConnectionTemporaryDestinations();
079        if (this.pool != null) {
080            this.pool.decrementReferenceCount();
081            this.pool = null;
082        }
083    }
084
085    public void start() throws JMSException {
086        assertNotClosed();
087        pool.start();
088    }
089
090    public void stop() throws JMSException {
091        stopped = true;
092    }
093
094    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
095            throws JMSException {
096        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
097    }
098
099    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
100        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
101    }
102
103    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
104            throws JMSException {
105        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
106    }
107
108    public String getClientID() throws JMSException {
109        return getConnection().getClientID();
110    }
111
112    public ExceptionListener getExceptionListener() throws JMSException {
113        return getConnection().getExceptionListener();
114    }
115
116    public ConnectionMetaData getMetaData() throws JMSException {
117        return getConnection().getMetaData();
118    }
119
120    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
121        getConnection().setExceptionListener(exceptionListener);
122    }
123
124    public void setClientID(String clientID) throws JMSException {
125
126        // ignore repeated calls to setClientID() with the same client id
127        // this could happen when a JMS component such as Spring that uses a
128        // PooledConnectionFactory shuts down and reinitializes.
129        if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
130            getConnection().setClientID(clientID);
131        }
132    }
133
134    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
135        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
136    }
137
138    // Session factory methods
139    // -------------------------------------------------------------------------
140    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
141        return (QueueSession) createSession(transacted, ackMode);
142    }
143
144    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
145        return (TopicSession) createSession(transacted, ackMode);
146    }
147
148    public Session createSession(boolean transacted, int ackMode) throws JMSException {
149        PooledSession result;
150        result = (PooledSession) pool.createSession(transacted, ackMode);
151
152        // Add a temporary destination event listener to the session that notifies us when
153        // the session creates temporary destinations.
154        result.addTempDestEventListener(new PooledSessionEventListener() {
155
156            @Override
157            public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
158                connTempQueues.add(tempQueue);
159            }
160
161            @Override
162            public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
163                connTempTopics.add(tempTopic);
164            }
165        });
166
167        return (Session) result;
168    }
169
170    // EnhancedCollection API
171    // -------------------------------------------------------------------------
172
173    public DestinationSource getDestinationSource() throws JMSException {
174        return getConnection().getDestinationSource();
175    }
176
177    // Implementation methods
178    // -------------------------------------------------------------------------
179
180    public ActiveMQConnection getConnection() throws JMSException {
181        assertNotClosed();
182        return pool.getConnection();
183    }
184
185    protected void assertNotClosed() throws AlreadyClosedException {
186        if (stopped || pool == null) {
187            throw new AlreadyClosedException();
188        }
189    }
190
191    protected ActiveMQSession createSession(SessionKey key) throws JMSException {
192        return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
193    }
194
195    public String toString() {
196        return "PooledConnection { " + pool + " }";
197    }
198
199    /**
200     * Remove all of the temporary destinations created for this connection.
201     * This is important since the underlying connection may be reused over a
202     * long period of time, accumulating all of the temporary destinations from
203     * each use. However, from the perspective of the lifecycle from the
204     * client's view, close() closes the connection and, therefore, deletes all
205     * of the temporary destinations created.
206     */
207    protected void cleanupConnectionTemporaryDestinations() {
208
209        for (TemporaryQueue tempQueue : connTempQueues) {
210            try {
211                tempQueue.delete();
212            } catch (JMSException ex) {
213                LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
214            }
215        }
216        connTempQueues.clear();
217
218        for (TemporaryTopic tempTopic : connTempTopics) {
219            try {
220                tempTopic.delete();
221            } catch (JMSException ex) {
222                LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
223            }
224        }
225        connTempTopics.clear();
226    }
227}