001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.net.MalformedURLException;
020import java.util.Enumeration;
021
022import javax.jms.BytesMessage;
023import javax.jms.Destination;
024import javax.jms.JMSException;
025import javax.jms.MapMessage;
026import javax.jms.Message;
027import javax.jms.MessageEOFException;
028import javax.jms.ObjectMessage;
029import javax.jms.Queue;
030import javax.jms.StreamMessage;
031import javax.jms.TemporaryQueue;
032import javax.jms.TemporaryTopic;
033import javax.jms.TextMessage;
034import javax.jms.Topic;
035
036import org.apache.activemq.blob.BlobDownloader;
037import org.apache.activemq.blob.BlobUploader;
038import org.apache.activemq.command.ActiveMQBlobMessage;
039import org.apache.activemq.command.ActiveMQBytesMessage;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQMapMessage;
042import org.apache.activemq.command.ActiveMQMessage;
043import org.apache.activemq.command.ActiveMQObjectMessage;
044import org.apache.activemq.command.ActiveMQQueue;
045import org.apache.activemq.command.ActiveMQStreamMessage;
046import org.apache.activemq.command.ActiveMQTempQueue;
047import org.apache.activemq.command.ActiveMQTempTopic;
048import org.apache.activemq.command.ActiveMQTextMessage;
049import org.apache.activemq.command.ActiveMQTopic;
050
051/**
052 * A helper class for converting normal JMS interfaces into ActiveMQ specific
053 * ones.
054 * 
055 * 
056 */
057public final class ActiveMQMessageTransformation {
058
059    private ActiveMQMessageTransformation() {    
060    }
061    
062    /**
063     * Creates a an available JMS message from another provider.
064     * 
065     * @param destination - Destination to be converted into ActiveMQ's
066     *                implementation.
067     * @return ActiveMQDestination - ActiveMQ's implementation of the
068     *         destination.
069     * @throws JMSException if an error occurs
070     */
071    public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
072        ActiveMQDestination activeMQDestination = null;
073
074        if (destination != null) {
075            if (destination instanceof ActiveMQDestination) {
076                return (ActiveMQDestination)destination;
077
078            } else {
079                if (destination instanceof TemporaryQueue) {
080                    activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
081                } else if (destination instanceof TemporaryTopic) {
082                    activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
083                } else if (destination instanceof Queue) {
084                    activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
085                } else if (destination instanceof Topic) {
086                    activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
087                }
088            }
089        }
090
091        return activeMQDestination;
092    }
093
094    /**
095     * Creates a fast shallow copy of the current ActiveMQMessage or creates a
096     * whole new message instance from an available JMS message from another
097     * provider.
098     * 
099     * @param message - Message to be converted into ActiveMQ's implementation.
100     * @param connection
101     * @return ActiveMQMessage - ActiveMQ's implementation object of the
102     *         message.
103     * @throws JMSException if an error occurs
104     */
105    public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
106        throws JMSException {
107        if (message instanceof ActiveMQMessage) {
108            return (ActiveMQMessage)message;
109
110        } else {
111            ActiveMQMessage activeMessage = null;
112
113            if (message instanceof BytesMessage) {
114                BytesMessage bytesMsg = (BytesMessage)message;
115                bytesMsg.reset();
116                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
117                msg.setConnection(connection);
118                try {
119                    for (;;) {
120                        // Reads a byte from the message stream until the stream
121                        // is empty
122                        msg.writeByte(bytesMsg.readByte());
123                    }
124                } catch (MessageEOFException e) {
125                    // if an end of message stream as expected
126                } catch (JMSException e) {
127                }
128
129                activeMessage = msg;
130            } else if (message instanceof MapMessage) {
131                MapMessage mapMsg = (MapMessage)message;
132                ActiveMQMapMessage msg = new ActiveMQMapMessage();
133                msg.setConnection(connection);
134                Enumeration iter = mapMsg.getMapNames();
135
136                while (iter.hasMoreElements()) {
137                    String name = iter.nextElement().toString();
138                    msg.setObject(name, mapMsg.getObject(name));
139                }
140
141                activeMessage = msg;
142            } else if (message instanceof ObjectMessage) {
143                ObjectMessage objMsg = (ObjectMessage)message;
144                ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
145                msg.setConnection(connection);
146                msg.setObject(objMsg.getObject());
147                msg.storeContent();
148                activeMessage = msg;
149            } else if (message instanceof StreamMessage) {
150                StreamMessage streamMessage = (StreamMessage)message;
151                streamMessage.reset();
152                ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
153                msg.setConnection(connection);
154                Object obj = null;
155
156                try {
157                    while ((obj = streamMessage.readObject()) != null) {
158                        msg.writeObject(obj);
159                    }
160                } catch (MessageEOFException e) {
161                    // if an end of message stream as expected
162                } catch (JMSException e) {
163                }
164
165                activeMessage = msg;
166            } else if (message instanceof TextMessage) {
167                TextMessage textMsg = (TextMessage)message;
168                ActiveMQTextMessage msg = new ActiveMQTextMessage();
169                msg.setConnection(connection);
170                msg.setText(textMsg.getText());
171                activeMessage = msg;
172            } else if (message instanceof BlobMessage) {
173                BlobMessage blobMessage = (BlobMessage)message;
174                ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
175                msg.setConnection(connection);
176                msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
177                try {
178                                        msg.setURL(blobMessage.getURL());
179                                } catch (MalformedURLException e) {
180                                        
181                                }
182                activeMessage = msg;
183            } else {
184                activeMessage = new ActiveMQMessage();
185                activeMessage.setConnection(connection);
186            }
187
188            copyProperties(message, activeMessage);
189
190            return activeMessage;
191        }
192    }
193
194    /**
195     * Copies the standard JMS and user defined properties from the givem
196     * message to the specified message
197     * 
198     * @param fromMessage the message to take the properties from
199     * @param toMessage the message to add the properties to
200     * @throws JMSException
201     */
202    public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
203        toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
204        toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
205        toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
206        toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
207        toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
208        toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
209        toMessage.setJMSType(fromMessage.getJMSType());
210        toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
211        toMessage.setJMSPriority(fromMessage.getJMSPriority());
212        toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
213
214        Enumeration propertyNames = fromMessage.getPropertyNames();
215
216        while (propertyNames.hasMoreElements()) {
217            String name = propertyNames.nextElement().toString();
218            Object obj = fromMessage.getObjectProperty(name);
219            toMessage.setObjectProperty(name, obj);
220        }
221    }
222}