001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.activemq.command.ShutdownInfo;
029import org.apache.activemq.thread.DefaultThreadPools;
030import org.apache.activemq.thread.Task;
031import org.apache.activemq.thread.TaskRunner;
032import org.apache.activemq.transport.FutureResponse;
033import org.apache.activemq.transport.ResponseCallback;
034import org.apache.activemq.transport.Transport;
035import org.apache.activemq.transport.TransportDisposedIOException;
036import org.apache.activemq.transport.TransportListener;
037
038/**
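 * A Transport implementation that hands commands to a peer VMTransport inside the
 * same JVM, either by invoking the peer's listener directly or through an
 * in-memory queue that is drained by a TaskRunner.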
040 */
041public class VMTransport implements Transport, Task {
042
043    private static final Object DISCONNECT = new Object();
044    private static final AtomicLong NEXT_ID = new AtomicLong(0);
045
046    // Transport Configuration
047    protected VMTransport peer;
048    protected TransportListener transportListener;
049    protected boolean marshal;
050    protected boolean network;
051    protected boolean async = true;
052    protected int asyncQueueDepth = 2000;
053    protected final URI location;
054    protected final long id;
055
056    // Implementation
057    private LinkedBlockingQueue<Object> messageQueue;
058    private TaskRunner taskRunner;
059
060    // Transport State
061    protected final AtomicBoolean started = new AtomicBoolean();
062    protected final AtomicBoolean disposed = new AtomicBoolean();
063
064    private volatile int receiveCounter;
065
066    public VMTransport(URI location) {
067        this.location = location;
068        this.id = NEXT_ID.getAndIncrement();
069    }
070
071    public void setPeer(VMTransport peer) {
072        this.peer = peer;
073    }
074
075    public void oneway(Object command) throws IOException {
076
077        if (disposed.get()) {
078            throw new TransportDisposedIOException("Transport disposed.");
079        }
080
081        if (peer == null) {
082            throw new IOException("Peer not connected.");
083        }
084
085        try {
086
087            if (peer.disposed.get()) {
088                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
089            }
090
091            if (peer.async || !peer.started.get()) {
092                peer.getMessageQueue().put(command);
093                peer.wakeup();
094                return;
095            }
096
097        } catch (InterruptedException e) {
098            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
099            iioe.initCause(e);
100            throw iioe;
101        }
102
103        dispatch(peer, peer.messageQueue, command);
104    }
105
106    public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
107        TransportListener transportListener = transport.getTransportListener();
108        if (transportListener != null) {
109            synchronized (started) {
110
111                // Ensure that no additional commands entered the queue in the small time window
112                // before the start method locks the dispatch lock and the oneway method was in
113                // an put operation.
114                while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
115                    doDispatch(transport, transportListener, pending.poll());
116                }
117
118                // We are now in sync mode and won't enqueue any more commands to the target
119                // transport so lets clean up its resources.
120                transport.messageQueue = null;
121
122                // Don't dispatch if either end was disposed already.
123                if (command != null && !this.disposed.get() && !transport.isDisposed()) {
124                    doDispatch(transport, transportListener, command);
125                }
126            }
127        }
128    }
129
130    public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
131        if (command == DISCONNECT) {
132            transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
133        } else {
134            transport.receiveCounter++;
135            transportListener.onCommand(command);
136        }
137    }
138
139    public void start() throws Exception {
140
141        if (transportListener == null) {
142            throw new IOException("TransportListener not set.");
143        }
144
145        // If we are not in async mode we lock the dispatch lock here and then start to
146        // prevent any sync dispatches from occurring until we dispatch the pending messages
147        // to maintain delivery order.  When async this happens automatically so just set
148        // started and wakeup the task runner.
149        if (!async) {
150            synchronized (started) {
151                if (started.compareAndSet(false, true)) {
152                    LinkedBlockingQueue<Object> mq = getMessageQueue();
153                    Object command;
154                    while ((command = mq.poll()) != null && !disposed.get() ) {
155                        receiveCounter++;
156                        doDispatch(this, transportListener, command);
157                    }
158                }
159            }
160        } else {
161            if (started.compareAndSet(false, true)) {
162                wakeup();
163            }
164        }
165    }
166
167    public void stop() throws Exception {
168        // Only need to do this once, all future oneway calls will now
169        // fail as will any asnyc jobs in the task runner.
170        if (disposed.compareAndSet(false, true)) {
171
172            TaskRunner tr = taskRunner;
173            LinkedBlockingQueue<Object> mq = this.messageQueue;
174
175            taskRunner = null;
176            messageQueue = null;
177
178            if (mq != null) {
179                mq.clear();
180            }
181
182            // Allow pending deliveries to finish up, but don't wait
183            // forever in case of an stalled onCommand.
184            if (tr != null) {
185                try {
186                    tr.shutdown(TimeUnit.SECONDS.toMillis(1));
187                } catch(Exception e) {
188                }
189            }
190
191            // let the peer know that we are disconnecting after attempting
192            // to cleanly shutdown the async tasks so that this is the last
193            // command it see's.
194            try {
195                peer.transportListener.onCommand(new ShutdownInfo());
196            } catch (Exception ignore) {
197            }
198        }
199    }
200
201    protected void wakeup() {
202        if (async && started.get()) {
203            try {
204                getTaskRunner().wakeup();
205            } catch (InterruptedException e) {
206                Thread.currentThread().interrupt();
207            } catch (TransportDisposedIOException e) {
208            }
209        }
210    }
211
212    /**
213     * @see org.apache.activemq.thread.Task#iterate()
214     */
215    public boolean iterate() {
216
217        final TransportListener tl = transportListener;
218
219        LinkedBlockingQueue<Object> mq;
220        try {
221            mq = getMessageQueue();
222        } catch (TransportDisposedIOException e) {
223            return false;
224        }
225
226        Object command = mq.poll();
227        if (command != null && !disposed.get()) {
228            if( command == DISCONNECT ) {
229                tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
230            } else {
231                tl.onCommand(command);
232            }
233            return !mq.isEmpty() && !disposed.get();
234        } else {
235            if(disposed.get()) {
236                mq.clear();
237            }
238            return false;
239        }
240    }
241
242    public void setTransportListener(TransportListener commandListener) {
243        this.transportListener = commandListener;
244    }
245
246    public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
247        synchronized (this) {
248            if (messageQueue == null) {
249                messageQueue = asyncQueue;
250            }
251        }
252    }
253
254    public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
255        LinkedBlockingQueue<Object> result = messageQueue;
256        if (result == null) {
257            synchronized (this) {
258                result = messageQueue;
259                if (result == null) {
260                    if (disposed.get()) {
261                        throw new TransportDisposedIOException("The Transport has been disposed");
262                    }
263
264                    messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
265                }
266            }
267        }
268        return result;
269    }
270
271    protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
272        TaskRunner result = taskRunner;
273        if (result == null) {
274            synchronized (this) {
275                result = taskRunner;
276                if (result == null) {
277                    if (disposed.get()) {
278                        throw new TransportDisposedIOException("The Transport has been disposed");
279                    }
280
281                    taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
282                }
283            }
284        }
285        return result;
286    }
287
288    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
289        throw new AssertionError("Unsupported Method");
290    }
291
292    public Object request(Object command) throws IOException {
293        throw new AssertionError("Unsupported Method");
294    }
295
296    public Object request(Object command, int timeout) throws IOException {
297        throw new AssertionError("Unsupported Method");
298    }
299
300    public TransportListener getTransportListener() {
301        return transportListener;
302    }
303
304    public <T> T narrow(Class<T> target) {
305        if (target.isAssignableFrom(getClass())) {
306            return target.cast(this);
307        }
308        return null;
309    }
310
311    public boolean isMarshal() {
312        return marshal;
313    }
314
315    public void setMarshal(boolean marshal) {
316        this.marshal = marshal;
317    }
318
319    public boolean isNetwork() {
320        return network;
321    }
322
323    public void setNetwork(boolean network) {
324        this.network = network;
325    }
326
327    @Override
328    public String toString() {
329        return location + "#" + id;
330    }
331
332    public String getRemoteAddress() {
333        if (peer != null) {
334            return peer.toString();
335        }
336        return null;
337    }
338
339    /**
340     * @return the async
341     */
342    public boolean isAsync() {
343        return async;
344    }
345
346    /**
347     * @param async the async to set
348     */
349    public void setAsync(boolean async) {
350        this.async = async;
351    }
352
353    /**
354     * @return the asyncQueueDepth
355     */
356    public int getAsyncQueueDepth() {
357        return asyncQueueDepth;
358    }
359
360    /**
361     * @param asyncQueueDepth the asyncQueueDepth to set
362     */
363    public void setAsyncQueueDepth(int asyncQueueDepth) {
364        this.asyncQueueDepth = asyncQueueDepth;
365    }
366
367    public boolean isFaultTolerant() {
368        return false;
369    }
370
371    public boolean isDisposed() {
372        return disposed.get();
373    }
374
375    public boolean isConnected() {
376        return !disposed.get();
377    }
378
379    public void reconnect(URI uri) throws IOException {
380        throw new IOException("Transport reconnect is not supported");
381    }
382
383    public boolean isReconnectSupported() {
384        return false;
385    }
386
387    public boolean isUpdateURIsSupported() {
388        return false;
389    }
390
391    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
392        throw new IOException("URI update feature not supported");
393    }
394
395    public int getReceiveCounter() {
396        return receiveCounter;
397    }
398}