001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.activemq.command.BrokerInfo;
025import org.apache.activemq.transport.MutexTransport;
026import org.apache.activemq.transport.ResponseCorrelator;
027import org.apache.activemq.transport.Transport;
028import org.apache.activemq.transport.TransportAcceptListener;
029import org.apache.activemq.transport.TransportServer;
030
031/**
032 * Broker side of the VMTransport
033 */
034public class VMTransportServer implements TransportServer {
035
036    private TransportAcceptListener acceptListener;
037    private final URI location;
038    private boolean disposed;
039
040    private final AtomicInteger connectionCount = new AtomicInteger(0);
041    private final boolean disposeOnDisconnect;
042
043    /**
044     * @param location
045     * @param disposeOnDisconnect
046     */
047    public VMTransportServer(URI location, boolean disposeOnDisconnect) {
048        this.location = location;
049        this.disposeOnDisconnect = disposeOnDisconnect;
050    }
051
052    /**
053     * @return a pretty print of this
054     */
055    public String toString() {
056        return "VMTransportServer(" + location + ")";
057    }
058
059    /**
060     * @return new VMTransport
061     * @throws IOException
062     */
063    public VMTransport connect() throws IOException {
064        TransportAcceptListener al;
065        synchronized (this) {
066            if (disposed) {
067                throw new IOException("Server has been disposed.");
068            }
069            al = acceptListener;
070        }
071        if (al == null) {
072            throw new IOException("Server TransportAcceptListener is null.");
073        }
074
075        connectionCount.incrementAndGet();
076        VMTransport client = new VMTransport(location) {
077            public void stop() throws Exception {
078                if (!disposed.get()) {
079                    super.stop();
080                    if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
081                        VMTransportServer.this.stop();
082                    }
083                }
084            };
085        };
086
087        VMTransport server = new VMTransport(location);
088        client.setPeer(server);
089        server.setPeer(client);
090        al.onAccept(configure(server));
091        return client;
092    }
093
094    /**
095     * Configure transport
096     *
097     * @param transport
098     * @return the Transport
099     */
100    public static Transport configure(Transport transport) {
101        transport = new MutexTransport(transport);
102        transport = new ResponseCorrelator(transport);
103        return transport;
104    }
105
106    /**
107     * Set the Transport accept listener for new Connections
108     *
109     * @param acceptListener
110     */
111    public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
112        this.acceptListener = acceptListener;
113    }
114
115    public void start() throws IOException {
116    }
117
118    public void stop() throws IOException {
119        VMTransportFactory.stopped(this);
120    }
121
122    public URI getConnectURI() {
123        return location;
124    }
125
126    public URI getBindURI() {
127        return location;
128    }
129
130    public void setBrokerInfo(BrokerInfo brokerInfo) {
131    }
132
133    public InetSocketAddress getSocketAddress() {
134        return null;
135    }
136
137    public int getConnectionCount() {
138        return connectionCount.intValue();
139    }
140}