001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.Properties;
024import java.util.concurrent.Executor;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.ThreadFactory;
027
028import javax.jms.Connection;
029import javax.jms.ConnectionFactory;
030import javax.jms.ExceptionListener;
031import javax.jms.JMSException;
032import javax.jms.QueueConnection;
033import javax.jms.QueueConnectionFactory;
034import javax.jms.TopicConnection;
035import javax.jms.TopicConnectionFactory;
036import javax.naming.Context;
037
038import org.apache.activemq.blob.BlobTransferPolicy;
039import org.apache.activemq.jndi.JNDIBaseStorable;
040import org.apache.activemq.management.JMSStatsImpl;
041import org.apache.activemq.management.StatsCapable;
042import org.apache.activemq.management.StatsImpl;
043import org.apache.activemq.transport.Transport;
044import org.apache.activemq.transport.TransportFactory;
045import org.apache.activemq.transport.TransportListener;
046import org.apache.activemq.util.IdGenerator;
047import org.apache.activemq.util.IntrospectionSupport;
048import org.apache.activemq.util.JMSExceptionSupport;
049import org.apache.activemq.util.URISupport;
050import org.apache.activemq.util.URISupport.CompositeData;
051
052/**
053 * A ConnectionFactory is an an Administered object, and is used for creating
054 * Connections. <p/> This class also implements QueueConnectionFactory and
055 * TopicConnectionFactory. You can use this connection to create both
056 * QueueConnections and TopicConnections.
057 *
058 *
059 * @see javax.jms.ConnectionFactory
060 */
061public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062
063    public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
064    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
065    public static final String DEFAULT_USER = null;
066    public static final String DEFAULT_PASSWORD = null;
067    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
068
069    protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
070        public Thread newThread(Runnable run) {
071            Thread thread = new Thread(run);
072            thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
073            return thread;
074        }
075    });
076
077    protected URI brokerURL;
078    protected String userName;
079    protected String password;
080    protected String clientID;
081    protected boolean dispatchAsync=true;
082    protected boolean alwaysSessionAsync=true;
083
084    JMSStatsImpl factoryStats = new JMSStatsImpl();
085
086    private IdGenerator clientIdGenerator;
087    private String clientIDPrefix;
088    private IdGenerator connectionIdGenerator;
089    private String connectionIDPrefix;
090
091    // client policies
092    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
093    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
094    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
095    private MessageTransformer transformer;
096
097    private boolean disableTimeStampsByDefault;
098    private boolean optimizedMessageDispatch = true;
099    private long optimizeAcknowledgeTimeOut = 300;
100    private boolean copyMessageOnSend = true;
101    private boolean useCompression;
102    private boolean objectMessageSerializationDefered;
103    private boolean useAsyncSend;
104    private boolean optimizeAcknowledge;
105    private int closeTimeout = 15000;
106    private boolean useRetroactiveConsumer;
107    private boolean exclusiveConsumer;
108    private boolean nestedMapAndListEnabled = true;
109    private boolean alwaysSyncSend;
110    private boolean watchTopicAdvisories = true;
111    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
112    private long warnAboutUnstartedConnectionTimeout = 500L;
113    private int sendTimeout = 0;
114    private boolean sendAcksAsync=true;
115    private TransportListener transportListener;
116    private ExceptionListener exceptionListener;
117    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
118    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
119    private boolean useDedicatedTaskRunner;
120    private long consumerFailoverRedeliveryWaitPeriod = 0;
121    private boolean checkForDuplicates = true;
122    private ClientInternalExceptionListener clientInternalExceptionListener;
123    private boolean messagePrioritySupported = true;
124    private boolean transactedIndividualAck = false;
125    private boolean nonBlockingRedelivery = false;
126
127    // /////////////////////////////////////////////
128    //
129    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
130    //
131    // /////////////////////////////////////////////
132
133    public ActiveMQConnectionFactory() {
134        this(DEFAULT_BROKER_URL);
135    }
136
137    public ActiveMQConnectionFactory(String brokerURL) {
138        this(createURI(brokerURL));
139    }
140
141    public ActiveMQConnectionFactory(URI brokerURL) {
142        setBrokerURL(brokerURL.toString());
143    }
144
145    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
146        setUserName(userName);
147        setPassword(password);
148        setBrokerURL(brokerURL.toString());
149    }
150
151    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
152        setUserName(userName);
153        setPassword(password);
154        setBrokerURL(brokerURL);
155    }
156
157    /**
158     * Returns a copy of the given connection factory
159     */
160    public ActiveMQConnectionFactory copy() {
161        try {
162            return (ActiveMQConnectionFactory)super.clone();
163        } catch (CloneNotSupportedException e) {
164            throw new RuntimeException("This should never happen: " + e, e);
165        }
166    }
167
168    /**
169     * @param brokerURL
170     * @return
171     * @throws URISyntaxException
172     */
173    private static URI createURI(String brokerURL) {
174        try {
175            return new URI(brokerURL);
176        } catch (URISyntaxException e) {
177            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
178        }
179    }
180
181    /**
182     * @return Returns the Connection.
183     */
184    public Connection createConnection() throws JMSException {
185        return createActiveMQConnection();
186    }
187
188    /**
189     * @return Returns the Connection.
190     */
191    public Connection createConnection(String userName, String password) throws JMSException {
192        return createActiveMQConnection(userName, password);
193    }
194
195    /**
196     * @return Returns the QueueConnection.
197     * @throws JMSException
198     */
199    public QueueConnection createQueueConnection() throws JMSException {
200        return createActiveMQConnection();
201    }
202
203    /**
204     * @return Returns the QueueConnection.
205     */
206    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
207        return createActiveMQConnection(userName, password);
208    }
209
210    /**
211     * @return Returns the TopicConnection.
212     * @throws JMSException
213     */
214    public TopicConnection createTopicConnection() throws JMSException {
215        return createActiveMQConnection();
216    }
217
218    /**
219     * @return Returns the TopicConnection.
220     */
221    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
222        return createActiveMQConnection(userName, password);
223    }
224
225    /**
226     * @returns the StatsImpl associated with this ConnectionFactory.
227     */
228    public StatsImpl getStats() {
229        return this.factoryStats;
230    }
231
232    // /////////////////////////////////////////////
233    //
234    // Implementation methods.
235    //
236    // /////////////////////////////////////////////
237
238    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
239        return createActiveMQConnection(userName, password);
240    }
241
242    /**
243     * Creates a Transport based on this object's connection settings. Separated
244     * from createActiveMQConnection to allow for subclasses to override.
245     *
246     * @return The newly created Transport.
247     * @throws JMSException If unable to create trasnport.
248     * @author sepandm@gmail.com
249     */
250    protected Transport createTransport() throws JMSException {
251        try {
252            return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
253        } catch (Exception e) {
254            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
255        }
256    }
257
258    /**
259     * @return Returns the Connection.
260     */
261    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
262        if (brokerURL == null) {
263            throw new ConfigurationException("brokerURL not set.");
264        }
265        ActiveMQConnection connection = null;
266        try {
267            Transport transport = createTransport();
268            connection = createActiveMQConnection(transport, factoryStats);
269
270            connection.setUserName(userName);
271            connection.setPassword(password);
272
273            configureConnection(connection);
274
275            transport.start();
276
277            if (clientID != null) {
278                connection.setDefaultClientID(clientID);
279            }
280
281            return connection;
282        } catch (JMSException e) {
283            // Clean up!
284            try {
285                connection.close();
286            } catch (Throwable ignore) {
287            }
288            throw e;
289        } catch (Exception e) {
290            // Clean up!
291            try {
292                connection.close();
293            } catch (Throwable ignore) {
294            }
295            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
296        }
297    }
298
299    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
300        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
301                getConnectionIdGenerator(), stats);
302        return connection;
303    }
304
305    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
306        connection.setPrefetchPolicy(getPrefetchPolicy());
307        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
308        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
309        connection.setCopyMessageOnSend(isCopyMessageOnSend());
310        connection.setUseCompression(isUseCompression());
311        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
312        connection.setDispatchAsync(isDispatchAsync());
313        connection.setUseAsyncSend(isUseAsyncSend());
314        connection.setAlwaysSyncSend(isAlwaysSyncSend());
315        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
316        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
317        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
318        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
319        connection.setExclusiveConsumer(isExclusiveConsumer());
320        connection.setRedeliveryPolicy(getRedeliveryPolicy());
321        connection.setTransformer(getTransformer());
322        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
323        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
324        connection.setProducerWindowSize(getProducerWindowSize());
325        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
326        connection.setSendTimeout(getSendTimeout());
327        connection.setCloseTimeout(getCloseTimeout());
328        connection.setSendAcksAsync(isSendAcksAsync());
329        connection.setAuditDepth(getAuditDepth());
330        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
331        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
332        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
333        connection.setCheckForDuplicates(isCheckForDuplicates());
334        connection.setMessagePrioritySupported(isMessagePrioritySupported());
335        connection.setTransactedIndividualAck(isTransactedIndividualAck());
336        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
337        if (transportListener != null) {
338            connection.addTransportListener(transportListener);
339        }
340        if (exceptionListener != null) {
341            connection.setExceptionListener(exceptionListener);
342        }
343        if (clientInternalExceptionListener != null) {
344            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
345        }
346    }
347
348    // /////////////////////////////////////////////
349    //
350    // Property Accessors
351    //
352    // /////////////////////////////////////////////
353
354    public String getBrokerURL() {
355        return brokerURL == null ? null : brokerURL.toString();
356    }
357
358    /**
359     * Sets the <a
360     * href="http://activemq.apache.org/configuring-transports.html">connection
361     * URL</a> used to connect to the ActiveMQ broker.
362     */
363    public void setBrokerURL(String brokerURL) {
364        this.brokerURL = createURI(brokerURL);
365
366        // Use all the properties prefixed with 'jms.' to set the connection
367        // factory
368        // options.
369        if (this.brokerURL.getQuery() != null) {
370            // It might be a standard URI or...
371            try {
372
373                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
374                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
375                if (buildFromMap(jmsOptionsMap)) {
376                    if (!jmsOptionsMap.isEmpty()) {
377                        String msg = "There are " + jmsOptionsMap.size()
378                            + " jms options that couldn't be set on the ConnectionFactory."
379                            + " Check the options are spelled correctly."
380                            + " Unknown parameters=[" + jmsOptionsMap + "]."
381                            + " This connection factory cannot be started.";
382                        throw new IllegalArgumentException(msg);
383                    }
384
385                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
386                }
387
388            } catch (URISyntaxException e) {
389            }
390
391        } else {
392
393            // It might be a composite URI.
394            try {
395                CompositeData data = URISupport.parseComposite(this.brokerURL);
396                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
397                if (buildFromMap(jmsOptionsMap)) {
398                    if (!jmsOptionsMap.isEmpty()) {
399                        String msg = "There are " + jmsOptionsMap.size()
400                            + " jms options that couldn't be set on the ConnectionFactory."
401                            + " Check the options are spelled correctly."
402                            + " Unknown parameters=[" + jmsOptionsMap + "]."
403                            + " This connection factory cannot be started.";
404                        throw new IllegalArgumentException(msg);
405                    }
406
407                    this.brokerURL = data.toURI();
408                }
409            } catch (URISyntaxException e) {
410            }
411        }
412    }
413
414    public String getClientID() {
415        return clientID;
416    }
417
418    /**
419     * Sets the JMS clientID to use for the created connection. Note that this
420     * can only be used by one connection at once so generally its a better idea
421     * to set the clientID on a Connection
422     */
423    public void setClientID(String clientID) {
424        this.clientID = clientID;
425    }
426
427    public boolean isCopyMessageOnSend() {
428        return copyMessageOnSend;
429    }
430
431    /**
432     * Should a JMS message be copied to a new JMS Message object as part of the
433     * send() method in JMS. This is enabled by default to be compliant with the
434     * JMS specification. You can disable it if you do not mutate JMS messages
435     * after they are sent for a performance boost
436     */
437    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
438        this.copyMessageOnSend = copyMessageOnSend;
439    }
440
441    public boolean isDisableTimeStampsByDefault() {
442        return disableTimeStampsByDefault;
443    }
444
445    /**
446     * Sets whether or not timestamps on messages should be disabled or not. If
447     * you disable them it adds a small performance boost.
448     */
449    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
450        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
451    }
452
453    public boolean isOptimizedMessageDispatch() {
454        return optimizedMessageDispatch;
455    }
456
457    /**
458     * If this flag is set then an larger prefetch limit is used - only
459     * applicable for durable topic subscribers.
460     */
461    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
462        this.optimizedMessageDispatch = optimizedMessageDispatch;
463    }
464
465    public String getPassword() {
466        return password;
467    }
468
469    /**
470     * Sets the JMS password used for connections created from this factory
471     */
472    public void setPassword(String password) {
473        this.password = password;
474    }
475
476    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
477        return prefetchPolicy;
478    }
479
480    /**
481     * Sets the <a
482     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
483     * policy</a> for consumers created by this connection.
484     */
485    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
486        this.prefetchPolicy = prefetchPolicy;
487    }
488
489    public boolean isUseAsyncSend() {
490        return useAsyncSend;
491    }
492
493    public BlobTransferPolicy getBlobTransferPolicy() {
494        return blobTransferPolicy;
495    }
496
497    /**
498     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
499     * OBjects) are transferred from producers to brokers to consumers
500     */
501    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
502        this.blobTransferPolicy = blobTransferPolicy;
503    }
504
505    /**
506     * Forces the use of <a
507     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
508     * adds a massive performance boost; but means that the send() method will
509     * return immediately whether the message has been sent or not which could
510     * lead to message loss.
511     */
512    public void setUseAsyncSend(boolean useAsyncSend) {
513        this.useAsyncSend = useAsyncSend;
514    }
515
516    public synchronized boolean isWatchTopicAdvisories() {
517        return watchTopicAdvisories;
518    }
519
520    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
521        this.watchTopicAdvisories = watchTopicAdvisories;
522    }
523
524    /**
525     * @return true if always sync send messages
526     */
527    public boolean isAlwaysSyncSend() {
528        return this.alwaysSyncSend;
529    }
530
531    /**
532     * Set true if always require messages to be sync sent
533     *
534     * @param alwaysSyncSend
535     */
536    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
537        this.alwaysSyncSend = alwaysSyncSend;
538    }
539
540    public String getUserName() {
541        return userName;
542    }
543
544    /**
545     * Sets the JMS userName used by connections created by this factory
546     */
547    public void setUserName(String userName) {
548        this.userName = userName;
549    }
550
551    public boolean isUseRetroactiveConsumer() {
552        return useRetroactiveConsumer;
553    }
554
555    /**
556     * Sets whether or not retroactive consumers are enabled. Retroactive
557     * consumers allow non-durable topic subscribers to receive old messages
558     * that were published before the non-durable subscriber started.
559     */
560    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
561        this.useRetroactiveConsumer = useRetroactiveConsumer;
562    }
563
564    public boolean isExclusiveConsumer() {
565        return exclusiveConsumer;
566    }
567
568    /**
569     * Enables or disables whether or not queue consumers should be exclusive or
570     * not for example to preserve ordering when not using <a
571     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
572     *
573     * @param exclusiveConsumer
574     */
575    public void setExclusiveConsumer(boolean exclusiveConsumer) {
576        this.exclusiveConsumer = exclusiveConsumer;
577    }
578
579    public RedeliveryPolicy getRedeliveryPolicy() {
580        return redeliveryPolicy;
581    }
582
583    /**
584     * Sets the global redelivery policy to be used when a message is delivered
585     * but the session is rolled back
586     */
587    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
588        this.redeliveryPolicy = redeliveryPolicy;
589    }
590
591    public MessageTransformer getTransformer() {
592        return transformer;
593    }
594
595    /**
596     * @return the sendTimeout
597     */
598    public int getSendTimeout() {
599        return sendTimeout;
600    }
601
602    /**
603     * @param sendTimeout the sendTimeout to set
604     */
605    public void setSendTimeout(int sendTimeout) {
606        this.sendTimeout = sendTimeout;
607    }
608
609    /**
610     * @return the sendAcksAsync
611     */
612    public boolean isSendAcksAsync() {
613        return sendAcksAsync;
614    }
615
616    /**
617     * @param sendAcksAsync the sendAcksAsync to set
618     */
619    public void setSendAcksAsync(boolean sendAcksAsync) {
620        this.sendAcksAsync = sendAcksAsync;
621    }
622
623    /**
624     * @return the messagePrioritySupported
625     */
626    public boolean isMessagePrioritySupported() {
627        return this.messagePrioritySupported;
628    }
629
630    /**
631     * @param messagePrioritySupported the messagePrioritySupported to set
632     */
633    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
634        this.messagePrioritySupported = messagePrioritySupported;
635    }
636
637
638    /**
639     * Sets the transformer used to transform messages before they are sent on
640     * to the JMS bus or when they are received from the bus but before they are
641     * delivered to the JMS client
642     */
643    public void setTransformer(MessageTransformer transformer) {
644        this.transformer = transformer;
645    }
646
647    @SuppressWarnings({ "unchecked", "rawtypes" })
648    @Override
649    public void buildFromProperties(Properties properties) {
650
651        if (properties == null) {
652            properties = new Properties();
653        }
654
655        String temp = properties.getProperty(Context.PROVIDER_URL);
656        if (temp == null || temp.length() == 0) {
657            temp = properties.getProperty("brokerURL");
658        }
659        if (temp != null && temp.length() > 0) {
660            setBrokerURL(temp);
661        }
662
663        Map<String, Object> p = new HashMap(properties);
664        buildFromMap(p);
665    }
666
667    public boolean buildFromMap(Map<String, Object> properties) {
668        boolean rc = false;
669
670        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
671        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
672            setPrefetchPolicy(p);
673            rc = true;
674        }
675
676        RedeliveryPolicy rp = new RedeliveryPolicy();
677        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
678            setRedeliveryPolicy(rp);
679            rc = true;
680        }
681
682        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
683        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
684            setBlobTransferPolicy(blobTransferPolicy);
685            rc = true;
686        }
687
688        rc |= IntrospectionSupport.setProperties(this, properties);
689
690        return rc;
691    }
692
693    @Override
694    public void populateProperties(Properties props) {
695        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
696
697        if (getBrokerURL() != null) {
698            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
699            props.setProperty("brokerURL", getBrokerURL());
700        }
701
702        if (getClientID() != null) {
703            props.setProperty("clientID", getClientID());
704        }
705
706        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
707        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
708        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
709
710        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
711        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
712        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
713        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
714
715        if (getPassword() != null) {
716            props.setProperty("password", getPassword());
717        }
718
719        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
720        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
721        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
722        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
723
724        if (getUserName() != null) {
725            props.setProperty("userName", getUserName());
726        }
727
728        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
729        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
730        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
731        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
732        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
733        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
734        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
735        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
736        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
737        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
738        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
739        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
740        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
741        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
742    }
743
744    public boolean isUseCompression() {
745        return useCompression;
746    }
747
748    /**
749     * Enables the use of compression of the message bodies
750     */
751    public void setUseCompression(boolean useCompression) {
752        this.useCompression = useCompression;
753    }
754
755    public boolean isObjectMessageSerializationDefered() {
756        return objectMessageSerializationDefered;
757    }
758
759    /**
760     * When an object is set on an ObjectMessage, the JMS spec requires the
761     * object to be serialized by that set method. Enabling this flag causes the
762     * object to not get serialized. The object may subsequently get serialized
763     * if the message needs to be sent over a socket or stored to disk.
764     */
765    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
766        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
767    }
768
769    public boolean isDispatchAsync() {
770        return dispatchAsync;
771    }
772
773    /**
774     * Enables or disables the default setting of whether or not consumers have
775     * their messages <a
776     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
777     * synchronously or asynchronously by the broker</a>. For non-durable
778     * topics for example we typically dispatch synchronously by default to
779     * minimize context switches which boost performance. However sometimes its
780     * better to go slower to ensure that a single blocked consumer socket does
781     * not block delivery to other consumers.
782     *
783     * @param asyncDispatch If true then consumers created on this connection
784     *                will default to having their messages dispatched
785     *                asynchronously. The default value is true.
786     */
787    public void setDispatchAsync(boolean asyncDispatch) {
788        this.dispatchAsync = asyncDispatch;
789    }
790
791    /**
792     * @return Returns the closeTimeout.
793     */
794    public int getCloseTimeout() {
795        return closeTimeout;
796    }
797
798    /**
799     * Sets the timeout before a close is considered complete. Normally a
800     * close() on a connection waits for confirmation from the broker; this
801     * allows that operation to timeout to save the client hanging if there is
802     * no broker
803     */
804    public void setCloseTimeout(int closeTimeout) {
805        this.closeTimeout = closeTimeout;
806    }
807
808    /**
809     * @return Returns the alwaysSessionAsync.
810     */
811    public boolean isAlwaysSessionAsync() {
812        return alwaysSessionAsync;
813    }
814
815    /**
816     * If this flag is set then a separate thread is not used for dispatching
817     * messages for each Session in the Connection. However, a separate thread
818     * is always used if there is more than one session, or the session isn't in
819     * auto acknowledge or duplicates ok mode
820     */
821    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
822        this.alwaysSessionAsync = alwaysSessionAsync;
823    }
824
825    /**
826     * @return Returns the optimizeAcknowledge.
827     */
828    public boolean isOptimizeAcknowledge() {
829        return optimizeAcknowledge;
830    }
831
832    /**
833     * @param optimizeAcknowledge The optimizeAcknowledge to set.
834     */
835    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
836        this.optimizeAcknowledge = optimizeAcknowledge;
837    }
838
839    /**
840     * The max time in milliseconds between optimized ack batches
841     * @param optimizeAcknowledgeTimeOut
842     */
843    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
844        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
845    }
846
847    public long getOptimizeAcknowledgeTimeOut() {
848        return optimizeAcknowledgeTimeOut;
849    }
850
851    public boolean isNestedMapAndListEnabled() {
852        return nestedMapAndListEnabled;
853    }
854
855    /**
856     * Enables/disables whether or not Message properties and MapMessage entries
857     * support <a
858     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
859     * Structures</a> of Map and List objects
860     */
861    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
862        this.nestedMapAndListEnabled = structuredMapsEnabled;
863    }
864
865    public String getClientIDPrefix() {
866        return clientIDPrefix;
867    }
868
869    /**
870     * Sets the prefix used by autogenerated JMS Client ID values which are used
871     * if the JMS client does not explicitly specify on.
872     *
873     * @param clientIDPrefix
874     */
875    public void setClientIDPrefix(String clientIDPrefix) {
876        this.clientIDPrefix = clientIDPrefix;
877    }
878
879    protected synchronized IdGenerator getClientIdGenerator() {
880        if (clientIdGenerator == null) {
881            if (clientIDPrefix != null) {
882                clientIdGenerator = new IdGenerator(clientIDPrefix);
883            } else {
884                clientIdGenerator = new IdGenerator();
885            }
886        }
887        return clientIdGenerator;
888    }
889
890    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
891        this.clientIdGenerator = clientIdGenerator;
892    }
893
894    /**
895     * Sets the prefix used by connection id generator
896     * @param connectionIDPrefix
897     */
898    public void setConnectionIDPrefix(String connectionIDPrefix) {
899        this.connectionIDPrefix = connectionIDPrefix;
900    }
901
902    protected synchronized IdGenerator getConnectionIdGenerator() {
903        if (connectionIdGenerator == null) {
904            if (connectionIDPrefix != null) {
905                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
906            } else {
907                connectionIdGenerator = new IdGenerator();
908            }
909        }
910        return connectionIdGenerator;
911    }
912
913    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
914        this.connectionIdGenerator = connectionIdGenerator;
915    }
916
917    /**
918     * @return the statsEnabled
919     */
920    public boolean isStatsEnabled() {
921        return this.factoryStats.isEnabled();
922    }
923
924    /**
925     * @param statsEnabled the statsEnabled to set
926     */
927    public void setStatsEnabled(boolean statsEnabled) {
928        this.factoryStats.setEnabled(statsEnabled);
929    }
930
931    public synchronized int getProducerWindowSize() {
932        return producerWindowSize;
933    }
934
935    public synchronized void setProducerWindowSize(int producerWindowSize) {
936        this.producerWindowSize = producerWindowSize;
937    }
938
939    public long getWarnAboutUnstartedConnectionTimeout() {
940        return warnAboutUnstartedConnectionTimeout;
941    }
942
943    /**
944     * Enables the timeout from a connection creation to when a warning is
945     * generated if the connection is not properly started via
946     * {@link Connection#start()} and a message is received by a consumer. It is
947     * a very common gotcha to forget to <a
948     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
949     * the connection</a> so this option makes the default case to create a
950     * warning if the user forgets. To disable the warning just set the value to <
951     * 0 (say -1).
952     */
953    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
954        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
955    }
956
957    public TransportListener getTransportListener() {
958        return transportListener;
959    }
960
961    /**
962     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
963     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
964     * a transport listener.
965     *
966     * @param transportListener sets the listener to be registered on all connections
967     * created by this factory
968     */
969    public void setTransportListener(TransportListener transportListener) {
970        this.transportListener = transportListener;
971    }
972
973
974    public ExceptionListener getExceptionListener() {
975        return exceptionListener;
976    }
977
978    /**
979     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
980     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
981     * an exception listener.
982     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
983     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
984     * @param exceptionListener sets the exception listener to be registered on all connections
985     * created by this factory
986     */
987    public void setExceptionListener(ExceptionListener exceptionListener) {
988        this.exceptionListener = exceptionListener;
989    }
990
991    public int getAuditDepth() {
992        return auditDepth;
993    }
994
995    public void setAuditDepth(int auditDepth) {
996        this.auditDepth = auditDepth;
997    }
998
999    public int getAuditMaximumProducerNumber() {
1000        return auditMaximumProducerNumber;
1001    }
1002
1003    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1004        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1005    }
1006
1007    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009    }
1010
1011    public boolean isUseDedicatedTaskRunner() {
1012        return useDedicatedTaskRunner;
1013    }
1014
1015    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1016        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1017    }
1018
1019    public long getConsumerFailoverRedeliveryWaitPeriod() {
1020        return consumerFailoverRedeliveryWaitPeriod;
1021    }
1022
1023    public ClientInternalExceptionListener getClientInternalExceptionListener() {
1024        return clientInternalExceptionListener;
1025    }
1026
1027    /**
1028     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1029     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1030     * an exception listener.
1031     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1032     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1033     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1034     * created by this factory
1035     */
1036    public void setClientInternalExceptionListener(
1037            ClientInternalExceptionListener clientInternalExceptionListener) {
1038        this.clientInternalExceptionListener = clientInternalExceptionListener;
1039    }
1040
1041    /**
1042     * @return the checkForDuplicates
1043     */
1044    public boolean isCheckForDuplicates() {
1045        return this.checkForDuplicates;
1046    }
1047
1048    /**
1049     * @param checkForDuplicates the checkForDuplicates to set
1050     */
1051    public void setCheckForDuplicates(boolean checkForDuplicates) {
1052        this.checkForDuplicates = checkForDuplicates;
1053    }
1054
1055    public boolean isTransactedIndividualAck() {
1056         return transactedIndividualAck;
1057     }
1058
1059     /**
1060      * when true, submit individual transacted acks immediately rather than with transaction completion.
1061      * This allows the acks to represent delivery status which can be persisted on rollback
1062      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1063      */
1064     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1065         this.transactedIndividualAck = transactedIndividualAck;
1066     }
1067
1068
1069     public boolean isNonBlockingRedelivery() {
1070         return nonBlockingRedelivery;
1071     }
1072
1073     /**
1074      * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1075      * from a rolled back transaction.  This implies that message order will not be preserved and
1076      * also will result in the TransactedIndividualAck option to be enabled.
1077      */
1078     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1079         this.nonBlockingRedelivery = nonBlockingRedelivery;
1080     }
1081
1082}