001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.net.UnknownHostException; 024import java.nio.ByteBuffer; 025import java.util.HashMap; 026import java.util.Map; 027 028import javax.net.ServerSocketFactory; 029import javax.net.SocketFactory; 030import javax.net.ssl.SSLEngine; 031 032import org.apache.activemq.TransportLoggerSupport; 033import org.apache.activemq.openwire.OpenWireFormat; 034import org.apache.activemq.transport.InactivityMonitor; 035import org.apache.activemq.transport.Transport; 036import org.apache.activemq.transport.TransportFactory; 037import org.apache.activemq.transport.TransportServer; 038import org.apache.activemq.transport.WireFormatNegotiator; 039import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; 040import org.apache.activemq.util.IOExceptionSupport; 041import org.apache.activemq.util.IntrospectionSupport; 042import org.apache.activemq.util.URISupport; 043import org.apache.activemq.wireformat.WireFormat; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047public class TcpTransportFactory extends TransportFactory { 048 049 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class); 050 051 @Override 052 public TransportServer doBind(final URI location) throws IOException { 053 try { 054 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 055 056 ServerSocketFactory serverSocketFactory = createServerSocketFactory(); 057 TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); 058 server.setWireFormatFactory(createWireFormatFactory(options)); 059 IntrospectionSupport.setProperties(server, options); 060 Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport."); 061 server.setTransportOption(transportOptions); 062 server.bind(); 063 064 return server; 065 } catch (URISyntaxException e) { 066 throw IOExceptionSupport.create(e); 067 } 068 } 069 070 /** 071 * Allows subclasses of TcpTransportFactory to create custom instances of 072 * TcpTransportServer. 073 * 074 * @param location 075 * @param serverSocketFactory 076 * @return a new TcpTransportServer instance. 077 * @throws IOException 078 * @throws URISyntaxException 079 */ 080 protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { 081 return new TcpTransportServer(this, location, serverSocketFactory); 082 } 083 084 @Override 085 @SuppressWarnings("rawtypes") 086 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 087 088 TcpTransport tcpTransport = transport.narrow(TcpTransport.class); 089 IntrospectionSupport.setProperties(tcpTransport, options); 090 091 Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket."); 092 tcpTransport.setSocketOptions(socketOptions); 093 094 if (tcpTransport.isTrace()) { 095 try { 096 transport = TransportLoggerSupport.createTransportLogger(transport, tcpTransport.getLogWriterName(), tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort()); 097 } catch (Throwable e) { 098 LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e); 099 } 100 } 101 102 boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); 103 if (useInactivityMonitor && isUseInactivityMonitor(transport)) { 104 transport = createInactivityMonitor(transport, format); 105 IntrospectionSupport.setProperties(transport, options); 106 } 107 108 // Only need the WireFormatNegotiator if using openwire 109 if (format instanceof OpenWireFormat) { 110 transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); 111 } 112 113 return super.compositeConfigure(transport, format, options); 114 } 115 116 117 /** 118 * @return true if the inactivity monitor should be used on the transport 119 */ 120 protected boolean isUseInactivityMonitor(Transport transport) { 121 return true; 122 } 123 124 @Override 125 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { 126 URI localLocation = null; 127 String path = location.getPath(); 128 // see if the path is a local URI location 129 if (path != null && path.length() > 0) { 130 int localPortIndex = path.indexOf(':'); 131 try { 132 Integer.parseInt(path.substring(localPortIndex + 1, path.length())); 133 String localString = location.getScheme() + ":/" + path; 134 localLocation = new URI(localString); 135 } catch (Exception e) { 136 LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage()); 137 if(LOG.isDebugEnabled()) { 138 LOG.debug("Failure detail", e); 139 } 140 } 141 } 142 SocketFactory socketFactory = createSocketFactory(); 143 return createTcpTransport(wf, socketFactory, location, localLocation); 144 } 145 146 public TcpTransport createTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException { 147 throw new IOException("createTransport() method not implemented!"); 148 } 149 150 public TcpTransport createTransport(WireFormat wireFormat, Socket socket, 151 SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException { 152 throw new IOException("createTransport() method not implemented!"); 153 } 154 155 /** 156 * Allows subclasses of TcpTransportFactory to provide a create custom 157 * TcpTransport instances. 158 * 159 * @param wf 160 * @param socketFactory 161 * @param location 162 * @param localLocation 163 * 164 * @return a new TcpTransport instance connected to the given location. 165 * 166 * @throws UnknownHostException 167 * @throws IOException 168 */ 169 protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { 170 return new TcpTransport(wf, socketFactory, location, localLocation); 171 } 172 173 protected ServerSocketFactory createServerSocketFactory() throws IOException { 174 return ServerSocketFactory.getDefault(); 175 } 176 177 protected SocketFactory createSocketFactory() throws IOException { 178 return SocketFactory.getDefault(); 179 } 180 181 protected Transport createInactivityMonitor(Transport transport, WireFormat format) { 182 return new InactivityMonitor(transport, format); 183 } 184}