001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.IOException; 020import java.io.Serializable; 021import java.io.StringReader; 022import java.io.StringWriter; 023import java.util.HashMap; 024import java.util.Map; 025 026import javax.jms.JMSException; 027 028import org.apache.activemq.advisory.AdvisorySupport; 029import org.apache.activemq.broker.BrokerContext; 030import org.apache.activemq.broker.BrokerContextAware; 031import org.apache.activemq.command.ActiveMQMapMessage; 032import org.apache.activemq.command.ActiveMQMessage; 033import org.apache.activemq.command.ActiveMQObjectMessage; 034import org.apache.activemq.command.DataStructure; 035import org.codehaus.jettison.mapped.Configuration; 036 037import com.thoughtworks.xstream.XStream; 038import com.thoughtworks.xstream.io.HierarchicalStreamReader; 039import com.thoughtworks.xstream.io.HierarchicalStreamWriter; 040import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; 041import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; 042import com.thoughtworks.xstream.io.xml.XppReader; 043import com.thoughtworks.xstream.io.xml.xppdom.XppFactory; 044 045/** 046 * Frame translator implementation that uses XStream to convert messages to and 047 * from XML and JSON 048 * 049 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a> 050 */ 051public class JmsFrameTranslator extends LegacyFrameTranslator implements 052 BrokerContextAware { 053 054 XStream xStream = null; 055 BrokerContext brokerContext; 056 057 public ActiveMQMessage convertFrame(ProtocolConverter converter, 058 StompFrame command) throws JMSException, ProtocolException { 059 Map<String, String> headers = command.getHeaders(); 060 ActiveMQMessage msg; 061 String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION); 062 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { 063 msg = super.convertFrame(converter, command); 064 } else { 065 HierarchicalStreamReader in; 066 067 try { 068 String text = new String(command.getContent(), "UTF-8"); 069 switch (Stomp.Transformations.getValue(transformation)) { 070 case JMS_OBJECT_XML: 071 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); 072 msg = createObjectMessage(in); 073 break; 074 case JMS_OBJECT_JSON: 075 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 076 msg = createObjectMessage(in); 077 break; 078 case JMS_MAP_XML: 079 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); 080 msg = createMapMessage(in); 081 break; 082 case JMS_MAP_JSON: 083 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 084 msg = createMapMessage(in); 085 break; 086 default: 087 throw new Exception("Unknown transformation: " + transformation); 088 } 089 } catch (Throwable e) { 090 command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); 091 msg = super.convertFrame(converter, command); 092 } 093 } 094 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 095 return msg; 096 } 097 098 public StompFrame convertMessage(ProtocolConverter converter, 099 ActiveMQMessage message) throws IOException, JMSException { 100 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { 101 StompFrame command = new StompFrame(); 102 command.setAction(Stomp.Responses.MESSAGE); 103 Map<String, String> headers = new HashMap<String, String>(25); 104 command.setHeaders(headers); 105 106 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 107 converter, message, command, this); 108 109 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 110 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString()); 111 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 112 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString()); 113 } 114 115 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); 116 command.setContent(marshall(msg.getObject(), 117 headers.get(Stomp.Headers.TRANSFORMATION)) 118 .getBytes("UTF-8")); 119 return command; 120 121 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 122 StompFrame command = new StompFrame(); 123 command.setAction(Stomp.Responses.MESSAGE); 124 Map<String, String> headers = new HashMap<String, String>(25); 125 command.setHeaders(headers); 126 127 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 128 converter, message, command, this); 129 130 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 131 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString()); 132 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 133 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString()); 134 } 135 136 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 137 command.setContent(marshall((Serializable)msg.getContentMap(), 138 headers.get(Stomp.Headers.TRANSFORMATION)) 139 .getBytes("UTF-8")); 140 return command; 141 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && 142 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 143 144 StompFrame command = new StompFrame(); 145 command.setAction(Stomp.Responses.MESSAGE); 146 Map<String, String> headers = new HashMap<String, String>(25); 147 command.setHeaders(headers); 148 149 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 150 converter, message, command, this); 151 152 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 153 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); 154 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 155 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); 156 } 157 158 String body = marshallAdvisory(message.getDataStructure(), 159 headers.get(Stomp.Headers.TRANSFORMATION)); 160 command.setContent(body.getBytes("UTF-8")); 161 return command; 162 } else { 163 return super.convertMessage(converter, message); 164 } 165 } 166 167 /** 168 * Marshalls the Object to a string using XML or JSON encoding 169 */ 170 protected String marshall(Serializable object, String transformation) 171 throws JMSException { 172 StringWriter buffer = new StringWriter(); 173 HierarchicalStreamWriter out; 174 if (transformation.toLowerCase().endsWith("json")) { 175 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer); 176 } else { 177 out = new PrettyPrintWriter(buffer); 178 } 179 getXStream().marshal(object, out); 180 return buffer.toString(); 181 } 182 183 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { 184 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); 185 Object obj = getXStream().unmarshal(in); 186 objMsg.setObject((Serializable) obj); 187 return objMsg; 188 } 189 190 @SuppressWarnings("unchecked") 191 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { 192 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); 193 Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in); 194 for (String key : map.keySet()) { 195 mapMsg.setObject(key, map.get(key)); 196 } 197 return mapMsg; 198 } 199 200 protected String marshallAdvisory(final DataStructure ds, String transformation) { 201 202 StringWriter buffer = new StringWriter(); 203 HierarchicalStreamWriter out; 204 if (transformation.toLowerCase().endsWith("json")) { 205 out = new JettisonMappedXmlDriver().createWriter(buffer); 206 } else { 207 out = new PrettyPrintWriter(buffer); 208 } 209 210 XStream xstream = getXStream(); 211 xstream.setMode(XStream.NO_REFERENCES); 212 xstream.aliasPackage("", "org.apache.activemq.command"); 213 xstream.marshal(ds, out); 214 return buffer.toString(); 215 } 216 217 // Properties 218 // ------------------------------------------------------------------------- 219 public XStream getXStream() { 220 if (xStream == null) { 221 xStream = createXStream(); 222 } 223 return xStream; 224 } 225 226 public void setXStream(XStream xStream) { 227 this.xStream = xStream; 228 } 229 230 // Implementation methods 231 // ------------------------------------------------------------------------- 232 @SuppressWarnings("unchecked") 233 protected XStream createXStream() { 234 XStream xstream = null; 235 if (brokerContext != null) { 236 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class); 237 for (XStream bean : beans.values()) { 238 if (bean != null) { 239 xstream = bean; 240 break; 241 } 242 } 243 } 244 245 if (xstream == null) { 246 xstream = XStreamSupport.createXStream(); 247 xstream.ignoreUnknownElements(); 248 } 249 return xstream; 250 251 } 252 253 public void setBrokerContext(BrokerContext brokerContext) { 254 this.brokerContext = brokerContext; 255 } 256 257}