001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.concurrent.*;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.Connection;
032import javax.jms.ConnectionConsumer;
033import javax.jms.ConnectionMetaData;
034import javax.jms.DeliveryMode;
035import javax.jms.Destination;
036import javax.jms.ExceptionListener;
037import javax.jms.IllegalStateException;
038import javax.jms.InvalidDestinationException;
039import javax.jms.JMSException;
040import javax.jms.Queue;
041import javax.jms.QueueConnection;
042import javax.jms.QueueSession;
043import javax.jms.ServerSessionPool;
044import javax.jms.Session;
045import javax.jms.Topic;
046import javax.jms.TopicConnection;
047import javax.jms.TopicSession;
048import javax.jms.XAConnection;
049
050import org.apache.activemq.advisory.DestinationSource;
051import org.apache.activemq.blob.BlobTransferPolicy;
052import org.apache.activemq.command.ActiveMQDestination;
053import org.apache.activemq.command.ActiveMQMessage;
054import org.apache.activemq.command.ActiveMQTempDestination;
055import org.apache.activemq.command.ActiveMQTempQueue;
056import org.apache.activemq.command.ActiveMQTempTopic;
057import org.apache.activemq.command.BrokerInfo;
058import org.apache.activemq.command.Command;
059import org.apache.activemq.command.CommandTypes;
060import org.apache.activemq.command.ConnectionControl;
061import org.apache.activemq.command.ConnectionError;
062import org.apache.activemq.command.ConnectionId;
063import org.apache.activemq.command.ConnectionInfo;
064import org.apache.activemq.command.ConsumerControl;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.ControlCommand;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.Message;
071import org.apache.activemq.command.MessageDispatch;
072import org.apache.activemq.command.MessageId;
073import org.apache.activemq.command.ProducerAck;
074import org.apache.activemq.command.ProducerId;
075import org.apache.activemq.command.RemoveInfo;
076import org.apache.activemq.command.RemoveSubscriptionInfo;
077import org.apache.activemq.command.Response;
078import org.apache.activemq.command.SessionId;
079import org.apache.activemq.command.ShutdownInfo;
080import org.apache.activemq.command.WireFormatInfo;
081import org.apache.activemq.management.JMSConnectionStatsImpl;
082import org.apache.activemq.management.JMSStatsImpl;
083import org.apache.activemq.management.StatsCapable;
084import org.apache.activemq.management.StatsImpl;
085import org.apache.activemq.state.CommandVisitorAdapter;
086import org.apache.activemq.thread.Scheduler;
087import org.apache.activemq.thread.TaskRunnerFactory;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportListener;
092import org.apache.activemq.transport.failover.FailoverTransport;
093import org.apache.activemq.util.IdGenerator;
094import org.apache.activemq.util.IntrospectionSupport;
095import org.apache.activemq.util.JMSExceptionSupport;
096import org.apache.activemq.util.LongSequenceGenerator;
097import org.apache.activemq.util.ServiceSupport;
098import org.slf4j.Logger;
099import org.slf4j.LoggerFactory;
100
101public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
102
103    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
104    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
105    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
106
107    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
108
109    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
110
111    protected boolean dispatchAsync=true;
112    protected boolean alwaysSessionAsync = true;
113
114    private TaskRunnerFactory sessionTaskRunner;
115    private final ThreadPoolExecutor executor;
116
117    // Connection state variables
118    private final ConnectionInfo info;
119    private ExceptionListener exceptionListener;
120    private ClientInternalExceptionListener clientInternalExceptionListener;
121    private boolean clientIDSet;
122    private boolean isConnectionInfoSentToBroker;
123    private boolean userSpecifiedClientID;
124
125    // Configuration options variables
126    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
127    private BlobTransferPolicy blobTransferPolicy;
128    private RedeliveryPolicy redeliveryPolicy;
129    private MessageTransformer transformer;
130
131    private boolean disableTimeStampsByDefault;
132    private boolean optimizedMessageDispatch = true;
133    private boolean copyMessageOnSend = true;
134    private boolean useCompression;
135    private boolean objectMessageSerializationDefered;
136    private boolean useAsyncSend;
137    private boolean optimizeAcknowledge;
138    private long optimizeAcknowledgeTimeOut = 0;
139    private boolean nestedMapAndListEnabled = true;
140    private boolean useRetroactiveConsumer;
141    private boolean exclusiveConsumer;
142    private boolean alwaysSyncSend;
143    private int closeTimeout = 15000;
144    private boolean watchTopicAdvisories = true;
145    private long warnAboutUnstartedConnectionTimeout = 500L;
146    private int sendTimeout =0;
147    private boolean sendAcksAsync=true;
148    private boolean checkForDuplicates = true;
149
150    private final Transport transport;
151    private final IdGenerator clientIdGenerator;
152    private final JMSStatsImpl factoryStats;
153    private final JMSConnectionStatsImpl stats;
154
155    private final AtomicBoolean started = new AtomicBoolean(false);
156    private final AtomicBoolean closing = new AtomicBoolean(false);
157    private final AtomicBoolean closed = new AtomicBoolean(false);
158    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
159    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
160    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
161    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
162    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
163    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
164
    // Maps ConsumerIds to ActiveMQDispatcher objects
166    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
167    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
168    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
169    private final SessionId connectionSessionId;
170    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
171    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
172    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
173    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
174
175    private AdvisoryConsumer advisoryConsumer;
176    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
177    private BrokerInfo brokerInfo;
178    private IOException firstFailureError;
179    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
180
181    // Assume that protocol is the latest. Change to the actual protocol
182    // version when a WireFormatInfo is received.
183    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
184    private final long timeCreated;
185    private final ConnectionAudit connectionAudit = new ConnectionAudit();
186    private DestinationSource destinationSource;
187    private final Object ensureConnectionInfoSentMutex = new Object();
188    private boolean useDedicatedTaskRunner;
189    protected volatile CountDownLatch transportInterruptionProcessingComplete;
190    private long consumerFailoverRedeliveryWaitPeriod;
191    private Scheduler scheduler;
192    private boolean messagePrioritySupported = true;
193    private boolean transactedIndividualAck = false;
194    private boolean nonBlockingRedelivery = false;
195
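    /**
     * Construct an <code>ActiveMQConnection</code>
     *
     * @param transport the transport this connection registers itself on as
     *                the <code>TransportListener</code>
     * @param clientIdGenerator the generator kept by this connection for
     *                client ids
     * @param connectionIdGenerator the generator used to create the unique
     *                connection id
     * @param factoryStats the factory statistics this connection is added to
     * @throws Exception
     */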
203    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204
205        this.transport = transport;
206        this.clientIdGenerator = clientIdGenerator;
207        this.factoryStats = factoryStats;
208
209        // Configure a single threaded executor who's core thread can timeout if
210        // idle
211        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212            public Thread newThread(Runnable r) {
213                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
215                //thread.setDaemon(true);
216                return thread;
217            }
218        });
219        // asyncConnectionThread.allowCoreThreadTimeOut(true);
220        String uniqueId = connectionIdGenerator.generateId();
221        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
222        this.info.setManageable(true);
223        this.info.setFaultTolerant(transport.isFaultTolerant());
224        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
225
226        this.transport.setTransportListener(this);
227
228        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
229        this.factoryStats.addConnection(this);
230        this.timeCreated = System.currentTimeMillis();
231        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
232    }
233
234    protected void setUserName(String userName) {
235        this.info.setUserName(userName);
236    }
237
238    protected void setPassword(String password) {
239        this.info.setPassword(password);
240    }
241
242    /**
243     * A static helper method to create a new connection
244     *
245     * @return an ActiveMQConnection
246     * @throws JMSException
247     */
248    public static ActiveMQConnection makeConnection() throws JMSException {
249        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
250        return (ActiveMQConnection)factory.createConnection();
251    }
252
253    /**
254     * A static helper method to create a new connection
255     *
256     * @param uri
     * @return an ActiveMQConnection
258     * @throws JMSException
259     */
260    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
261        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
262        return (ActiveMQConnection)factory.createConnection();
263    }
264
265    /**
266     * A static helper method to create a new connection
267     *
268     * @param user
269     * @param password
270     * @param uri
271     * @return an ActiveMQConnection
272     * @throws JMSException
273     */
274    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
275        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
276        return (ActiveMQConnection)factory.createConnection();
277    }
278
    /**
     * @return the statistics object for this connection
     */
282    public JMSConnectionStatsImpl getConnectionStats() {
283        return stats;
284    }
285
286    /**
287     * Creates a <CODE>Session</CODE> object.
288     *
289     * @param transacted indicates whether the session is transacted
290     * @param acknowledgeMode indicates whether the consumer or the client will
291     *                acknowledge any messages it receives; ignored if the
292     *                session is transacted. Legal values are
293     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
294     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
295     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
296     * @return a newly created session
297     * @throws JMSException if the <CODE>Connection</CODE> object fails to
298     *                 create a session due to some internal error or lack of
299     *                 support for the specific transaction and acknowledgement
300     *                 mode.
301     * @see Session#AUTO_ACKNOWLEDGE
302     * @see Session#CLIENT_ACKNOWLEDGE
303     * @see Session#DUPS_OK_ACKNOWLEDGE
304     * @since 1.1
305     */
306    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
307        checkClosedOrFailed();
308        ensureConnectionInfoSent();
309        if(!transacted) {
310            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
311                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
312            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
313                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
314                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
315            }
316        }
317        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
318            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
319    }
320
321    /**
322     * @return sessionId
323     */
324    protected SessionId getNextSessionId() {
325        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
326    }
327
328    /**
329     * Gets the client identifier for this connection.
330     * <P>
331     * This value is specific to the JMS provider. It is either preconfigured by
332     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
333     * dynamically by the application by calling the <code>setClientID</code>
334     * method.
335     *
336     * @return the unique client identifier
337     * @throws JMSException if the JMS provider fails to return the client ID
338     *                 for this connection due to some internal error.
339     */
340    public String getClientID() throws JMSException {
341        checkClosedOrFailed();
342        return this.info.getClientId();
343    }
344
345    /**
346     * Sets the client identifier for this connection.
347     * <P>
348     * The preferred way to assign a JMS client's client identifier is for it to
349     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
350     * object and transparently assigned to the <CODE>Connection</CODE> object
351     * it creates.
352     * <P>
353     * Alternatively, a client can set a connection's client identifier using a
354     * provider-specific value. The facility to set a connection's client
355     * identifier explicitly is not a mechanism for overriding the identifier
356     * that has been administratively configured. It is provided for the case
357     * where no administratively specified identifier exists. If one does exist,
358     * an attempt to change it by setting it must throw an
359     * <CODE>IllegalStateException</CODE>. If a client sets the client
360     * identifier explicitly, it must do so immediately after it creates the
361     * connection and before any other action on the connection is taken. After
362     * this point, setting the client identifier is a programming error that
363     * should throw an <CODE>IllegalStateException</CODE>.
364     * <P>
365     * The purpose of the client identifier is to associate a connection and its
366     * objects with a state maintained on behalf of the client by a provider.
367     * The only such state identified by the JMS API is that required to support
368     * durable subscriptions.
369     * <P>
370     * If another connection with the same <code>clientID</code> is already
371     * running when this method is called, the JMS provider should detect the
372     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
373     *
374     * @param newClientID the unique client identifier
375     * @throws JMSException if the JMS provider fails to set the client ID for
376     *                 this connection due to some internal error.
377     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
378     *                 invalid or duplicate client ID.
379     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
380     *                 a connection's client ID at the wrong time or when it has
381     *                 been administratively configured.
382     */
383    public void setClientID(String newClientID) throws JMSException {
384        checkClosedOrFailed();
385
386        if (this.clientIDSet) {
387            throw new IllegalStateException("The clientID has already been set");
388        }
389
390        if (this.isConnectionInfoSentToBroker) {
391            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
392        }
393
394        this.info.setClientId(newClientID);
395        this.userSpecifiedClientID = true;
396        ensureConnectionInfoSent();
397    }
398
    /**
     * Sets the default client id that the connection will use if one is not
     * explicitly set with the setClientID() call.
     */
403    public void setDefaultClientID(String clientID) throws JMSException {
404        this.info.setClientId(clientID);
405        this.userSpecifiedClientID = true;
406    }
407
408    /**
409     * Gets the metadata for this connection.
410     *
411     * @return the connection metadata
412     * @throws JMSException if the JMS provider fails to get the connection
413     *                 metadata for this connection.
414     * @see javax.jms.ConnectionMetaData
415     */
416    public ConnectionMetaData getMetaData() throws JMSException {
417        checkClosedOrFailed();
418        return ActiveMQConnectionMetaData.INSTANCE;
419    }
420
421    /**
422     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
423     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
424     * associated with it.
425     *
426     * @return the <CODE>ExceptionListener</CODE> for this connection, or
427     *         null, if no <CODE>ExceptionListener</CODE> is associated with
428     *         this connection.
429     * @throws JMSException if the JMS provider fails to get the
430     *                 <CODE>ExceptionListener</CODE> for this connection.
431     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
432     */
433    public ExceptionListener getExceptionListener() throws JMSException {
434        checkClosedOrFailed();
435        return this.exceptionListener;
436    }
437
438    /**
439     * Sets an exception listener for this connection.
440     * <P>
441     * If a JMS provider detects a serious problem with a connection, it informs
442     * the connection's <CODE> ExceptionListener</CODE>, if one has been
443     * registered. It does this by calling the listener's <CODE>onException
444     * </CODE>
445     * method, passing it a <CODE>JMSException</CODE> object describing the
446     * problem.
447     * <P>
448     * An exception listener allows a client to be notified of a problem
449     * asynchronously. Some connections only consume messages, so they would
450     * have no other way to learn their connection has failed.
451     * <P>
452     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
453     * <P>
454     * A JMS provider should attempt to resolve connection problems itself
455     * before it notifies the client of them.
456     *
457     * @param listener the exception listener
458     * @throws JMSException if the JMS provider fails to set the exception
459     *                 listener for this connection.
460     */
461    public void setExceptionListener(ExceptionListener listener) throws JMSException {
462        checkClosedOrFailed();
463        this.exceptionListener = listener;
464    }
465
466    /**
467     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
     * Not every <CODE>ActiveMQConnection</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
469     * associated with it.
470     *
471     * @return the listener or <code>null</code> if no listener is registered with the connection.
472     */
473    public ClientInternalExceptionListener getClientInternalExceptionListener()
474    {
475        return clientInternalExceptionListener;
476    }
477
478    /**
479     * Sets a client internal exception listener for this connection.
480     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
481     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
482     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
483     * describing the problem.
484     *
485     * @param listener the exception listener
486     */
487    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
488    {
489        this.clientInternalExceptionListener = listener;
490    }
491
492    /**
493     * Starts (or restarts) a connection's delivery of incoming messages. A call
494     * to <CODE>start</CODE> on a connection that has already been started is
495     * ignored.
496     *
497     * @throws JMSException if the JMS provider fails to start message delivery
498     *                 due to some internal error.
499     * @see javax.jms.Connection#stop()
500     */
501    public void start() throws JMSException {
502        checkClosedOrFailed();
503        ensureConnectionInfoSent();
504        if (started.compareAndSet(false, true)) {
505            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
506                ActiveMQSession session = i.next();
507                session.start();
508            }
509        }
510    }
511
512    /**
513     * Temporarily stops a connection's delivery of incoming messages. Delivery
514     * can be restarted using the connection's <CODE>start</CODE> method. When
515     * the connection is stopped, delivery to all the connection's message
516     * consumers is inhibited: synchronous receives block, and messages are not
517     * delivered to message listeners.
518     * <P>
519     * This call blocks until receives and/or message listeners in progress have
520     * completed.
521     * <P>
522     * Stopping a connection has no effect on its ability to send messages. A
523     * call to <CODE>stop</CODE> on a connection that has already been stopped
524     * is ignored.
525     * <P>
526     * A call to <CODE>stop</CODE> must not return until delivery of messages
527     * has paused. This means that a client can rely on the fact that none of
528     * its message listeners will be called and that all threads of control
529     * waiting for <CODE>receive</CODE> calls to return will not return with a
530     * message until the connection is restarted. The receive timers for a
531     * stopped connection continue to advance, so receives may time out while
532     * the connection is stopped.
533     * <P>
534     * If message listeners are running when <CODE>stop</CODE> is invoked, the
535     * <CODE>stop</CODE> call must wait until all of them have returned before
536     * it may return. While these message listeners are completing, they must
537     * have the full services of the connection available to them.
538     *
539     * @throws JMSException if the JMS provider fails to stop message delivery
540     *                 due to some internal error.
541     * @see javax.jms.Connection#start()
542     */
543    public void stop() throws JMSException {
544        checkClosedOrFailed();
545        if (started.compareAndSet(true, false)) {
546            synchronized(sessions) {
547                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
548                    ActiveMQSession s = i.next();
549                    s.stop();
550                }
551            }
552        }
553    }
554
555    /**
556     * Closes the connection.
557     * <P>
558     * Since a provider typically allocates significant resources outside the
559     * JVM on behalf of a connection, clients should close these resources when
560     * they are not needed. Relying on garbage collection to eventually reclaim
561     * these resources may not be timely enough.
562     * <P>
563     * There is no need to close the sessions, producers, and consumers of a
564     * closed connection.
565     * <P>
566     * Closing a connection causes all temporary destinations to be deleted.
567     * <P>
568     * When this method is invoked, it should not return until message
569     * processing has been shut down in an orderly fashion. This means that all
570     * message listeners that may have been running have returned, and that all
571     * pending receives have returned. A close terminates all pending message
572     * receives on the connection's sessions' consumers. The receives may return
573     * with a message or with null, depending on whether there was a message
574     * available at the time of the close. If one or more of the connection's
575     * sessions' message listeners is processing a message at the time when
576     * connection <CODE>close</CODE> is invoked, all the facilities of the
577     * connection and its sessions must remain available to those listeners
578     * until they return control to the JMS provider.
579     * <P>
580     * Closing a connection causes any of its sessions' transactions in progress
581     * to be rolled back. In the case where a session's work is coordinated by
582     * an external transaction manager, a session's <CODE>commit</CODE> and
583     * <CODE> rollback</CODE> methods are not used and the result of a closed
584     * session's work is determined later by the transaction manager. Closing a
585     * connection does NOT force an acknowledgment of client-acknowledged
586     * sessions.
587     * <P>
588     * Invoking the <CODE>acknowledge</CODE> method of a received message from
589     * a closed connection's session must throw an
590     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
591     * NOT throw an exception.
592     *
593     * @throws JMSException if the JMS provider fails to close the connection
594     *                 due to some internal error. For example, a failure to
595     *                 release resources or to close a socket connection can
596     *                 cause this exception to be thrown.
597     */
598    public void close() throws JMSException {
599        // Store the interrupted state and clear so that cleanup happens without
600        // leaking connection resources.  Reset in finally to preserve state.
601        boolean interrupted = Thread.interrupted();
602
603        try {
604
605            // If we were running, lets stop first.
606            if (!closed.get() && !transportFailed.get()) {
607                stop();
608            }
609
610            synchronized (this) {
611                if (!closed.get()) {
612                    closing.set(true);
613
614                    if (destinationSource != null) {
615                        destinationSource.stop();
616                        destinationSource = null;
617                    }
618                    if (advisoryConsumer != null) {
619                        advisoryConsumer.dispose();
620                        advisoryConsumer = null;
621                    }
622
623                    Scheduler scheduler = this.scheduler;
624                    if (scheduler != null) {
625                        try {
626                            scheduler.stop();
627                        } catch (Exception e) {
628                            JMSException ex =  JMSExceptionSupport.create(e);
629                            throw ex;
630                        }
631                    }
632
633                    long lastDeliveredSequenceId = 0;
634                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
635                        ActiveMQSession s = i.next();
636                        s.dispose();
637                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
638                    }
639                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
640                        ActiveMQConnectionConsumer c = i.next();
641                        c.dispose();
642                    }
643                    for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
644                        ActiveMQInputStream c = i.next();
645                        c.dispose();
646                    }
647                    for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
648                        ActiveMQOutputStream c = i.next();
649                        c.dispose();
650                    }
651
652                    // As TemporaryQueue and TemporaryTopic instances are bound
653                    // to a connection we should just delete them after the connection
654                    // is closed to free up memory
655                    for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
656                        ActiveMQTempDestination c = i.next();
657                        c.delete();
658                    }
659
660                    if (isConnectionInfoSentToBroker) {
661                        // If we announced ourselfs to the broker.. Try to let
662                        // the broker
663                        // know that the connection is being shutdown.
664                        RemoveInfo removeCommand = info.createRemoveCommand();
665                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
666                        doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
667                        doAsyncSendPacket(new ShutdownInfo());
668                    }
669
670                    started.set(false);
671
672                    // TODO if we move the TaskRunnerFactory to the connection
673                    // factory
674                    // then we may need to call
675                    // factory.onConnectionClose(this);
676                    if (sessionTaskRunner != null) {
677                        sessionTaskRunner.shutdown();
678                    }
679                    closed.set(true);
680                    closing.set(false);
681                }
682            }
683        } finally {
684            try {
685                if (executor != null) {
686                    executor.shutdown();
687                }
688            } catch (Throwable e) {
689                LOG.error("Error shutting down thread pool " + e, e);
690            }
691
692            ServiceSupport.dispose(this.transport);
693
694            factoryStats.removeConnection(this);
695            if (interrupted) {
696                Thread.currentThread().interrupt();
697            }
698        }
699    }
700
701    /**
702     * Tells the broker to terminate its VM. This can be used to cleanly
703     * terminate a broker running in a standalone java process. Server must have
704     * property enable.vm.shutdown=true defined to allow this to work.
705     */
706    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
707    // implemented.
708    /*
709     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
710     * command = new BrokerAdminCommand();
711     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
712     * asyncSendPacket(command); }
713     */
714
715    /**
716     * Create a durable connection consumer for this connection (optional
717     * operation). This is an expert facility not used by regular JMS clients.
718     *
719     * @param topic topic to access
720     * @param subscriptionName durable subscription name
721     * @param messageSelector only messages with properties matching the message
722     *                selector expression are delivered. A value of null or an
723     *                empty string indicates that there is no message selector
724     *                for the message consumer.
725     * @param sessionPool the server session pool to associate with this durable
726     *                connection consumer
727     * @param maxMessages the maximum number of messages that can be assigned to
728     *                a server session at one time
729     * @return the durable connection consumer
730     * @throws JMSException if the <CODE>Connection</CODE> object fails to
731     *                 create a connection consumer due to some internal error
732     *                 or invalid arguments for <CODE>sessionPool</CODE> and
733     *                 <CODE>messageSelector</CODE>.
734     * @throws javax.jms.InvalidDestinationException if an invalid destination
735     *                 is specified.
736     * @throws javax.jms.InvalidSelectorException if the message selector is
737     *                 invalid.
738     * @see javax.jms.ConnectionConsumer
739     * @since 1.1
740     */
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
        throws JMSException {
        // Convenience overload: forwards to the six-argument variant with noLocal = false.
        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
    }
745
746    /**
747     * Create a durable connection consumer for this connection (optional
748     * operation). This is an expert facility not used by regular JMS clients.
749     *
750     * @param topic topic to access
751     * @param subscriptionName durable subscription name
752     * @param messageSelector only messages with properties matching the message
753     *                selector expression are delivered. A value of null or an
754     *                empty string indicates that there is no message selector
755     *                for the message consumer.
756     * @param sessionPool the server session pool to associate with this durable
757     *                connection consumer
758     * @param maxMessages the maximum number of messages that can be assigned to
759     *                a server session at one time
760     * @param noLocal set true if you want to filter out messages published
761     *                locally
762     * @return the durable connection consumer
763     * @throws JMSException if the <CODE>Connection</CODE> object fails to
764     *                 create a connection consumer due to some internal error
765     *                 or invalid arguments for <CODE>sessionPool</CODE> and
766     *                 <CODE>messageSelector</CODE>.
767     * @throws javax.jms.InvalidDestinationException if an invalid destination
768     *                 is specified.
769     * @throws javax.jms.InvalidSelectorException if the message selector is
770     *                 invalid.
771     * @see javax.jms.ConnectionConsumer
772     * @since 1.1
773     */
774    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
775                                                              boolean noLocal) throws JMSException {
776        checkClosedOrFailed();
777        ensureConnectionInfoSent();
778        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
779        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
780        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
781        info.setSubscriptionName(subscriptionName);
782        info.setSelector(messageSelector);
783        info.setPrefetchSize(maxMessages);
784        info.setDispatchAsync(isDispatchAsync());
785
786        // Allows the options on the destination to configure the consumerInfo
787        if (info.getDestination().getOptions() != null) {
788            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
789            IntrospectionSupport.setProperties(this.info, options, "consumer.");
790        }
791
792        return new ActiveMQConnectionConsumer(this, sessionPool, info);
793    }
794
795    // Properties
796    // -------------------------------------------------------------------------
797
798    /**
799     * Returns true if this connection has been started
800     *
801     * @return true if this Connection is started
802     */
803    public boolean isStarted() {
804        return started.get();
805    }
806
807    /**
808     * Returns true if the connection is closed
809     */
810    public boolean isClosed() {
811        return closed.get();
812    }
813
814    /**
815     * Returns true if the connection is in the process of being closed
816     */
817    public boolean isClosing() {
818        return closing.get();
819    }
820
821    /**
822     * Returns true if the underlying transport has failed
823     */
824    public boolean isTransportFailed() {
825        return transportFailed.get();
826    }
827
828    /**
829     * @return Returns the prefetchPolicy.
830     */
831    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
832        return prefetchPolicy;
833    }
834
835    /**
836     * Sets the <a
837     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
838     * policy</a> for consumers created by this connection.
839     */
840    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
841        this.prefetchPolicy = prefetchPolicy;
842    }
843
844    /**
845     */
846    public Transport getTransportChannel() {
847        return transport;
848    }
849
850    /**
851     * @return Returns the clientID of the connection, forcing one to be
852     *         generated if one has not yet been configured.
853     */
854    public String getInitializedClientID() throws JMSException {
855        ensureConnectionInfoSent();
856        return info.getClientId();
857    }
858
859    /**
860     * @return Returns the timeStampsDisableByDefault.
861     */
862    public boolean isDisableTimeStampsByDefault() {
863        return disableTimeStampsByDefault;
864    }
865
866    /**
867     * Sets whether or not timestamps on messages should be disabled or not. If
868     * you disable them it adds a small performance boost.
869     */
870    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
871        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
872    }
873
874    /**
875     * @return Returns the dispatchOptimizedMessage.
876     */
877    public boolean isOptimizedMessageDispatch() {
878        return optimizedMessageDispatch;
879    }
880
881    /**
882     * If this flag is set then an larger prefetch limit is used - only
883     * applicable for durable topic subscribers.
884     */
885    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
886        this.optimizedMessageDispatch = dispatchOptimizedMessage;
887    }
888
889    /**
890     * @return Returns the closeTimeout.
891     */
892    public int getCloseTimeout() {
893        return closeTimeout;
894    }
895
896    /**
897     * Sets the timeout before a close is considered complete. Normally a
898     * close() on a connection waits for confirmation from the broker; this
899     * allows that operation to timeout to save the client hanging if there is
900     * no broker
901     */
902    public void setCloseTimeout(int closeTimeout) {
903        this.closeTimeout = closeTimeout;
904    }
905
906    /**
907     * @return ConnectionInfo
908     */
909    public ConnectionInfo getConnectionInfo() {
910        return this.info;
911    }
912
    /**
     * Reports whether retroactive consumers are enabled on this connection.
     *
     * @return the current value of the useRetroactiveConsumer flag
     */
    public boolean isUseRetroactiveConsumer() {
        return useRetroactiveConsumer;
    }
916
917    /**
918     * Sets whether or not retroactive consumers are enabled. Retroactive
919     * consumers allow non-durable topic subscribers to receive old messages
920     * that were published before the non-durable subscriber started.
921     */
922    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
923        this.useRetroactiveConsumer = useRetroactiveConsumer;
924    }
925
    /**
     * Reports whether nested Map and List structures are enabled.
     *
     * @return the current value of the nestedMapAndListEnabled flag
     */
    public boolean isNestedMapAndListEnabled() {
        return nestedMapAndListEnabled;
    }
929
930    /**
931     * Enables/disables whether or not Message properties and MapMessage entries
932     * support <a
933     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
934     * Structures</a> of Map and List objects
935     */
936    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
937        this.nestedMapAndListEnabled = structuredMapsEnabled;
938    }
939
    /**
     * Reports whether the exclusive consumer setting is enabled.
     *
     * @return the current value of the exclusiveConsumer flag
     */
    public boolean isExclusiveConsumer() {
        return exclusiveConsumer;
    }
943
944    /**
945     * Enables or disables whether or not queue consumers should be exclusive or
946     * not for example to preserve ordering when not using <a
947     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
948     *
949     * @param exclusiveConsumer
950     */
951    public void setExclusiveConsumer(boolean exclusiveConsumer) {
952        this.exclusiveConsumer = exclusiveConsumer;
953    }
954
955    /**
956     * Adds a transport listener so that a client can be notified of events in
957     * the underlying transport
958     */
959    public void addTransportListener(TransportListener transportListener) {
960        transportListeners.add(transportListener);
961    }
962
    /**
     * Removes a listener previously registered with
     * {@link #addTransportListener(TransportListener)}.
     *
     * @param transportListener the listener to remove from transportListeners
     */
    public void removeTransportListener(TransportListener transportListener) {
        transportListeners.remove(transportListener);
    }
966
    /**
     * Reports the dedicated-task-runner setting, which is passed to the
     * session {@link TaskRunnerFactory} when it is lazily created.
     *
     * @return the current value of the useDedicatedTaskRunner flag
     */
    public boolean isUseDedicatedTaskRunner() {
        return useDedicatedTaskRunner;
    }
970
    /**
     * Sets the dedicated-task-runner flag. The flag is only read when the
     * session task runner is created by {@link #getSessionTaskRunner()}.
     *
     * @param useDedicatedTaskRunner the new value of the flag
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
974
975    public TaskRunnerFactory getSessionTaskRunner() {
976        synchronized (this) {
977            if (sessionTaskRunner == null) {
978                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
979            }
980        }
981        return sessionTaskRunner;
982    }
983
    /**
     * Replaces the task runner factory used for session tasks.
     * <p>
     * NOTE(review): this assignment is not synchronized, whereas the lazy
     * creation in getSessionTaskRunner() holds this connection's monitor.
     *
     * @param sessionTaskRunner the factory to store on this connection
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
987
    /**
     * Returns the message transformer configured on this connection.
     *
     * @return the transformer held by this connection
     */
    public MessageTransformer getTransformer() {
        return transformer;
    }
991
992    /**
993     * Sets the transformer used to transform messages before they are sent on
994     * to the JMS bus or when they are received from the bus but before they are
995     * delivered to the JMS client
996     */
997    public void setTransformer(MessageTransformer transformer) {
998        this.transformer = transformer;
999    }
1000
1001    /**
1002     * @return the statsEnabled
1003     */
1004    public boolean isStatsEnabled() {
1005        return this.stats.isEnabled();
1006    }
1007
1008    /**
1009     * @param statsEnabled the statsEnabled to set
1010     */
1011    public void setStatsEnabled(boolean statsEnabled) {
1012        this.stats.setEnabled(statsEnabled);
1013    }
1014
1015    /**
1016     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1017     * being created or destroyed or to enquire about the current destinations available on the broker
1018     *
1019     * @return a lazily created destination source
1020     * @throws JMSException
1021     */
1022    public DestinationSource getDestinationSource() throws JMSException {
1023        if (destinationSource == null) {
1024            destinationSource = new DestinationSource(this);
1025            destinationSource.start();
1026        }
1027        return destinationSource;
1028    }
1029
1030    // Implementation methods
1031    // -------------------------------------------------------------------------
1032
1033    /**
1034     * Used internally for adding Sessions to the Connection
1035     *
1036     * @param session
1037     * @throws JMSException
1038     * @throws JMSException
1039     */
1040    protected void addSession(ActiveMQSession session) throws JMSException {
1041        this.sessions.add(session);
1042        if (sessions.size() > 1 || session.isTransacted()) {
1043            optimizedMessageDispatch = false;
1044        }
1045    }
1046
1047    /**
1048     * Used interanlly for removing Sessions from a Connection
1049     *
1050     * @param session
1051     */
1052    protected void removeSession(ActiveMQSession session) {
1053        this.sessions.remove(session);
1054        this.removeDispatcher(session);
1055    }
1056
1057    /**
1058     * Add a ConnectionConsumer
1059     *
1060     * @param connectionConsumer
1061     * @throws JMSException
1062     */
1063    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1064        this.connectionConsumers.add(connectionConsumer);
1065    }
1066
1067    /**
1068     * Remove a ConnectionConsumer
1069     *
1070     * @param connectionConsumer
1071     */
1072    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1073        this.connectionConsumers.remove(connectionConsumer);
1074        this.removeDispatcher(connectionConsumer);
1075    }
1076
1077    /**
1078     * Creates a <CODE>TopicSession</CODE> object.
1079     *
1080     * @param transacted indicates whether the session is transacted
1081     * @param acknowledgeMode indicates whether the consumer or the client will
1082     *                acknowledge any messages it receives; ignored if the
1083     *                session is transacted. Legal values are
1084     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1085     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1086     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1087     * @return a newly created topic session
1088     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1089     *                 to create a session due to some internal error or lack of
1090     *                 support for the specific transaction and acknowledgement
1091     *                 mode.
1092     * @see Session#AUTO_ACKNOWLEDGE
1093     * @see Session#CLIENT_ACKNOWLEDGE
1094     * @see Session#DUPS_OK_ACKNOWLEDGE
1095     */
1096    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1097        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1098    }
1099
1100    /**
1101     * Creates a connection consumer for this connection (optional operation).
1102     * This is an expert facility not used by regular JMS clients.
1103     *
1104     * @param topic the topic to access
1105     * @param messageSelector only messages with properties matching the message
1106     *                selector expression are delivered. A value of null or an
1107     *                empty string indicates that there is no message selector
1108     *                for the message consumer.
1109     * @param sessionPool the server session pool to associate with this
1110     *                connection consumer
1111     * @param maxMessages the maximum number of messages that can be assigned to
1112     *                a server session at one time
1113     * @return the connection consumer
1114     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1115     *                 to create a connection consumer due to some internal
1116     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1117     *                 and <CODE>messageSelector</CODE>.
1118     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1119     *                 specified.
1120     * @throws javax.jms.InvalidSelectorException if the message selector is
1121     *                 invalid.
1122     * @see javax.jms.ConnectionConsumer
1123     */
    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Topic-typed convenience overload: forwards to the five-argument variant with noLocal = false.
        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
    }
1127
1128    /**
1129     * Creates a connection consumer for this connection (optional operation).
1130     * This is an expert facility not used by regular JMS clients.
1131     *
1132     * @param queue the queue to access
1133     * @param messageSelector only messages with properties matching the message
1134     *                selector expression are delivered. A value of null or an
1135     *                empty string indicates that there is no message selector
1136     *                for the message consumer.
1137     * @param sessionPool the server session pool to associate with this
1138     *                connection consumer
1139     * @param maxMessages the maximum number of messages that can be assigned to
1140     *                a server session at one time
1141     * @return the connection consumer
1142     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1143     *                 to create a connection consumer due to some internal
1144     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1145     *                 and <CODE>messageSelector</CODE>.
1146     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1147     *                 specified.
1148     * @throws javax.jms.InvalidSelectorException if the message selector is
1149     *                 invalid.
1150     * @see javax.jms.ConnectionConsumer
1151     */
    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Queue-typed convenience overload: forwards to the five-argument variant with noLocal = false.
        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
    }
1155
1156    /**
1157     * Creates a connection consumer for this connection (optional operation).
1158     * This is an expert facility not used by regular JMS clients.
1159     *
1160     * @param destination the destination to access
1161     * @param messageSelector only messages with properties matching the message
1162     *                selector expression are delivered. A value of null or an
1163     *                empty string indicates that there is no message selector
1164     *                for the message consumer.
1165     * @param sessionPool the server session pool to associate with this
1166     *                connection consumer
1167     * @param maxMessages the maximum number of messages that can be assigned to
1168     *                a server session at one time
1169     * @return the connection consumer
1170     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1171     *                 create a connection consumer due to some internal error
1172     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1173     *                 <CODE>messageSelector</CODE>.
1174     * @throws javax.jms.InvalidDestinationException if an invalid destination
1175     *                 is specified.
1176     * @throws javax.jms.InvalidSelectorException if the message selector is
1177     *                 invalid.
1178     * @see javax.jms.ConnectionConsumer
1179     * @since 1.1
1180     */
    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        // Convenience overload: forwards to the five-argument variant with noLocal = false.
        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
    }
1184
1185    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1186        throws JMSException {
1187
1188        checkClosedOrFailed();
1189        ensureConnectionInfoSent();
1190
1191        ConsumerId consumerId = createConsumerId();
1192        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1193        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1194        consumerInfo.setSelector(messageSelector);
1195        consumerInfo.setPrefetchSize(maxMessages);
1196        consumerInfo.setNoLocal(noLocal);
1197        consumerInfo.setDispatchAsync(isDispatchAsync());
1198
1199        // Allows the options on the destination to configure the consumerInfo
1200        if (consumerInfo.getDestination().getOptions() != null) {
1201            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1202            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1203        }
1204
1205        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1206    }
1207
1208    /**
1209     * @return
1210     */
1211    private ConsumerId createConsumerId() {
1212        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1213    }
1214
1215    /**
1216     * @return
1217     */
1218    private ProducerId createProducerId() {
1219        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1220    }
1221
1222    /**
1223     * Creates a <CODE>QueueSession</CODE> object.
1224     *
1225     * @param transacted indicates whether the session is transacted
1226     * @param acknowledgeMode indicates whether the consumer or the client will
1227     *                acknowledge any messages it receives; ignored if the
1228     *                session is transacted. Legal values are
1229     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1230     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1231     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1232     * @return a newly created queue session
1233     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1234     *                 to create a session due to some internal error or lack of
1235     *                 support for the specific transaction and acknowledgement
1236     *                 mode.
1237     * @see Session#AUTO_ACKNOWLEDGE
1238     * @see Session#CLIENT_ACKNOWLEDGE
1239     * @see Session#DUPS_OK_ACKNOWLEDGE
1240     */
1241    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1242        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1243    }
1244
1245    /**
1246     * Ensures that the clientID was manually specified and not auto-generated.
1247     * If the clientID was not specified this method will throw an exception.
1248     * This method is used to ensure that the clientID + durableSubscriber name
1249     * are used correctly.
1250     *
1251     * @throws JMSException
1252     */
1253    public void checkClientIDWasManuallySpecified() throws JMSException {
1254        if (!userSpecifiedClientID) {
1255            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1256        }
1257    }
1258
1259    /**
1260     * send a Packet through the Connection - for internal use only
1261     *
1262     * @param command
1263     * @throws JMSException
1264     */
1265    public void asyncSendPacket(Command command) throws JMSException {
1266        if (isClosed()) {
1267            throw new ConnectionClosedException();
1268        } else {
1269            doAsyncSendPacket(command);
1270        }
1271    }
1272
    /**
     * Hands the command to the transport as a one-way send without checking
     * the closed state, which lets it be used while the connection is closing.
     *
     * @param command the command to send
     * @throws JMSException wrapping any IOException raised by the transport
     */
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            // Translate transport failures into the JMS exception hierarchy.
            throw JMSExceptionSupport.create(e);
        }
    }
1280
1281    /**
1282     * Send a packet through a Connection - for internal use only
1283     *
1284     * @param command
1285     * @return
1286     * @throws JMSException
1287     */
1288    public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException {
1289        if(onComplete==null) {
1290            syncSendPacket(command);
1291        } else {
1292            if (isClosed()) {
1293                throw new ConnectionClosedException();
1294            }
1295            try {
1296                this.transport.asyncRequest(command, new ResponseCallback() {
1297                    @Override
1298                    public void onCompletion(FutureResponse resp) {
1299                        Response response;
1300                        Throwable exception = null;
1301                        try {
1302                            response = resp.getResult();
1303                            if (response.isException()) {
1304                                ExceptionResponse er = (ExceptionResponse)response;
1305                                exception = er.getException();
1306                            }
1307                        } catch (Exception e) {
1308                            exception = e;
1309                        }
1310                        if(exception!=null) {
1311                            if ( exception instanceof JMSException) {
1312                                onComplete.onException((JMSException) exception);
1313                            } else {
1314                                if (isClosed()||closing.get()) {
1315                                    LOG.debug("Received an exception but connection is closing");
1316                                }
1317                                JMSException jmsEx = null;
1318                                try {
1319                                    jmsEx = JMSExceptionSupport.create(exception);
1320                                } catch(Throwable e) {
1321                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1322                                }
1323                                //dispose of transport for security exceptions
1324                                if (exception instanceof SecurityException){
1325                                    Transport t = transport;
1326                                    if (null != t){
1327                                        ServiceSupport.dispose(t);
1328                                    }
1329                                }
1330                                if (jmsEx !=null) {
1331                                    onComplete.onException(jmsEx);
1332                                }
1333                            }
1334                        } else {
1335                            onComplete.onSuccess();
1336                        }
1337                    }
1338                });
1339            } catch (IOException e) {
1340                throw JMSExceptionSupport.create(e);
1341            }
1342        }
1343    }
1344
    /**
     * Sends a command through the connection and blocks for the response -
     * for internal use only.
     * <p>
     * An exception response is rethrown: directly when it already carries a
     * JMSException, otherwise after conversion through
     * <code>JMSExceptionSupport.create</code>. A response carrying a
     * SecurityException additionally causes the transport to be disposed.
     *
     * @param command the command to send
     * @return the response received from the transport
     * @throws JMSException if the connection is closed, the response carries
     *                 an exception, or the transport raises an IOException
     */
    public Response syncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {
                Response response = (Response)this.transport.request(command);
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        // Already a JMSException: rethrow unchanged.
                        throw (JMSException)er.getException();
                    } else {
                        if (isClosed()||closing.get()) {
                            LOG.debug("Received an exception but connection is closing");
                        }
                        JMSException jmsEx = null;
                        try {
                            jmsEx = JMSExceptionSupport.create(er.getException());
                        } catch(Throwable e) {
                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                        }
                        //dispose of transport for security exceptions
                        if (er.getException() instanceof SecurityException){
                            Transport t = this.transport;
                            if (null != t){
                                ServiceSupport.dispose(t);
                            }
                        }
                        // NOTE(review): if the conversion above failed, jmsEx is null and the
                        // exception response is returned to the caller instead of being thrown.
                        if (jmsEx !=null) {
                            throw jmsEx;
                        }
                    }
                }
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }
1384
1385    /**
1386     * Send a packet through a Connection - for internal use only
1387     *
1388     * @param command
1389     * @return
1390     * @throws JMSException
1391     */
1392    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1393        if (isClosed() || closing.get()) {
1394            throw new ConnectionClosedException();
1395        } else {
1396            return doSyncSendPacket(command, timeout);
1397        }
1398    }
1399
1400    private Response doSyncSendPacket(Command command, int timeout)
1401            throws JMSException {
1402        try {
1403            Response response = (Response) (timeout > 0
1404                    ? this.transport.request(command, timeout)
1405                    : this.transport.request(command));
1406            if (response != null && response.isException()) {
1407                ExceptionResponse er = (ExceptionResponse)response;
1408                if (er.getException() instanceof JMSException) {
1409                    throw (JMSException)er.getException();
1410                } else {
1411                    throw JMSExceptionSupport.create(er.getException());
1412                }
1413            }
1414            return response;
1415        } catch (IOException e) {
1416            throw JMSExceptionSupport.create(e);
1417        }
1418    }
1419
1420    /**
1421     * @return statistics for this Connection
1422     */
1423    public StatsImpl getStats() {
1424        return stats;
1425    }
1426
1427    /**
1428     * simply throws an exception if the Connection is already closed or the
1429     * Transport has failed
1430     *
1431     * @throws JMSException
1432     */
1433    protected synchronized void checkClosedOrFailed() throws JMSException {
1434        checkClosed();
1435        if (transportFailed.get()) {
1436            throw new ConnectionFailedException(firstFailureError);
1437        }
1438    }
1439
1440    /**
1441     * simply throws an exception if the Connection is already closed
1442     *
1443     * @throws JMSException
1444     */
1445    protected synchronized void checkClosed() throws JMSException {
1446        if (closed.get()) {
1447            throw new ConnectionClosedException();
1448        }
1449    }
1450
1451    /**
1452     * Send the ConnectionInfo to the Broker
1453     *
1454     * @throws JMSException
1455     */
1456    protected void ensureConnectionInfoSent() throws JMSException {
1457        synchronized(this.ensureConnectionInfoSentMutex) {
1458            // Can we skip sending the ConnectionInfo packet??
1459            if (isConnectionInfoSentToBroker || closed.get()) {
1460                return;
1461            }
1462            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1463            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1464                info.setClientId(clientIdGenerator.generateId());
1465            }
1466            syncSendPacket(info.copy());
1467
1468            this.isConnectionInfoSentToBroker = true;
1469            // Add a temp destination advisory consumer so that
1470            // We know what the valid temporary destinations are on the
1471            // broker without having to do an RPC to the broker.
1472
1473            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1474            if (watchTopicAdvisories) {
1475                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1476            }
1477        }
1478    }
1479
1480    public synchronized boolean isWatchTopicAdvisories() {
1481        return watchTopicAdvisories;
1482    }
1483
    /**
     * Sets the flag that {@link #ensureConnectionInfoSent()} consults when
     * deciding whether to create the advisory consumer.
     *
     * @param watchTopicAdvisories the watchTopicAdvisories to set
     */
    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
        this.watchTopicAdvisories = watchTopicAdvisories;
    }
1487
1488    /**
1489     * @return Returns the useAsyncSend.
1490     */
1491    public boolean isUseAsyncSend() {
1492        return useAsyncSend;
1493    }
1494
1495    /**
1496     * Forces the use of <a
1497     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1498     * adds a massive performance boost; but means that the send() method will
1499     * return immediately whether the message has been sent or not which could
1500     * lead to message loss.
1501     */
1502    public void setUseAsyncSend(boolean useAsyncSend) {
1503        this.useAsyncSend = useAsyncSend;
1504    }
1505
1506    /**
1507     * @return true if always sync send messages
1508     */
1509    public boolean isAlwaysSyncSend() {
1510        return this.alwaysSyncSend;
1511    }
1512
1513    /**
1514     * Set true if always require messages to be sync sent
1515     *
1516     * @param alwaysSyncSend
1517     */
1518    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1519        this.alwaysSyncSend = alwaysSyncSend;
1520    }
1521
1522    /**
1523     * @return the messagePrioritySupported
1524     */
1525    public boolean isMessagePrioritySupported() {
1526        return this.messagePrioritySupported;
1527    }
1528
1529    /**
1530     * @param messagePrioritySupported the messagePrioritySupported to set
1531     */
1532    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1533        this.messagePrioritySupported = messagePrioritySupported;
1534    }
1535
1536    /**
1537     * Cleans up this connection so that it's state is as if the connection was
1538     * just created. This allows the Resource Adapter to clean up a connection
1539     * so that it can be reused without having to close and recreate the
1540     * connection.
1541     */
1542    public void cleanup() throws JMSException {
1543
1544        if (advisoryConsumer != null && !isTransportFailed()) {
1545            advisoryConsumer.dispose();
1546            advisoryConsumer = null;
1547        }
1548
1549        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1550            ActiveMQSession s = i.next();
1551            s.dispose();
1552        }
1553        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1554            ActiveMQConnectionConsumer c = i.next();
1555            c.dispose();
1556        }
1557        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1558            ActiveMQInputStream c = i.next();
1559            c.dispose();
1560        }
1561        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1562            ActiveMQOutputStream c = i.next();
1563            c.dispose();
1564        }
1565
1566        if (isConnectionInfoSentToBroker) {
1567            if (!transportFailed.get() && !closing.get()) {
1568                syncSendPacket(info.createRemoveCommand());
1569            }
1570            isConnectionInfoSentToBroker = false;
1571        }
1572        if (userSpecifiedClientID) {
1573            info.setClientId(null);
1574            userSpecifiedClientID = false;
1575        }
1576        clientIDSet = false;
1577
1578        started.set(false);
1579    }
1580
1581    public void finalize() throws Throwable{
1582        Scheduler s = this.scheduler;
1583        if (s != null){
1584            s.stop();
1585        }
1586    }
1587
1588    /**
1589     * Changes the associated username/password that is associated with this
1590     * connection. If the connection has been used, you must called cleanup()
1591     * before calling this method.
1592     *
1593     * @throws IllegalStateException if the connection is in used.
1594     */
1595    public void changeUserInfo(String userName, String password) throws JMSException {
1596        if (isConnectionInfoSentToBroker) {
1597            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1598        }
1599        this.info.setUserName(userName);
1600        this.info.setPassword(password);
1601    }
1602
1603    /**
1604     * @return Returns the resourceManagerId.
1605     * @throws JMSException
1606     */
1607    public String getResourceManagerId() throws JMSException {
1608        waitForBrokerInfo();
1609        if (brokerInfo == null) {
1610            throw new JMSException("Connection failed before Broker info was received.");
1611        }
1612        return brokerInfo.getBrokerId().getValue();
1613    }
1614
1615    /**
1616     * Returns the broker name if one is available or null if one is not
1617     * available yet.
1618     */
1619    public String getBrokerName() {
1620        try {
1621            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1622            if (brokerInfo == null) {
1623                return null;
1624            }
1625            return brokerInfo.getBrokerName();
1626        } catch (InterruptedException e) {
1627            Thread.currentThread().interrupt();
1628            return null;
1629        }
1630    }
1631
1632    /**
1633     * Returns the broker information if it is available or null if it is not
1634     * available yet.
1635     */
1636    public BrokerInfo getBrokerInfo() {
1637        return brokerInfo;
1638    }
1639
1640    /**
1641     * @return Returns the RedeliveryPolicy.
1642     * @throws JMSException
1643     */
1644    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1645        return redeliveryPolicy;
1646    }
1647
1648    /**
1649     * Sets the redelivery policy to be used when messages are rolled back
1650     */
1651    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1652        this.redeliveryPolicy = redeliveryPolicy;
1653    }
1654
1655    public BlobTransferPolicy getBlobTransferPolicy() {
1656        if (blobTransferPolicy == null) {
1657            blobTransferPolicy = createBlobTransferPolicy();
1658        }
1659        return blobTransferPolicy;
1660    }
1661
1662    /**
1663     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1664     * OBjects) are transferred from producers to brokers to consumers
1665     */
1666    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1667        this.blobTransferPolicy = blobTransferPolicy;
1668    }
1669
1670    /**
1671     * @return Returns the alwaysSessionAsync.
1672     */
1673    public boolean isAlwaysSessionAsync() {
1674        return alwaysSessionAsync;
1675    }
1676
1677    /**
1678     * If this flag is set then a separate thread is not used for dispatching
1679     * messages for each Session in the Connection. However, a separate thread
1680     * is always used if there is more than one session, or the session isn't in
1681     * auto acknowledge or duplicates ok mode
1682     */
1683    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1684        this.alwaysSessionAsync = alwaysSessionAsync;
1685    }
1686
1687    /**
1688     * @return Returns the optimizeAcknowledge.
1689     */
1690    public boolean isOptimizeAcknowledge() {
1691        return optimizeAcknowledge;
1692    }
1693
1694    /**
1695     * Enables an optimised acknowledgement mode where messages are acknowledged
1696     * in batches rather than individually
1697     *
1698     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1699     */
1700    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1701        this.optimizeAcknowledge = optimizeAcknowledge;
1702    }
1703
1704    /**
1705     * The max time in milliseconds between optimized ack batches
1706     * @param optimizeAcknowledgeTimeOut
1707     */
1708    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1709        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1710    }
1711
1712    public long getOptimizeAcknowledgeTimeOut() {
1713        return optimizeAcknowledgeTimeOut;
1714    }
1715
1716    public long getWarnAboutUnstartedConnectionTimeout() {
1717        return warnAboutUnstartedConnectionTimeout;
1718    }
1719
1720    /**
1721     * Enables the timeout from a connection creation to when a warning is
1722     * generated if the connection is not properly started via {@link #start()}
1723     * and a message is received by a consumer. It is a very common gotcha to
1724     * forget to <a
1725     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1726     * the connection</a> so this option makes the default case to create a
1727     * warning if the user forgets. To disable the warning just set the value to <
1728     * 0 (say -1).
1729     */
1730    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1731        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1732    }
1733
1734    /**
1735     * @return the sendTimeout
1736     */
1737    public int getSendTimeout() {
1738        return sendTimeout;
1739    }
1740
1741    /**
1742     * @param sendTimeout the sendTimeout to set
1743     */
1744    public void setSendTimeout(int sendTimeout) {
1745        this.sendTimeout = sendTimeout;
1746    }
1747
1748    /**
1749     * @return the sendAcksAsync
1750     */
1751    public boolean isSendAcksAsync() {
1752        return sendAcksAsync;
1753    }
1754
1755    /**
1756     * @param sendAcksAsync the sendAcksAsync to set
1757     */
1758    public void setSendAcksAsync(boolean sendAcksAsync) {
1759        this.sendAcksAsync = sendAcksAsync;
1760    }
1761
1762
1763    /**
1764     * Returns the time this connection was created
1765     */
1766    public long getTimeCreated() {
1767        return timeCreated;
1768    }
1769
1770    private void waitForBrokerInfo() throws JMSException {
1771        try {
1772            brokerInfoReceived.await();
1773        } catch (InterruptedException e) {
1774            Thread.currentThread().interrupt();
1775            throw JMSExceptionSupport.create(e);
1776        }
1777    }
1778
1779    // Package protected so that it can be used in unit tests
1780    public Transport getTransport() {
1781        return transport;
1782    }
1783
1784    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1785        producers.put(producerId, producer);
1786    }
1787
1788    public void removeProducer(ProducerId producerId) {
1789        producers.remove(producerId);
1790    }
1791
1792    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1793        dispatchers.put(consumerId, dispatcher);
1794    }
1795
1796    public void removeDispatcher(ConsumerId consumerId) {
1797        dispatchers.remove(consumerId);
1798    }
1799
1800    /**
1801     * @param o - the command to consume
1802     */
1803    public void onCommand(final Object o) {
1804        final Command command = (Command)o;
1805        if (!closed.get() && command != null) {
1806            try {
1807                command.visit(new CommandVisitorAdapter() {
1808                    @Override
1809                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1810                        waitForTransportInterruptionProcessingToComplete();
1811                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1812                        if (dispatcher != null) {
1813                            // Copy in case a embedded broker is dispatching via
1814                            // vm://
1815                            // md.getMessage() == null to signal end of queue
1816                            // browse.
1817                            Message msg = md.getMessage();
1818                            if (msg != null) {
1819                                msg = msg.copy();
1820                                msg.setReadOnlyBody(true);
1821                                msg.setReadOnlyProperties(true);
1822                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1823                                msg.setConnection(ActiveMQConnection.this);
1824                                md.setMessage(msg);
1825                            }
1826                            dispatcher.dispatch(md);
1827                        }
1828                        return null;
1829                    }
1830
1831                    @Override
1832                    public Response processProducerAck(ProducerAck pa) throws Exception {
1833                        if (pa != null && pa.getProducerId() != null) {
1834                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1835                            if (producer != null) {
1836                                producer.onProducerAck(pa);
1837                            }
1838                        }
1839                        return null;
1840                    }
1841
1842                    @Override
1843                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1844                        brokerInfo = info;
1845                        brokerInfoReceived.countDown();
1846                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1847                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1848                        return null;
1849                    }
1850
1851                    @Override
1852                    public Response processConnectionError(final ConnectionError error) throws Exception {
1853                        executor.execute(new Runnable() {
1854                            public void run() {
1855                                onAsyncException(error.getException());
1856                            }
1857                        });
1858                        return null;
1859                    }
1860
1861                    @Override
1862                    public Response processControlCommand(ControlCommand command) throws Exception {
1863                        onControlCommand(command);
1864                        return null;
1865                    }
1866
1867                    @Override
1868                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1869                        onConnectionControl((ConnectionControl)command);
1870                        return null;
1871                    }
1872
1873                    @Override
1874                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1875                        onConsumerControl((ConsumerControl)command);
1876                        return null;
1877                    }
1878
1879                    @Override
1880                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1881                        onWireFormatInfo((WireFormatInfo)command);
1882                        return null;
1883                    }
1884                });
1885            } catch (Exception e) {
1886                onClientInternalException(e);
1887            }
1888
1889        }
1890        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1891            TransportListener listener = iter.next();
1892            listener.onCommand(command);
1893        }
1894    }
1895
1896    protected void onWireFormatInfo(WireFormatInfo info) {
1897        protocolVersion.set(info.getVersion());
1898    }
1899
1900    /**
1901     * Handles async client internal exceptions.
1902     * A client internal exception is usually one that has been thrown
1903     * by a container runtime component during asynchronous processing of a
1904     * message that does not affect the connection itself.
1905     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1906     * its <code>onException</code> method, if one has been registered with this connection.
1907     *
1908     * @param error the exception that the problem
1909     */
1910    public void onClientInternalException(final Throwable error) {
1911        if ( !closed.get() && !closing.get() ) {
1912            if ( this.clientInternalExceptionListener != null ) {
1913                executor.execute(new Runnable() {
1914                    public void run() {
1915                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1916                    }
1917                });
1918            } else {
1919                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1920                        + error, error);
1921            }
1922        }
1923    }
1924    /**
1925     * Used for handling async exceptions
1926     *
1927     * @param error
1928     */
1929    public void onAsyncException(Throwable error) {
1930        if (!closed.get() && !closing.get()) {
1931            if (this.exceptionListener != null) {
1932
1933                if (!(error instanceof JMSException)) {
1934                    error = JMSExceptionSupport.create(error);
1935                }
1936                final JMSException e = (JMSException)error;
1937
1938                executor.execute(new Runnable() {
1939                    public void run() {
1940                        ActiveMQConnection.this.exceptionListener.onException(e);
1941                    }
1942                });
1943
1944            } else {
1945                LOG.debug("Async exception with no exception listener: " + error, error);
1946            }
1947        }
1948    }
1949
1950    public void onException(final IOException error) {
1951        onAsyncException(error);
1952        if (!closing.get() && !closed.get()) {
1953            executor.execute(new Runnable() {
1954                public void run() {
1955                    transportFailed(error);
1956                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1957                    brokerInfoReceived.countDown();
1958                    try {
1959                        cleanup();
1960                    } catch (JMSException e) {
1961                        LOG.warn("Exception during connection cleanup, " + e, e);
1962                    }
1963                    for (Iterator<TransportListener> iter = transportListeners
1964                            .iterator(); iter.hasNext();) {
1965                        TransportListener listener = iter.next();
1966                        listener.onException(error);
1967                    }
1968                }
1969            });
1970        }
1971    }
1972
1973    public void transportInterupted() {
1974        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1975        if (LOG.isDebugEnabled()) {
1976            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1977        }
1978        signalInterruptionProcessingNeeded();
1979
1980        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1981            ActiveMQSession s = i.next();
1982            s.clearMessagesInProgress();
1983        }
1984
1985        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1986            connectionConsumer.clearMessagesInProgress();
1987        }
1988
1989        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1990            TransportListener listener = iter.next();
1991            listener.transportInterupted();
1992        }
1993    }
1994
1995    public void transportResumed() {
1996        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1997            TransportListener listener = iter.next();
1998            listener.transportResumed();
1999        }
2000    }
2001
2002    /**
2003     * Create the DestinationInfo object for the temporary destination.
2004     *
2005     * @param topic - if its true topic, else queue.
2006     * @return DestinationInfo
2007     * @throws JMSException
2008     */
2009    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2010
2011        // Check if Destination info is of temporary type.
2012        ActiveMQTempDestination dest;
2013        if (topic) {
2014            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2015        } else {
2016            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2017        }
2018
2019        DestinationInfo info = new DestinationInfo();
2020        info.setConnectionId(this.info.getConnectionId());
2021        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2022        info.setDestination(dest);
2023        syncSendPacket(info);
2024
2025        dest.setConnection(this);
2026        activeTempDestinations.put(dest, dest);
2027        return dest;
2028    }
2029
2030    /**
2031     * @param destination
2032     * @throws JMSException
2033     */
2034    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2035
2036        checkClosedOrFailed();
2037
2038        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2039            ActiveMQSession s = i.next();
2040            if (s.isInUse(destination)) {
2041                throw new JMSException("A consumer is consuming from the temporary destination");
2042            }
2043        }
2044
2045        activeTempDestinations.remove(destination);
2046
2047        DestinationInfo destInfo = new DestinationInfo();
2048        destInfo.setConnectionId(this.info.getConnectionId());
2049        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2050        destInfo.setDestination(destination);
2051        destInfo.setTimeout(0);
2052        syncSendPacket(destInfo);
2053    }
2054
2055    public boolean isDeleted(ActiveMQDestination dest) {
2056
2057        // If we are not watching the advisories.. then
2058        // we will assume that the temp destination does exist.
2059        if (advisoryConsumer == null) {
2060            return false;
2061        }
2062
2063        return !activeTempDestinations.contains(dest);
2064    }
2065
2066    public boolean isCopyMessageOnSend() {
2067        return copyMessageOnSend;
2068    }
2069
2070    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2071        return localTransactionIdGenerator;
2072    }
2073
2074    public boolean isUseCompression() {
2075        return useCompression;
2076    }
2077
2078    /**
2079     * Enables the use of compression of the message bodies
2080     */
2081    public void setUseCompression(boolean useCompression) {
2082        this.useCompression = useCompression;
2083    }
2084
2085    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2086
2087        checkClosedOrFailed();
2088        ensureConnectionInfoSent();
2089
2090        DestinationInfo info = new DestinationInfo();
2091        info.setConnectionId(this.info.getConnectionId());
2092        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2093        info.setDestination(destination);
2094        info.setTimeout(0);
2095        syncSendPacket(info);
2096
2097    }
2098
2099    public boolean isDispatchAsync() {
2100        return dispatchAsync;
2101    }
2102
2103    /**
2104     * Enables or disables the default setting of whether or not consumers have
2105     * their messages <a
2106     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2107     * synchronously or asynchronously by the broker</a>. For non-durable
2108     * topics for example we typically dispatch synchronously by default to
2109     * minimize context switches which boost performance. However sometimes its
2110     * better to go slower to ensure that a single blocked consumer socket does
2111     * not block delivery to other consumers.
2112     *
2113     * @param asyncDispatch If true then consumers created on this connection
2114     *                will default to having their messages dispatched
2115     *                asynchronously. The default value is true.
2116     */
2117    public void setDispatchAsync(boolean asyncDispatch) {
2118        this.dispatchAsync = asyncDispatch;
2119    }
2120
2121    public boolean isObjectMessageSerializationDefered() {
2122        return objectMessageSerializationDefered;
2123    }
2124
2125    /**
2126     * When an object is set on an ObjectMessage, the JMS spec requires the
2127     * object to be serialized by that set method. Enabling this flag causes the
2128     * object to not get serialized. The object may subsequently get serialized
2129     * if the message needs to be sent over a socket or stored to disk.
2130     */
2131    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2132        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2133    }
2134
    /**
     * Creates an input stream on the given destination without a message
     * selector; delegates to {@link #createInputStream(Destination, String)}.
     *
     * @param dest the destination to read from
     * @return the created input stream
     * @throws JMSException if the stream cannot be created
     */
    public InputStream createInputStream(Destination dest) throws JMSException {
        return createInputStream(dest, null);
    }
2138
    /**
     * Creates an input stream on the given destination with a message
     * selector and noLocal set to false.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector to apply, may be null
     * @return the created input stream
     * @throws JMSException if the stream cannot be created
     */
    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
        return createInputStream(dest, messageSelector, false);
    }
2142
    /**
     * Creates an input stream on the given destination, passing a timeout of
     * -1 to the four-argument overload.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector to apply, may be null
     * @param noLocal the noLocal flag handed to the stream
     * @return the created input stream
     * @throws JMSException if the stream cannot be created
     */
    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
        return createInputStream(dest, messageSelector, noLocal,  -1);
    }
2146
2147
2148
    /**
     * Creates an input stream on the given destination without a
     * subscription name.
     *
     * @param dest the destination to read from
     * @param messageSelector the selector to apply, may be null
     * @param noLocal the noLocal flag handed to the stream
     * @param timeout the timeout handed to the stream
     * @return the created input stream
     * @throws JMSException if the connection is closed or failed, or the
     *                 stream cannot be created
     */
    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
    }
2152
2153    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2154        return createInputStream(dest, null, false);
2155    }
2156
    /**
     * Creates a durable input stream on the given topic with a message
     * selector and noLocal set to false.
     *
     * @param dest the topic to read from
     * @param name the subscription name
     * @param messageSelector the selector to apply, may be null
     * @return the created input stream
     * @throws JMSException if the stream cannot be created
     */
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
        return createDurableInputStream(dest, name, messageSelector, false);
    }
2160
    /**
     * Creates a durable input stream on the given topic, passing a timeout
     * of -1 to the five-argument overload.
     *
     * @param dest the topic to read from
     * @param name the subscription name
     * @param messageSelector the selector to apply, may be null
     * @param noLocal the noLocal flag handed to the stream
     * @return the created input stream
     * @throws JMSException if the stream cannot be created
     */
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
    }
2164
    /**
     * Creates a durable input stream on the given topic; the name is passed
     * on as the subscription name of the stream.
     *
     * @param dest the topic to read from
     * @param name the subscription name
     * @param messageSelector the selector to apply, may be null
     * @param noLocal the noLocal flag handed to the stream
     * @param timeout the timeout handed to the stream
     * @return the created input stream
     * @throws JMSException if the connection is closed or failed, or the
     *                 stream cannot be created
     */
    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
    }
2168
2169    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2170        checkClosedOrFailed();
2171        ensureConnectionInfoSent();
2172        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2173    }
2174
2175    /**
2176     * Creates a persistent output stream; individual messages will be written
2177     * to disk/database by the broker
2178     */
2179    public OutputStream createOutputStream(Destination dest) throws JMSException {
2180        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2181    }
2182
2183    /**
2184     * Creates a non persistent output stream; messages will not be written to
2185     * disk
2186     */
2187    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2188        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2189    }
2190
2191    /**
2192     * Creates an output stream allowing full control over the delivery mode,
2193     * the priority and time to live of the messages and the properties added to
2194     * messages on the stream.
2195     *
2196     * @param streamProperties defines a map of key-value pairs where the keys
2197     *                are strings and the values are primitive values (numbers
2198     *                and strings) which are appended to the messages similarly
2199     *                to using the
2200     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2201     *                method
2202     */
2203    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2204        checkClosedOrFailed();
2205        ensureConnectionInfoSent();
2206        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2207    }
2208
2209    /**
2210     * Unsubscribes a durable subscription that has been created by a client.
2211     * <P>
2212     * This method deletes the state being maintained on behalf of the
2213     * subscriber by its provider.
2214     * <P>
2215     * It is erroneous for a client to delete a durable subscription while there
2216     * is an active <CODE>MessageConsumer </CODE> or
2217     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2218     * message is part of a pending transaction or has not been acknowledged in
2219     * the session.
2220     *
2221     * @param name the name used to identify this subscription
2222     * @throws JMSException if the session fails to unsubscribe to the durable
2223     *                 subscription due to some internal error.
2224     * @throws InvalidDestinationException if an invalid subscription name is
2225     *                 specified.
2226     * @since 1.1
2227     */
2228    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2229        checkClosedOrFailed();
2230        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2231        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2232        rsi.setSubscriptionName(name);
2233        rsi.setClientId(getConnectionInfo().getClientId());
2234        syncSendPacket(rsi);
2235    }
2236
2237    /**
2238     * Internal send method optimized: - It does not copy the message - It can
2239     * only handle ActiveMQ messages. - You can specify if the send is async or
2240     * sync - Does not allow you to send /w a transaction.
2241     */
2242    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2243        checkClosedOrFailed();
2244
2245        if (destination.isTemporary() && isDeleted(destination)) {
2246            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2247        }
2248
2249        msg.setJMSDestination(destination);
2250        msg.setJMSDeliveryMode(deliveryMode);
2251        long expiration = 0L;
2252
2253        if (!isDisableTimeStampsByDefault()) {
2254            long timeStamp = System.currentTimeMillis();
2255            msg.setJMSTimestamp(timeStamp);
2256            if (timeToLive > 0) {
2257                expiration = timeToLive + timeStamp;
2258            }
2259        }
2260
2261        msg.setJMSExpiration(expiration);
2262        msg.setJMSPriority(priority);
2263
2264        msg.setJMSRedelivered(false);
2265        msg.setMessageId(messageId);
2266
2267        msg.onSend();
2268
2269        msg.setProducerId(msg.getMessageId().getProducerId());
2270
2271        if (LOG.isDebugEnabled()) {
2272            LOG.debug("Sending message: " + msg);
2273        }
2274
2275        if (async) {
2276            asyncSendPacket(msg);
2277        } else {
2278            syncSendPacket(msg);
2279        }
2280
2281    }
2282
2283    public void addOutputStream(ActiveMQOutputStream stream) {
2284        outputStreams.add(stream);
2285    }
2286
2287    public void removeOutputStream(ActiveMQOutputStream stream) {
2288        outputStreams.remove(stream);
2289    }
2290
2291    public void addInputStream(ActiveMQInputStream stream) {
2292        inputStreams.add(stream);
2293    }
2294
2295    public void removeInputStream(ActiveMQInputStream stream) {
2296        inputStreams.remove(stream);
2297    }
2298
2299    protected void onControlCommand(ControlCommand command) {
2300        String text = command.getCommand();
2301        if (text != null) {
2302            if ("shutdown".equals(text)) {
2303                LOG.info("JVM told to shutdown");
2304                System.exit(0);
2305            }
2306            if (false && "close".equals(text)){
2307                LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2308                try {
2309                    close();
2310                } catch (JMSException e) {
2311                }
2312            }
2313        }
2314    }
2315
2316    protected void onConnectionControl(ConnectionControl command) {
2317        if (command.isFaultTolerant()) {
2318            this.optimizeAcknowledge = false;
2319            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2320                ActiveMQSession s = i.next();
2321                s.setOptimizeAcknowledge(false);
2322            }
2323        }
2324    }
2325
2326    protected void onConsumerControl(ConsumerControl command) {
2327        if (command.isClose()) {
2328            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2329                ActiveMQSession s = i.next();
2330                s.close(command.getConsumerId());
2331            }
2332        } else {
2333            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2334                ActiveMQSession s = i.next();
2335                s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2336            }
2337        }
2338    }
2339
2340    protected void transportFailed(IOException error) {
2341        transportFailed.set(true);
2342        if (firstFailureError == null) {
2343            firstFailureError = error;
2344        }
2345    }
2346
2347    /**
2348     * Should a JMS message be copied to a new JMS Message object as part of the
2349     * send() method in JMS. This is enabled by default to be compliant with the
2350     * JMS specification. You can disable it if you do not mutate JMS messages
2351     * after they are sent for a performance boost
2352     */
2353    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2354        this.copyMessageOnSend = copyMessageOnSend;
2355    }
2356
2357    @Override
2358    public String toString() {
2359        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2360    }
2361
2362    protected BlobTransferPolicy createBlobTransferPolicy() {
2363        return new BlobTransferPolicy();
2364    }
2365
2366    public int getProtocolVersion() {
2367        return protocolVersion.get();
2368    }
2369
2370    public int getProducerWindowSize() {
2371        return producerWindowSize;
2372    }
2373
2374    public void setProducerWindowSize(int producerWindowSize) {
2375        this.producerWindowSize = producerWindowSize;
2376    }
2377
2378    public void setAuditDepth(int auditDepth) {
2379        connectionAudit.setAuditDepth(auditDepth);
2380    }
2381
2382    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2383        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2384    }
2385
2386    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2387        connectionAudit.removeDispatcher(dispatcher);
2388    }
2389
2390    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2391        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2392    }
2393
2394    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2395        connectionAudit.rollbackDuplicate(dispatcher, message);
2396    }
2397
2398    public IOException getFirstFailureError() {
2399        return firstFailureError;
2400    }
2401
2402    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2403        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2404        if (cdl != null) {
2405            if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2406                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2407                cdl.await(10, TimeUnit.SECONDS);
2408            }
2409            signalInterruptionProcessingComplete();
2410        }
2411    }
2412
2413    protected void transportInterruptionProcessingComplete() {
2414        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2415        if (cdl != null) {
2416            cdl.countDown();
2417            try {
2418                signalInterruptionProcessingComplete();
2419            } catch (InterruptedException ignored) {}
2420        }
2421    }
2422
2423    private void signalInterruptionProcessingComplete() throws InterruptedException {
2424        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2425        if (cdl.getCount()==0) {
2426            if (LOG.isDebugEnabled()) {
2427                LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2428            }
2429            this.transportInterruptionProcessingComplete = null;
2430
2431            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2432            if (failoverTransport != null) {
2433                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2434                if (LOG.isDebugEnabled()) {
2435                    LOG.debug("notified failover transport (" + failoverTransport
2436                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2437                }
2438            }
2439
2440        }
2441    }
2442
2443    private void signalInterruptionProcessingNeeded() {
2444        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2445        if (failoverTransport != null) {
2446            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2447            if (LOG.isDebugEnabled()) {
2448                LOG.debug("notified failover transport (" + failoverTransport
2449                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2450            }
2451        }
2452    }
2453
2454    /*
2455     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2456     * will wait to receive re dispatched messages.
2457     * default value is 0 so there is no wait by default.
2458     */
2459    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2460        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2461    }
2462
2463    public long getConsumerFailoverRedeliveryWaitPeriod() {
2464        return consumerFailoverRedeliveryWaitPeriod;
2465    }
2466
2467    protected Scheduler getScheduler() throws JMSException {
2468        Scheduler result = scheduler;
2469        if (result == null) {
2470            synchronized (this) {
2471                result = scheduler;
2472                if (result == null) {
2473                    checkClosed();
2474                    try {
2475                        result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2476                        scheduler.start();
2477                    } catch(Exception e) {
2478                        throw JMSExceptionSupport.create(e);
2479                    }
2480                }
2481            }
2482        }
2483        return result;
2484    }
2485
2486    protected ThreadPoolExecutor getExecutor() {
2487        return this.executor;
2488    }
2489
2490    /**
2491     * @return the checkForDuplicates
2492     */
2493    public boolean isCheckForDuplicates() {
2494        return this.checkForDuplicates;
2495    }
2496
2497    /**
2498     * @param checkForDuplicates the checkForDuplicates to set
2499     */
2500    public void setCheckForDuplicates(boolean checkForDuplicates) {
2501        this.checkForDuplicates = checkForDuplicates;
2502    }
2503
2504
2505    public boolean isTransactedIndividualAck() {
2506        return transactedIndividualAck;
2507    }
2508
2509    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2510        this.transactedIndividualAck = transactedIndividualAck;
2511    }
2512
2513    public boolean isNonBlockingRedelivery() {
2514        return nonBlockingRedelivery;
2515    }
2516
2517    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2518        this.nonBlockingRedelivery = nonBlockingRedelivery;
2519    }
2520
2521    /**
2522     * Removes any TempDestinations that this connection has cached, ignoring
2523     * any exceptions generated because the destination is in use as they should
2524     * not be removed.
2525     */
2526    public void cleanUpTempDestinations() {
2527
2528        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2529            return;
2530        }
2531
2532        Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2533            = this.activeTempDestinations.entrySet().iterator();
2534        while(entries.hasNext()) {
2535            ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2536            try {
2537                // Only delete this temp destination if it was created from this connection. The connection used
2538                // for the advisory consumer may also have a reference to this temp destination.
2539                ActiveMQTempDestination dest = entry.getValue();
2540                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2541                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2542                    this.deleteTempDestination(entry.getValue());
2543                }
2544            } catch (Exception ex) {
2545                // the temp dest is in use so it can not be deleted.
2546                // it is ok to leave it to connection tear down phase
2547            }
2548        }
2549    }
2550}