001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.proxy;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.Iterator;
023import java.util.concurrent.CopyOnWriteArrayList;
024import org.apache.activemq.Service;
025import org.apache.activemq.transport.CompositeTransport;
026import org.apache.activemq.transport.Transport;
027import org.apache.activemq.transport.TransportAcceptListener;
028import org.apache.activemq.transport.TransportFactory;
029import org.apache.activemq.transport.TransportFilter;
030import org.apache.activemq.transport.TransportServer;
031import org.apache.activemq.util.ServiceStopper;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * @org.apache.xbean.XBean
037 * 
038 * 
039 */
040public class ProxyConnector implements Service {
041
042    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnector.class);
043    private TransportServer server;
044    private URI bind;
045    private URI remote;
046    private URI localUri;
047    private String name;
048    /**
049     * Should we proxy commands to the local broker using VM transport as well?
050     */
051    private boolean proxyToLocalBroker = true;
052    
053    private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
054
055    public void start() throws Exception {
056
057        this.getServer().setAcceptListener(new TransportAcceptListener() {
058            public void onAccept(Transport localTransport) {
059                try {
060                    Transport remoteTransport = createRemoteTransport();
061                    ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
062                    connections.add(connection);
063                    connection.start();
064                } catch (Exception e) {
065                    onAcceptError(e);
066                }
067            }
068
069            public void onAcceptError(Exception error) {
070                LOG.error("Could not accept connection: " + error, error);
071            }
072        });
073        getServer().start();
074        LOG.info("Proxy Connector " + getName() + " Started");
075
076    }
077
078    public void stop() throws Exception {
079        ServiceStopper ss = new ServiceStopper();
080        if (this.server != null) {
081            ss.stop(this.server);
082        }
083        for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
084            LOG.info("Connector stopped: Stopping proxy.");
085            ss.stop(iter.next());
086        }
087        ss.throwFirstException();
088        LOG.info("Proxy Connector " + getName() + " Stopped");
089    }
090
091    // Properties
092    // -------------------------------------------------------------------------
093
094    public URI getLocalUri() {
095        return localUri;
096    }
097
098    public void setLocalUri(URI localURI) {
099        this.localUri = localURI;
100    }
101
102    public URI getBind() {
103        return bind;
104    }
105
106    public void setBind(URI bind) {
107        this.bind = bind;
108    }
109
110    public URI getRemote() {
111        return remote;
112    }
113
114    public void setRemote(URI remote) {
115        this.remote = remote;
116    }
117
118    public TransportServer getServer() throws IOException, URISyntaxException {
119        if (server == null) {
120            server = createServer();
121        }
122        return server;
123    }
124
125    public void setServer(TransportServer server) {
126        this.server = server;
127    }
128
129    protected TransportServer createServer() throws IOException, URISyntaxException {
130        if (bind == null) {
131            throw new IllegalArgumentException("You must specify either a server or the bind property");
132        }
133        return TransportFactory.bind(bind);
134    }
135
136    private Transport createRemoteTransport() throws Exception {
137        Transport transport = TransportFactory.compositeConnect(remote);
138        CompositeTransport ct = transport.narrow(CompositeTransport.class);
139        if (ct != null && localUri != null && proxyToLocalBroker) {
140            ct.add(false,new URI[] {localUri});
141        }
142
143        // Add a transport filter so that we can track the transport life cycle
144        transport = new TransportFilter(transport) {
145            @Override
146            public void stop() throws Exception {
147                LOG.info("Stopping proxy.");
148                super.stop();
149                connections.remove(this);
150            }
151        };
152        return transport;
153    }
154
155    public String getName() {
156        if (name == null) {
157            if (server != null) {
158                name = server.getConnectURI().toString();
159            } else {
160                name = "proxy";
161            }
162        }
163        return name;
164    }
165
166    public void setName(String name) {
167        this.name = name;
168    }
169
170    public boolean isProxyToLocalBroker() {
171        return proxyToLocalBroker;
172    }
173
174    public void setProxyToLocalBroker(boolean proxyToLocalBroker) {
175        this.proxyToLocalBroker = proxyToLocalBroker;
176    }
177
178}