001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.concurrent.atomic.AtomicBoolean;
020import javax.jms.Connection;
021import javax.jms.Destination;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageConsumer;
025import javax.jms.MessageListener;
026import javax.jms.MessageProducer;
027import org.apache.activemq.Service;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * A Destination bridge is used to bridge between to different JMS systems
033 */
034public abstract class DestinationBridge implements Service, MessageListener {
035
036    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
037
038    protected MessageConsumer consumer;
039    protected AtomicBoolean started = new AtomicBoolean(false);
040    protected JmsMesageConvertor jmsMessageConvertor;
041    protected boolean doHandleReplyTo = true;
042    protected JmsConnector jmsConnector;
043
044    /**
045     * @return Returns the consumer.
046     */
047    public MessageConsumer getConsumer() {
048        return consumer;
049    }
050
051    /**
052     * @param consumer The consumer to set.
053     */
054    public void setConsumer(MessageConsumer consumer) {
055        this.consumer = consumer;
056    }
057
058    /**
059     * @param connector
060     */
061    public void setJmsConnector(JmsConnector connector) {
062        this.jmsConnector = connector;
063    }
064
065    /**
066     * @return Returns the inboundMessageConvertor.
067     */
068    public JmsMesageConvertor getJmsMessageConvertor() {
069        return jmsMessageConvertor;
070    }
071
072    /**
073     * @param jmsMessageConvertor
074     */
075    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
076        this.jmsMessageConvertor = jmsMessageConvertor;
077    }
078
079    protected Destination processReplyToDestination(Destination destination) {
080        return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
081    }
082
083    public void start() throws Exception {
084        if (started.compareAndSet(false, true)) {
085            createConsumer();
086            createProducer();
087        }
088    }
089
090    public void stop() throws Exception {
091        started.set(false);
092    }
093
094    public void onMessage(Message message) {
095
096        int attempt = 0;
097        final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
098
099        while (started.get() && message != null && ++attempt <= maxRetries) {
100
101            try {
102
103                if (attempt > 0) {
104                    try {
105                        Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
106                    } catch(InterruptedException e) {
107                        break;
108                    }
109                }
110
111                Message converted;
112                if (jmsMessageConvertor != null) {
113                    if (doHandleReplyTo) {
114                        Destination replyTo = message.getJMSReplyTo();
115                        if (replyTo != null) {
116                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
117                        } else {
118                            converted = jmsMessageConvertor.convert(message);
119                        }
120                    } else {
121                        message.setJMSReplyTo(null);
122                        converted = jmsMessageConvertor.convert(message);
123                    }
124                } else {
125                    // The Producer side is not up or not yet configured, retry.
126                    continue;
127                }
128
129                try {
130                    sendMessage(converted);
131                } catch(Exception e) {
132                    jmsConnector.handleConnectionFailure(getConnectionForProducer());
133                    continue;
134                }
135
136                try {
137                    message.acknowledge();
138                } catch(Exception e) {
139                    jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
140                    continue;
141                }
142
143                // if we got here then it made it out and was ack'd
144                return;
145
146            } catch (Exception e) {
147                LOG.info("failed to forward message on attempt: " + attempt +
148                         " reason: " + e + " message: " + message, e);
149            }
150        }
151    }
152
153    /**
154     * @return Returns the doHandleReplyTo.
155     */
156    protected boolean isDoHandleReplyTo() {
157        return doHandleReplyTo;
158    }
159
160    /**
161     * @param doHandleReplyTo The doHandleReplyTo to set.
162     */
163    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
164        this.doHandleReplyTo = doHandleReplyTo;
165    }
166
167    protected abstract MessageConsumer createConsumer() throws JMSException;
168
169    protected abstract MessageProducer createProducer() throws JMSException;
170
171    protected abstract void sendMessage(Message message) throws JMSException;
172
173    protected abstract Connection getConnnectionForConsumer();
174
175    protected abstract Connection getConnectionForProducer();
176
177}