001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.io.Serializable;
020import java.util.Iterator;
021import java.util.concurrent.CopyOnWriteArrayList;
022
023import javax.jms.BytesMessage;
024import javax.jms.Destination;
025import javax.jms.JMSException;
026import javax.jms.MapMessage;
027import javax.jms.Message;
028import javax.jms.MessageConsumer;
029import javax.jms.MessageListener;
030import javax.jms.MessageProducer;
031import javax.jms.ObjectMessage;
032import javax.jms.Queue;
033import javax.jms.QueueBrowser;
034import javax.jms.QueueReceiver;
035import javax.jms.QueueSender;
036import javax.jms.QueueSession;
037import javax.jms.Session;
038import javax.jms.StreamMessage;
039import javax.jms.TemporaryQueue;
040import javax.jms.TemporaryTopic;
041import javax.jms.TextMessage;
042import javax.jms.Topic;
043import javax.jms.TopicPublisher;
044import javax.jms.TopicSession;
045import javax.jms.TopicSubscriber;
046import javax.jms.XASession;
047import javax.transaction.xa.XAResource;
048
049import org.apache.activemq.ActiveMQMessageProducer;
050import org.apache.activemq.ActiveMQQueueSender;
051import org.apache.activemq.ActiveMQSession;
052import org.apache.activemq.ActiveMQTopicPublisher;
053import org.apache.activemq.AlreadyClosedException;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057public class PooledSession implements Session, TopicSession, QueueSession, XASession {
058    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
059
060    private ActiveMQSession session;
061    private SessionPool sessionPool;
062    private ActiveMQMessageProducer messageProducer;
063    private ActiveMQQueueSender queueSender;
064    private ActiveMQTopicPublisher topicPublisher;
065    private boolean transactional = true;
066    private boolean ignoreClose;
067
068    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
069    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
070    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
071        new CopyOnWriteArrayList<PooledSessionEventListener>();
072    private boolean isXa;
073
074    public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
075        this.session = aSession;
076        this.sessionPool = sessionPool;
077        this.transactional = session.isTransacted();
078    }
079
080    public void addTempDestEventListener(PooledSessionEventListener listener) {
081        this.tempDestEventListeners.add(listener);
082    }
083
084    protected boolean isIgnoreClose() {
085        return ignoreClose;
086    }
087
088    protected void setIgnoreClose(boolean ignoreClose) {
089        this.ignoreClose = ignoreClose;
090    }
091
092    public void close() throws JMSException {
093        if (!ignoreClose) {
094            // TODO a cleaner way to reset??
095
096            boolean invalidate = false;
097            try {
098                // lets reset the session
099                getInternalSession().setMessageListener(null);
100
101                // Close any consumers and browsers that may have been created.
102                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
103                    MessageConsumer consumer = iter.next();
104                    consumer.close();
105                }
106
107                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
108                    QueueBrowser browser = iter.next();
109                    browser.close();
110                }
111
112                if (transactional && !isXa) {
113                    try {
114                        getInternalSession().rollback();
115                    } catch (JMSException e) {
116                        invalidate = true;
117                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
118                    }
119                }
120            } catch (JMSException ex) {
121                invalidate = true;
122                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
123            } finally {
124                consumers.clear();
125                browsers.clear();
126            }
127
128            if (invalidate) {
129                // lets close the session and not put the session back into
130                // the pool
131                if (session != null) {
132                    try {
133                        session.close();
134                    } catch (JMSException e1) {
135                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
136                    }
137                    session = null;
138                }
139                sessionPool.invalidateSession(this);
140            } else {
141                sessionPool.returnSession(this);
142            }
143        }
144    }
145
146    public void commit() throws JMSException {
147        getInternalSession().commit();
148    }
149
150    public BytesMessage createBytesMessage() throws JMSException {
151        return getInternalSession().createBytesMessage();
152    }
153
154    public MapMessage createMapMessage() throws JMSException {
155        return getInternalSession().createMapMessage();
156    }
157
158    public Message createMessage() throws JMSException {
159        return getInternalSession().createMessage();
160    }
161
162    public ObjectMessage createObjectMessage() throws JMSException {
163        return getInternalSession().createObjectMessage();
164    }
165
166    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
167        return getInternalSession().createObjectMessage(serializable);
168    }
169
170    public Queue createQueue(String s) throws JMSException {
171        return getInternalSession().createQueue(s);
172    }
173
174    public StreamMessage createStreamMessage() throws JMSException {
175        return getInternalSession().createStreamMessage();
176    }
177
178    public TemporaryQueue createTemporaryQueue() throws JMSException {
179        TemporaryQueue result;
180
181        result = getInternalSession().createTemporaryQueue();
182
183        // Notify all of the listeners of the created temporary Queue.
184        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
185            listener.onTemporaryQueueCreate(result);
186        }
187
188        return result;
189    }
190
191    public TemporaryTopic createTemporaryTopic() throws JMSException {
192        TemporaryTopic result;
193
194        result = getInternalSession().createTemporaryTopic();
195
196        // Notify all of the listeners of the created temporary Topic.
197        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
198            listener.onTemporaryTopicCreate(result);
199        }
200
201        return result;
202    }
203
204    public void unsubscribe(String s) throws JMSException {
205        getInternalSession().unsubscribe(s);
206    }
207
208    public TextMessage createTextMessage() throws JMSException {
209        return getInternalSession().createTextMessage();
210    }
211
212    public TextMessage createTextMessage(String s) throws JMSException {
213        return getInternalSession().createTextMessage(s);
214    }
215
216    public Topic createTopic(String s) throws JMSException {
217        return getInternalSession().createTopic(s);
218    }
219
220    public int getAcknowledgeMode() throws JMSException {
221        return getInternalSession().getAcknowledgeMode();
222    }
223
224    public boolean getTransacted() throws JMSException {
225        return getInternalSession().getTransacted();
226    }
227
228    public void recover() throws JMSException {
229        getInternalSession().recover();
230    }
231
232    public void rollback() throws JMSException {
233        getInternalSession().rollback();
234    }
235
236    public XAResource getXAResource() {
237        if (session == null) {
238            throw new IllegalStateException("Session is closed");
239        }
240        return session.getTransactionContext();
241    }
242
243    public Session getSession() {
244        return this;
245    }
246
247    public void run() {
248        if (session != null) {
249            session.run();
250        }
251    }
252
253    // Consumer related methods
254    // -------------------------------------------------------------------------
255    public QueueBrowser createBrowser(Queue queue) throws JMSException {
256        return addQueueBrowser(getInternalSession().createBrowser(queue));
257    }
258
259    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
260        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
261    }
262
263    public MessageConsumer createConsumer(Destination destination) throws JMSException {
264        return addConsumer(getInternalSession().createConsumer(destination));
265    }
266
267    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
268        return addConsumer(getInternalSession().createConsumer(destination, selector));
269    }
270
271    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
272        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
273    }
274
275    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
276        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
277    }
278
279    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
280        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
281    }
282
283    public MessageListener getMessageListener() throws JMSException {
284        return getInternalSession().getMessageListener();
285    }
286
287    public void setMessageListener(MessageListener messageListener) throws JMSException {
288        getInternalSession().setMessageListener(messageListener);
289    }
290
291    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
292        return addTopicSubscriber(getInternalSession().createSubscriber(topic));
293    }
294
295    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
296        return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
297    }
298
299    public QueueReceiver createReceiver(Queue queue) throws JMSException {
300        return addQueueReceiver(getInternalSession().createReceiver(queue));
301    }
302
303    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
304        return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
305    }
306
307    // Producer related methods
308    // -------------------------------------------------------------------------
309    public MessageProducer createProducer(Destination destination) throws JMSException {
310        return new PooledProducer(getMessageProducer(), destination);
311    }
312
313    public QueueSender createSender(Queue queue) throws JMSException {
314        return new PooledQueueSender(getQueueSender(), queue);
315    }
316
317    public TopicPublisher createPublisher(Topic topic) throws JMSException {
318        return new PooledTopicPublisher(getTopicPublisher(), topic);
319    }
320
321    /**
322     * Callback invoked when the consumer is closed.
323     * <p/>
324     * This is used to keep track of an explicit closed consumer created by this
325     * session, by which we know do not need to keep track of the consumer, as
326     * its already closed.
327     *
328     * @param consumer
329     *            the consumer which is being closed
330     */
331    protected void onConsumerClose(MessageConsumer consumer) {
332        consumers.remove(consumer);
333    }
334
335    public ActiveMQSession getInternalSession() throws AlreadyClosedException {
336        if (session == null) {
337            throw new AlreadyClosedException("The session has already been closed");
338        }
339        return session;
340    }
341
342    public ActiveMQMessageProducer getMessageProducer() throws JMSException {
343        if (messageProducer == null) {
344            messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
345        }
346        return messageProducer;
347    }
348
349    public ActiveMQQueueSender getQueueSender() throws JMSException {
350        if (queueSender == null) {
351            queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
352        }
353        return queueSender;
354    }
355
356    public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
357        if (topicPublisher == null) {
358            topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
359        }
360        return topicPublisher;
361    }
362
363    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
364        browsers.add(browser);
365        return browser;
366    }
367
368    private MessageConsumer addConsumer(MessageConsumer consumer) {
369        consumers.add(consumer);
370        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
371        // method is invoked
372        // when the returned consumer is closed, to avoid memory leak in this
373        // session class
374        // in case many consumers is created
375        return new PooledMessageConsumer(this, consumer);
376    }
377
378    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
379        consumers.add(subscriber);
380        return subscriber;
381    }
382
383    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
384        consumers.add(receiver);
385        return receiver;
386    }
387
388    public void setIsXa(boolean isXa) {
389        this.isXa = isXa;
390    }
391
392    public String toString() {
393        return "PooledSession { " + session + " }";
394    }
395}