001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.activemq.broker.ConnectionContext;
044import org.apache.activemq.broker.region.BaseDestination;
045import org.apache.activemq.broker.scheduler.JobSchedulerStore;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQQueue;
048import org.apache.activemq.command.ActiveMQTempQueue;
049import org.apache.activemq.command.ActiveMQTempTopic;
050import org.apache.activemq.command.ActiveMQTopic;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.MessageId;
054import org.apache.activemq.command.ProducerId;
055import org.apache.activemq.command.SubscriptionInfo;
056import org.apache.activemq.command.TransactionId;
057import org.apache.activemq.openwire.OpenWireFormat;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.store.AbstractMessageStore;
060import org.apache.activemq.store.IndexListener;
061import org.apache.activemq.store.ListenableFuture;
062import org.apache.activemq.store.MessageRecoveryListener;
063import org.apache.activemq.store.MessageStore;
064import org.apache.activemq.store.MessageStoreStatistics;
065import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
066import org.apache.activemq.store.PersistenceAdapter;
067import org.apache.activemq.store.TopicMessageStore;
068import org.apache.activemq.store.TransactionIdTransformer;
069import org.apache.activemq.store.TransactionStore;
070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
071import org.apache.activemq.store.kahadb.data.KahaDestination;
072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
073import org.apache.activemq.store.kahadb.data.KahaLocation;
074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
078import org.apache.activemq.store.kahadb.disk.journal.Location;
079import org.apache.activemq.store.kahadb.disk.page.Transaction;
080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
081import org.apache.activemq.usage.MemoryUsage;
082import org.apache.activemq.usage.SystemUsage;
083import org.apache.activemq.util.ServiceStopper;
084import org.apache.activemq.util.ThreadPoolUtils;
085import org.apache.activemq.wireformat.WireFormat;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
090    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
091    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
092
093    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
094    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
095            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
096    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
097    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
098            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
099
100    protected ExecutorService queueExecutor;
101    protected ExecutorService topicExecutor;
102    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
103    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
104    final WireFormat wireFormat = new OpenWireFormat();
105    private SystemUsage usageManager;
106    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
107    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
108    Semaphore globalQueueSemaphore;
109    Semaphore globalTopicSemaphore;
110    private boolean concurrentStoreAndDispatchQueues = true;
111    // when true, message order may be compromised when cache is exhausted if store is out
112    // or order w.r.t cache
113    private boolean concurrentStoreAndDispatchTopics = false;
114    private final boolean concurrentStoreAndDispatchTransactions = false;
115    private int maxAsyncJobs = MAX_ASYNC_JOBS;
116    private final KahaDBTransactionStore transactionStore;
117    private TransactionIdTransformer transactionIdTransformer;
118
119    public KahaDBStore() {
120        this.transactionStore = new KahaDBTransactionStore(this);
121        this.transactionIdTransformer = new TransactionIdTransformer() {
122            @Override
123            public TransactionId transform(TransactionId txid) {
124                return txid;
125            }
126        };
127    }
128
129    @Override
130    public String toString() {
131        return "KahaDB:[" + directory.getAbsolutePath() + "]";
132    }
133
134    @Override
135    public void setBrokerName(String brokerName) {
136    }
137
138    @Override
139    public void setUsageManager(SystemUsage usageManager) {
140        this.usageManager = usageManager;
141    }
142
143    public SystemUsage getUsageManager() {
144        return this.usageManager;
145    }
146
147    /**
148     * @return the concurrentStoreAndDispatch
149     */
150    public boolean isConcurrentStoreAndDispatchQueues() {
151        return this.concurrentStoreAndDispatchQueues;
152    }
153
154    /**
155     * @param concurrentStoreAndDispatch
156     *            the concurrentStoreAndDispatch to set
157     */
158    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
159        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
160    }
161
162    /**
163     * @return the concurrentStoreAndDispatch
164     */
165    public boolean isConcurrentStoreAndDispatchTopics() {
166        return this.concurrentStoreAndDispatchTopics;
167    }
168
169    /**
170     * @param concurrentStoreAndDispatch
171     *            the concurrentStoreAndDispatch to set
172     */
173    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
174        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
175    }
176
177    public boolean isConcurrentStoreAndDispatchTransactions() {
178        return this.concurrentStoreAndDispatchTransactions;
179    }
180
181    /**
182     * @return the maxAsyncJobs
183     */
184    public int getMaxAsyncJobs() {
185        return this.maxAsyncJobs;
186    }
187
188    /**
189     * @param maxAsyncJobs
190     *            the maxAsyncJobs to set
191     */
192    public void setMaxAsyncJobs(int maxAsyncJobs) {
193        this.maxAsyncJobs = maxAsyncJobs;
194    }
195
196
197    @Override
198    protected void configureMetadata() {
199        if (brokerService != null) {
200            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
201            wireFormat.setVersion(metadata.openwireVersion);
202
203            if (LOG.isDebugEnabled()) {
204                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
205            }
206
207        }
208    }
209
210    @Override
211    public void doStart() throws Exception {
212        //configure the metadata before start, right now
213        //this is just the open wire version
214        configureMetadata();
215
216        super.doStart();
217
218        if (brokerService != null) {
219            // In case the recovered store used a different OpenWire version log a warning
220            // to assist in determining why journal reads fail.
221            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
222                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
223                         "than the version configured[{}] reverting to the version " +
224                         "used by this store, some newer broker features may not work" +
225                         "as expected.",
226                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
227
228                // Update the broker service instance to the actual version in use.
229                wireFormat.setVersion(metadata.openwireVersion);
230                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
231            }
232        }
233
234        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
235        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
236        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
237        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
238        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
239            asyncQueueJobQueue, new ThreadFactory() {
240                @Override
241                public Thread newThread(Runnable runnable) {
242                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
243                    thread.setDaemon(true);
244                    return thread;
245                }
246            });
247        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
248            asyncTopicJobQueue, new ThreadFactory() {
249                @Override
250                public Thread newThread(Runnable runnable) {
251                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
252                    thread.setDaemon(true);
253                    return thread;
254                }
255            });
256    }
257
258    @Override
259    public void doStop(ServiceStopper stopper) throws Exception {
260        // drain down async jobs
261        LOG.info("Stopping async queue tasks");
262        if (this.globalQueueSemaphore != null) {
263            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
264        }
265        synchronized (this.asyncQueueMaps) {
266            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
267                synchronized (m) {
268                    for (StoreTask task : m.values()) {
269                        task.cancel();
270                    }
271                }
272            }
273            this.asyncQueueMaps.clear();
274        }
275        LOG.info("Stopping async topic tasks");
276        if (this.globalTopicSemaphore != null) {
277            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
278        }
279        synchronized (this.asyncTopicMaps) {
280            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
281                synchronized (m) {
282                    for (StoreTask task : m.values()) {
283                        task.cancel();
284                    }
285                }
286            }
287            this.asyncTopicMaps.clear();
288        }
289        if (this.globalQueueSemaphore != null) {
290            this.globalQueueSemaphore.drainPermits();
291        }
292        if (this.globalTopicSemaphore != null) {
293            this.globalTopicSemaphore.drainPermits();
294        }
295        if (this.queueExecutor != null) {
296            ThreadPoolUtils.shutdownNow(queueExecutor);
297            queueExecutor = null;
298        }
299        if (this.topicExecutor != null) {
300            ThreadPoolUtils.shutdownNow(topicExecutor);
301            topicExecutor = null;
302        }
303        LOG.info("Stopped KahaDB");
304        super.doStop(stopper);
305    }
306
307    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
308        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
309            @Override
310            public Location execute(Transaction tx) throws IOException {
311                StoredDestination sd = getStoredDestination(destination, tx);
312                Long sequence = sd.messageIdIndex.get(tx, key);
313                if (sequence == null) {
314                    return null;
315                }
316                return sd.orderIndex.get(tx, sequence).location;
317            }
318        });
319    }
320
321    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
322        StoreQueueTask task = null;
323        synchronized (store.asyncTaskMap) {
324            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
325        }
326        return task;
327    }
328
329    // with asyncTaskMap locked
330    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
331        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
332        this.queueExecutor.execute(task);
333    }
334
335    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
336        StoreTopicTask task = null;
337        synchronized (store.asyncTaskMap) {
338            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
339        }
340        return task;
341    }
342
343    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
344        synchronized (store.asyncTaskMap) {
345            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
346        }
347        this.topicExecutor.execute(task);
348    }
349
350    @Override
351    public TransactionStore createTransactionStore() throws IOException {
352        return this.transactionStore;
353    }
354
355    public boolean getForceRecoverIndex() {
356        return this.forceRecoverIndex;
357    }
358
359    public void setForceRecoverIndex(boolean forceRecoverIndex) {
360        this.forceRecoverIndex = forceRecoverIndex;
361    }
362
363    public class KahaDBMessageStore extends AbstractMessageStore {
364        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
365        protected KahaDestination dest;
366        private final int maxAsyncJobs;
367        private final Semaphore localDestinationSemaphore;
368
369        double doneTasks, canceledTasks = 0;
370
371        public KahaDBMessageStore(ActiveMQDestination destination) {
372            super(destination);
373            this.dest = convert(destination);
374            this.maxAsyncJobs = getMaxAsyncJobs();
375            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
376        }
377
378        @Override
379        public ActiveMQDestination getDestination() {
380            return destination;
381        }
382
383        @Override
384        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
385                throws IOException {
386            if (isConcurrentStoreAndDispatchQueues()) {
387                message.beforeMarshall(wireFormat);
388                StoreQueueTask result = new StoreQueueTask(this, context, message);
389                ListenableFuture<Object> future = result.getFuture();
390                message.getMessageId().setFutureOrSequenceLong(future);
391                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
392                result.aquireLocks();
393                synchronized (asyncTaskMap) {
394                    addQueueTask(this, result);
395                    if (indexListener != null) {
396                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
397                    }
398                }
399                return future;
400            } else {
401                return super.asyncAddQueueMessage(context, message);
402            }
403        }
404
405        @Override
406        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
407            if (isConcurrentStoreAndDispatchQueues()) {
408                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
409                StoreQueueTask task = null;
410                synchronized (asyncTaskMap) {
411                    task = (StoreQueueTask) asyncTaskMap.get(key);
412                }
413                if (task != null) {
414                    if (ack.isInTransaction() || !task.cancel()) {
415                        try {
416                            task.future.get();
417                        } catch (InterruptedException e) {
418                            throw new InterruptedIOException(e.toString());
419                        } catch (Exception ignored) {
420                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
421                        }
422                        removeMessage(context, ack);
423                    } else {
424                        synchronized (asyncTaskMap) {
425                            asyncTaskMap.remove(key);
426                        }
427                    }
428                } else {
429                    removeMessage(context, ack);
430                }
431            } else {
432                removeMessage(context, ack);
433            }
434        }
435
436        @Override
437        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
438            final KahaAddMessageCommand command = new KahaAddMessageCommand();
439            command.setDestination(dest);
440            command.setMessageId(message.getMessageId().toProducerKey());
441            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
442            command.setPriority(message.getPriority());
443            command.setPrioritySupported(isPrioritizedMessages());
444            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
445            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
446            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
447                // sync add? (for async, future present from getFutureOrSequenceLong)
448                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
449
450                @Override
451                public void sequenceAssignedWithIndexLocked(final long sequence) {
452                    message.getMessageId().setFutureOrSequenceLong(sequence);
453                    if (indexListener != null) {
454                        if (possibleFuture == null) {
455                            trackPendingAdd(dest, sequence);
456                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
457                                @Override
458                                public void run() {
459                                    trackPendingAddComplete(dest, sequence);
460                                }
461                            }));
462                        }
463                    }
464                }
465            }, null);
466        }
467
468        @Override
469        public void updateMessage(Message message) throws IOException {
470            if (LOG.isTraceEnabled()) {
471                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
472            }
473            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
474            KahaAddMessageCommand command = new KahaAddMessageCommand();
475            command.setDestination(dest);
476            command.setMessageId(message.getMessageId().toProducerKey());
477            command.setPriority(message.getPriority());
478            command.setPrioritySupported(prioritizedMessages);
479            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
480            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
481            updateMessageCommand.setMessage(command);
482            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
483        }
484
485        @Override
486        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
487            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
488            command.setDestination(dest);
489            command.setMessageId(ack.getLastMessageId().toProducerKey());
490            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
491
492            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
493            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
494            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
495        }
496
497        @Override
498        public void removeAllMessages(ConnectionContext context) throws IOException {
499            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
500            command.setDestination(dest);
501            store(command, true, null, null);
502        }
503
504        @Override
505        public Message getMessage(MessageId identity) throws IOException {
506            final String key = identity.toProducerKey();
507
508            // Hopefully one day the page file supports concurrent read
509            // operations... but for now we must
510            // externally synchronize...
511            Location location;
512            indexLock.writeLock().lock();
513            try {
514                location = findMessageLocation(key, dest);
515            } finally {
516                indexLock.writeLock().unlock();
517            }
518            if (location == null) {
519                return null;
520            }
521
522            return loadMessage(location);
523        }
524
525        @Override
526        public boolean isEmpty() throws IOException {
527            indexLock.writeLock().lock();
528            try {
529                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
530                    @Override
531                    public Boolean execute(Transaction tx) throws IOException {
532                        // Iterate through all index entries to get a count of
533                        // messages in the destination.
534                        StoredDestination sd = getStoredDestination(dest, tx);
535                        return sd.locationIndex.isEmpty(tx);
536                    }
537                });
538            } finally {
539                indexLock.writeLock().unlock();
540            }
541        }
542
543        @Override
544        public void recover(final MessageRecoveryListener listener) throws Exception {
545            // recovery may involve expiry which will modify
546            indexLock.writeLock().lock();
547            try {
548                pageFile.tx().execute(new Transaction.Closure<Exception>() {
549                    @Override
550                    public void execute(Transaction tx) throws Exception {
551                        StoredDestination sd = getStoredDestination(dest, tx);
552                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
553                        sd.orderIndex.resetCursorPosition();
554                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
555                                .hasNext(); ) {
556                            Entry<Long, MessageKeys> entry = iterator.next();
557                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
558                                continue;
559                            }
560                            Message msg = loadMessage(entry.getValue().location);
561                            listener.recoverMessage(msg);
562                        }
563                    }
564                });
565            } finally {
566                indexLock.writeLock().unlock();
567            }
568        }
569
570        @Override
571        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
572            indexLock.writeLock().lock();
573            try {
574                pageFile.tx().execute(new Transaction.Closure<Exception>() {
575                    @Override
576                    public void execute(Transaction tx) throws Exception {
577                        StoredDestination sd = getStoredDestination(dest, tx);
578                        Entry<Long, MessageKeys> entry = null;
579                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
580                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
581                            entry = iterator.next();
582                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
583                                continue;
584                            }
585                            Message msg = loadMessage(entry.getValue().location);
586                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
587                            listener.recoverMessage(msg);
588                            counter++;
589                            if (counter >= maxReturned) {
590                                break;
591                            }
592                        }
593                        sd.orderIndex.stoppedIterating();
594                    }
595                });
596            } finally {
597                indexLock.writeLock().unlock();
598            }
599        }
600
601        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
602            int counter = 0;
603            String id;
604            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
605                id = iterator.next();
606                iterator.remove();
607                Long sequence = sd.messageIdIndex.get(tx, id);
608                if (sequence != null) {
609                    if (sd.orderIndex.alreadyDispatched(sequence)) {
610                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
611                        counter++;
612                        if (counter >= maxReturned) {
613                            break;
614                        }
615                    } else {
616                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
617                    }
618                } else {
619                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
620                }
621            }
622            return counter;
623        }
624
625
626        @Override
627        public void resetBatching() {
628            if (pageFile.isLoaded()) {
629                indexLock.writeLock().lock();
630                try {
631                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
632                        @Override
633                        public void execute(Transaction tx) throws Exception {
634                            StoredDestination sd = getExistingStoredDestination(dest, tx);
635                            if (sd != null) {
636                                sd.orderIndex.resetCursorPosition();}
637                            }
638                        });
639                } catch (Exception e) {
640                    LOG.error("Failed to reset batching",e);
641                } finally {
642                    indexLock.writeLock().unlock();
643                }
644            }
645        }
646
647        @Override
648        public void setBatch(final MessageId identity) throws IOException {
649            indexLock.writeLock().lock();
650            try {
651                pageFile.tx().execute(new Transaction.Closure<IOException>() {
652                    @Override
653                    public void execute(Transaction tx) throws IOException {
654                        StoredDestination sd = getStoredDestination(dest, tx);
655                        Long location = (Long) identity.getFutureOrSequenceLong();
656                        Long pending = sd.orderIndex.minPendingAdd();
657                        if (pending != null) {
658                            location = Math.min(location, pending-1);
659                        }
660                        sd.orderIndex.setBatch(tx, location);
661                    }
662                });
663            } finally {
664                indexLock.writeLock().unlock();
665            }
666        }
667
668        @Override
669        public void setMemoryUsage(MemoryUsage memoryUsage) {
670        }
671        @Override
672        public void start() throws Exception {
673            super.start();
674        }
675        @Override
676        public void stop() throws Exception {
677            super.stop();
678        }
679
680        protected void lockAsyncJobQueue() {
681            try {
682                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
683                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
684                }
685            } catch (Exception e) {
686                LOG.error("Failed to lock async jobs for " + this.destination, e);
687            }
688        }
689
690        protected void unlockAsyncJobQueue() {
691            this.localDestinationSemaphore.release(this.maxAsyncJobs);
692        }
693
694        protected void acquireLocalAsyncLock() {
695            try {
696                this.localDestinationSemaphore.acquire();
697            } catch (InterruptedException e) {
698                LOG.error("Failed to aquire async lock for " + this.destination, e);
699            }
700        }
701
702        protected void releaseLocalAsyncLock() {
703            this.localDestinationSemaphore.release();
704        }
705
706        @Override
707        public String toString(){
708            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
709        }
710
711        @Override
712        protected void recoverMessageStoreStatistics() throws IOException {
713            try {
714                MessageStoreStatistics recoveredStatistics;
715                lockAsyncJobQueue();
716                indexLock.writeLock().lock();
717                try {
718                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
719                        @Override
720                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
721                            MessageStoreStatistics statistics = new MessageStoreStatistics();
722
723                            // Iterate through all index entries to get the size of each message
724                            StoredDestination sd = getStoredDestination(dest, tx);
725                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
726                                int locationSize = iterator.next().getKey().getSize();
727                                statistics.getMessageCount().increment();
728                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
729                            }
730                           return statistics;
731                        }
732                    });
733                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
734                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
735                } finally {
736                    indexLock.writeLock().unlock();
737                }
738            } finally {
739                unlockAsyncJobQueue();
740            }
741        }
742    }
743
744    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
745        private final AtomicInteger subscriptionCount = new AtomicInteger();
746        protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
747                new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
748
749        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
750            super(destination);
751            this.subscriptionCount.set(getAllSubscriptions().length);
752            if (isConcurrentStoreAndDispatchTopics()) {
753                asyncTopicMaps.add(asyncTaskMap);
754            }
755        }
756
757        @Override
758        protected void recoverMessageStoreStatistics() throws IOException {
759            super.recoverMessageStoreStatistics();
760            this.recoverMessageStoreSubMetrics();
761        }
762
763        @Override
764        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
765                throws IOException {
766            if (isConcurrentStoreAndDispatchTopics()) {
767                message.beforeMarshall(wireFormat);
768                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
769                result.aquireLocks();
770                addTopicTask(this, result);
771                return result.getFuture();
772            } else {
773                return super.asyncAddTopicMessage(context, message);
774            }
775        }
776
777        @Override
778        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
779                                MessageId messageId, MessageAck ack) throws IOException {
780            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
781            if (isConcurrentStoreAndDispatchTopics()) {
782                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
783                StoreTopicTask task = null;
784                synchronized (asyncTaskMap) {
785                    task = (StoreTopicTask) asyncTaskMap.get(key);
786                }
787                if (task != null) {
788                    if (task.addSubscriptionKey(subscriptionKey)) {
789                        removeTopicTask(this, messageId);
790                        if (task.cancel()) {
791                            synchronized (asyncTaskMap) {
792                                asyncTaskMap.remove(key);
793                            }
794                        }
795                    }
796                } else {
797                    doAcknowledge(context, subscriptionKey, messageId, ack);
798                }
799            } else {
800                doAcknowledge(context, subscriptionKey, messageId, ack);
801            }
802        }
803
804        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
805                throws IOException {
806            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
807            command.setDestination(dest);
808            command.setSubscriptionKey(subscriptionKey);
809            command.setMessageId(messageId.toProducerKey());
810            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
811            if (ack != null && ack.isUnmatchedAck()) {
812                command.setAck(UNMATCHED);
813            } else {
814                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
815                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
816            }
817            store(command, false, null, null);
818        }
819
820        @Override
821        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
822            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
823                    .getSubscriptionName());
824            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
825            command.setDestination(dest);
826            command.setSubscriptionKey(subscriptionKey.toString());
827            command.setRetroactive(retroactive);
828            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
829            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
830            store(command, isEnableJournalDiskSyncs() && true, null, null);
831            this.subscriptionCount.incrementAndGet();
832        }
833
834        @Override
835        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
836            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
837            command.setDestination(dest);
838            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
839            store(command, isEnableJournalDiskSyncs() && true, null, null);
840            this.subscriptionCount.decrementAndGet();
841        }
842
843        @Override
844        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
845
846            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
847            indexLock.writeLock().lock();
848            try {
849                pageFile.tx().execute(new Transaction.Closure<IOException>() {
850                    @Override
851                    public void execute(Transaction tx) throws IOException {
852                        StoredDestination sd = getStoredDestination(dest, tx);
853                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
854                                .hasNext();) {
855                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
856                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
857                                    .getValue().getSubscriptionInfo().newInput()));
858                            subscriptions.add(info);
859
860                        }
861                    }
862                });
863            } finally {
864                indexLock.writeLock().unlock();
865            }
866
867            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
868            subscriptions.toArray(rc);
869            return rc;
870        }
871
872        @Override
873        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
874            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
875            indexLock.writeLock().lock();
876            try {
877                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
878                    @Override
879                    public SubscriptionInfo execute(Transaction tx) throws IOException {
880                        StoredDestination sd = getStoredDestination(dest, tx);
881                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
882                        if (command == null) {
883                            return null;
884                        }
885                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
886                                .getSubscriptionInfo().newInput()));
887                    }
888                });
889            } finally {
890                indexLock.writeLock().unlock();
891            }
892        }
893
894        @Override
895        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
896            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
897
898            if (isEnableSubscriptionStatistics()) {
899                return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount();
900            } else {
901
902                indexLock.writeLock().lock();
903                try {
904                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
905                        @Override
906                        public Integer execute(Transaction tx) throws IOException {
907                            StoredDestination sd = getStoredDestination(dest, tx);
908                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
909                            if (cursorPos == null) {
910                                // The subscription might not exist.
911                                return 0;
912                            }
913
914                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
915                        }
916                    });
917                } finally {
918                    indexLock.writeLock().unlock();
919                }
920            }
921        }
922
923
924        @Override
925        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
926            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
927            if (isEnableSubscriptionStatistics()) {
928                return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize();
929            } else {
930                indexLock.writeLock().lock();
931                try {
932                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
933                        @Override
934                        public Integer execute(Transaction tx) throws IOException {
935                            StoredDestination sd = getStoredDestination(dest, tx);
936                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
937                            if (cursorPos == null) {
938                                // The subscription might not exist.
939                                return 0;
940                            }
941
942                            return (int) getStoredMessageSize(tx, sd, subscriptionKey);
943                        }
944                    });
945                } finally {
946                    indexLock.writeLock().unlock();
947                }
948            }
949        }
950
951
952        protected void recoverMessageStoreSubMetrics() throws IOException {
953            if (isEnableSubscriptionStatistics()) {
954
955                final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
956                indexLock.writeLock().lock();
957                try {
958                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
959                        @Override
960                        public void execute(Transaction tx) throws IOException {
961                            StoredDestination sd = getStoredDestination(dest, tx);
962                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
963                                    .iterator(tx); iterator.hasNext();) {
964                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
965
966                                String subscriptionKey = entry.getKey();
967                                LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
968                                if (cursorPos != null) {
969                                    long size = getStoredMessageSize(tx, sd, subscriptionKey);
970                                    statistics.getMessageCount(subscriptionKey)
971                                            .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
972                                    statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
973                                }
974                            }
975                        }
976                    });
977                } finally {
978                    indexLock.writeLock().unlock();
979                }
980            }
981        }
982
983        @Override
984        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
985                throws Exception {
986            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
987            @SuppressWarnings("unused")
988            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
989            indexLock.writeLock().lock();
990            try {
991                pageFile.tx().execute(new Transaction.Closure<Exception>() {
992                    @Override
993                    public void execute(Transaction tx) throws Exception {
994                        StoredDestination sd = getStoredDestination(dest, tx);
995                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
996                        sd.orderIndex.setBatch(tx, cursorPos);
997                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
998                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
999                                .hasNext();) {
1000                            Entry<Long, MessageKeys> entry = iterator.next();
1001                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1002                                continue;
1003                            }
1004                            listener.recoverMessage(loadMessage(entry.getValue().location));
1005                        }
1006                        sd.orderIndex.resetCursorPosition();
1007                    }
1008                });
1009            } finally {
1010                indexLock.writeLock().unlock();
1011            }
1012        }
1013
1014        @Override
1015        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
1016                final MessageRecoveryListener listener) throws Exception {
1017            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1018            @SuppressWarnings("unused")
1019            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1020            indexLock.writeLock().lock();
1021            try {
1022                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1023                    @Override
1024                    public void execute(Transaction tx) throws Exception {
1025                        StoredDestination sd = getStoredDestination(dest, tx);
1026                        sd.orderIndex.resetCursorPosition();
1027                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
1028                        if (moc == null) {
1029                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
1030                            if (pos == null) {
1031                                // sub deleted
1032                                return;
1033                            }
1034                            sd.orderIndex.setBatch(tx, pos);
1035                            moc = sd.orderIndex.cursor;
1036                        } else {
1037                            sd.orderIndex.cursor.sync(moc);
1038                        }
1039
1040                        Entry<Long, MessageKeys> entry = null;
1041                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
1042                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
1043                                .hasNext();) {
1044                            entry = iterator.next();
1045                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1046                                continue;
1047                            }
1048                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
1049                                counter++;
1050                            }
1051                            if (counter >= maxReturned || listener.hasSpace() == false) {
1052                                break;
1053                            }
1054                        }
1055                        sd.orderIndex.stoppedIterating();
1056                        if (entry != null) {
1057                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
1058                            sd.subscriptionCursors.put(subscriptionKey, copy);
1059                        }
1060                    }
1061                });
1062            } finally {
1063                indexLock.writeLock().unlock();
1064            }
1065        }
1066
1067        @Override
1068        public void resetBatching(String clientId, String subscriptionName) {
1069            try {
1070                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1071                indexLock.writeLock().lock();
1072                try {
1073                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1074                        @Override
1075                        public void execute(Transaction tx) throws IOException {
1076                            StoredDestination sd = getStoredDestination(dest, tx);
1077                            sd.subscriptionCursors.remove(subscriptionKey);
1078                        }
1079                    });
1080                }finally {
1081                    indexLock.writeLock().unlock();
1082                }
1083            } catch (IOException e) {
1084                throw new RuntimeException(e);
1085            }
1086        }
1087
1088        @Override
1089        public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
1090            return messageStoreSubStats;
1091        }
1092    }
1093
1094    String subscriptionKey(String clientId, String subscriptionName) {
1095        return clientId + ":" + subscriptionName;
1096    }
1097
1098    @Override
1099    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1100        String key = key(convert(destination));
1101        MessageStore store = storeCache.get(key(convert(destination)));
1102        if (store == null) {
1103            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1104            store = storeCache.putIfAbsent(key, queueStore);
1105            if (store == null) {
1106                store = queueStore;
1107            }
1108        }
1109
1110        return store;
1111    }
1112
1113    @Override
1114    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1115        String key = key(convert(destination));
1116        MessageStore store = storeCache.get(key(convert(destination)));
1117        if (store == null) {
1118            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1119            store = storeCache.putIfAbsent(key, topicStore);
1120            if (store == null) {
1121                store = topicStore;
1122            }
1123        }
1124
1125        return (TopicMessageStore) store;
1126    }
1127
1128    /**
1129     * Cleanup method to remove any state associated with the given destination.
1130     * This method does not stop the message store (it might not be cached).
1131     *
1132     * @param destination
1133     *            Destination to forget
1134     */
1135    @Override
1136    public void removeQueueMessageStore(ActiveMQQueue destination) {
1137    }
1138
1139    /**
1140     * Cleanup method to remove any state associated with the given destination
1141     * This method does not stop the message store (it might not be cached).
1142     *
1143     * @param destination
1144     *            Destination to forget
1145     */
1146    @Override
1147    public void removeTopicMessageStore(ActiveMQTopic destination) {
1148    }
1149
1150    @Override
1151    public void deleteAllMessages() throws IOException {
1152        deleteAllMessages = true;
1153    }
1154
1155    @Override
1156    public Set<ActiveMQDestination> getDestinations() {
1157        try {
1158            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1159            indexLock.writeLock().lock();
1160            try {
1161                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1162                    @Override
1163                    public void execute(Transaction tx) throws IOException {
1164                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1165                                .hasNext();) {
1166                            Entry<String, StoredDestination> entry = iterator.next();
1167                            //Removing isEmpty topic check - see AMQ-5875
1168                            rc.add(convert(entry.getKey()));
1169                        }
1170                    }
1171                });
1172            }finally {
1173                indexLock.writeLock().unlock();
1174            }
1175            return rc;
1176        } catch (IOException e) {
1177            throw new RuntimeException(e);
1178        }
1179    }
1180
1181    @Override
1182    public long getLastMessageBrokerSequenceId() throws IOException {
1183        return 0;
1184    }
1185
1186    @Override
1187    public long getLastProducerSequenceId(ProducerId id) {
1188        indexLock.writeLock().lock();
1189        try {
1190            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1191        } finally {
1192            indexLock.writeLock().unlock();
1193        }
1194    }
1195
1196    @Override
1197    public long size() {
1198        try {
1199            return journalSize.get() + getPageFile().getDiskSize();
1200        } catch (IOException e) {
1201            throw new RuntimeException(e);
1202        }
1203    }
1204
1205    @Override
1206    public void beginTransaction(ConnectionContext context) throws IOException {
1207        throw new IOException("Not yet implemented.");
1208    }
1209    @Override
1210    public void commitTransaction(ConnectionContext context) throws IOException {
1211        throw new IOException("Not yet implemented.");
1212    }
1213    @Override
1214    public void rollbackTransaction(ConnectionContext context) throws IOException {
1215        throw new IOException("Not yet implemented.");
1216    }
1217
1218    @Override
1219    public void checkpoint(boolean sync) throws IOException {
1220        super.checkpointCleanup(sync);
1221    }
1222
1223    // /////////////////////////////////////////////////////////////////
1224    // Internal helper methods.
1225    // /////////////////////////////////////////////////////////////////
1226
1227    /**
1228     * @param location
1229     * @return
1230     * @throws IOException
1231     */
1232    Message loadMessage(Location location) throws IOException {
1233        try {
1234            JournalCommand<?> command = load(location);
1235            KahaAddMessageCommand addMessage = null;
1236            switch (command.type()) {
1237                case KAHA_UPDATE_MESSAGE_COMMAND:
1238                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1239                    break;
1240                default:
1241                    addMessage = (KahaAddMessageCommand) command;
1242            }
1243            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1244            return msg;
1245        } catch (IOException ioe) {
1246            LOG.error("Failed to load message at: {}", location , ioe);
1247            brokerService.handleIOException(ioe);
1248            throw ioe;
1249        }
1250    }
1251
1252    // /////////////////////////////////////////////////////////////////
1253    // Internal conversion methods.
1254    // /////////////////////////////////////////////////////////////////
1255
1256    KahaLocation convert(Location location) {
1257        KahaLocation rc = new KahaLocation();
1258        rc.setLogId(location.getDataFileId());
1259        rc.setOffset(location.getOffset());
1260        return rc;
1261    }
1262
1263    KahaDestination convert(ActiveMQDestination dest) {
1264        KahaDestination rc = new KahaDestination();
1265        rc.setName(dest.getPhysicalName());
1266        switch (dest.getDestinationType()) {
1267        case ActiveMQDestination.QUEUE_TYPE:
1268            rc.setType(DestinationType.QUEUE);
1269            return rc;
1270        case ActiveMQDestination.TOPIC_TYPE:
1271            rc.setType(DestinationType.TOPIC);
1272            return rc;
1273        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1274            rc.setType(DestinationType.TEMP_QUEUE);
1275            return rc;
1276        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1277            rc.setType(DestinationType.TEMP_TOPIC);
1278            return rc;
1279        default:
1280            return null;
1281        }
1282    }
1283
1284    ActiveMQDestination convert(String dest) {
1285        int p = dest.indexOf(":");
1286        if (p < 0) {
1287            throw new IllegalArgumentException("Not in the valid destination format");
1288        }
1289        int type = Integer.parseInt(dest.substring(0, p));
1290        String name = dest.substring(p + 1);
1291        return convert(type, name);
1292    }
1293
1294    private ActiveMQDestination convert(KahaDestination commandDestination) {
1295        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1296    }
1297
1298    private ActiveMQDestination convert(int type, String name) {
1299        switch (KahaDestination.DestinationType.valueOf(type)) {
1300        case QUEUE:
1301            return new ActiveMQQueue(name);
1302        case TOPIC:
1303            return new ActiveMQTopic(name);
1304        case TEMP_QUEUE:
1305            return new ActiveMQTempQueue(name);
1306        case TEMP_TOPIC:
1307            return new ActiveMQTempTopic(name);
1308        default:
1309            throw new IllegalArgumentException("Not in the valid destination format");
1310        }
1311    }
1312
1313    public TransactionIdTransformer getTransactionIdTransformer() {
1314        return transactionIdTransformer;
1315    }
1316
1317    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1318        this.transactionIdTransformer = transactionIdTransformer;
1319    }
1320
1321    static class AsyncJobKey {
1322        MessageId id;
1323        ActiveMQDestination destination;
1324
1325        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1326            this.id = id;
1327            this.destination = destination;
1328        }
1329
1330        @Override
1331        public boolean equals(Object obj) {
1332            if (obj == this) {
1333                return true;
1334            }
1335            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1336                    && destination.equals(((AsyncJobKey) obj).destination);
1337        }
1338
1339        @Override
1340        public int hashCode() {
1341            return id.hashCode() + destination.hashCode();
1342        }
1343
1344        @Override
1345        public String toString() {
1346            return destination.getPhysicalName() + "-" + id;
1347        }
1348    }
1349
1350    public interface StoreTask {
1351        public boolean cancel();
1352
1353        public void aquireLocks();
1354
1355        public void releaseLocks();
1356    }
1357
1358    class StoreQueueTask implements Runnable, StoreTask {
1359        protected final Message message;
1360        protected final ConnectionContext context;
1361        protected final KahaDBMessageStore store;
1362        protected final InnerFutureTask future;
1363        protected final AtomicBoolean done = new AtomicBoolean();
1364        protected final AtomicBoolean locked = new AtomicBoolean();
1365
1366        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1367            this.store = store;
1368            this.context = context;
1369            this.message = message;
1370            this.future = new InnerFutureTask(this);
1371        }
1372
1373        public ListenableFuture<Object> getFuture() {
1374            return this.future;
1375        }
1376
1377        @Override
1378        public boolean cancel() {
1379            if (this.done.compareAndSet(false, true)) {
1380                return this.future.cancel(false);
1381            }
1382            return false;
1383        }
1384
1385        @Override
1386        public void aquireLocks() {
1387            if (this.locked.compareAndSet(false, true)) {
1388                try {
1389                    globalQueueSemaphore.acquire();
1390                    store.acquireLocalAsyncLock();
1391                    message.incrementReferenceCount();
1392                } catch (InterruptedException e) {
1393                    LOG.warn("Failed to aquire lock", e);
1394                }
1395            }
1396
1397        }
1398
1399        @Override
1400        public void releaseLocks() {
1401            if (this.locked.compareAndSet(true, false)) {
1402                store.releaseLocalAsyncLock();
1403                globalQueueSemaphore.release();
1404                message.decrementReferenceCount();
1405            }
1406        }
1407
1408        @Override
1409        public void run() {
1410            this.store.doneTasks++;
1411            try {
1412                if (this.done.compareAndSet(false, true)) {
1413                    this.store.addMessage(context, message);
1414                    removeQueueTask(this.store, this.message.getMessageId());
1415                    this.future.complete();
1416                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1417                    System.err.println(this.store.dest.getName() + " cancelled: "
1418                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1419                    this.store.canceledTasks = this.store.doneTasks = 0;
1420                }
1421            } catch (Exception e) {
1422                this.future.setException(e);
1423            }
1424        }
1425
1426        protected Message getMessage() {
1427            return this.message;
1428        }
1429
1430        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1431
1432            private Runnable listener;
1433            public InnerFutureTask(Runnable runnable) {
1434                super(runnable, null);
1435
1436            }
1437
1438            public void setException(final Exception e) {
1439                super.setException(e);
1440            }
1441
1442            public void complete() {
1443                super.set(null);
1444            }
1445
1446            @Override
1447            public void done() {
1448                fireListener();
1449            }
1450
1451            @Override
1452            public void addListener(Runnable listener) {
1453                this.listener = listener;
1454                if (isDone()) {
1455                    fireListener();
1456                }
1457            }
1458
1459            private void fireListener() {
1460                if (listener != null) {
1461                    try {
1462                        listener.run();
1463                    } catch (Exception ignored) {
1464                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1465                    }
1466                }
1467            }
1468        }
1469    }
1470
1471    class StoreTopicTask extends StoreQueueTask {
1472        private final int subscriptionCount;
1473        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1474        private final KahaDBTopicMessageStore topicStore;
1475        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1476                int subscriptionCount) {
1477            super(store, context, message);
1478            this.topicStore = store;
1479            this.subscriptionCount = subscriptionCount;
1480
1481        }
1482
1483        @Override
1484        public void aquireLocks() {
1485            if (this.locked.compareAndSet(false, true)) {
1486                try {
1487                    globalTopicSemaphore.acquire();
1488                    store.acquireLocalAsyncLock();
1489                    message.incrementReferenceCount();
1490                } catch (InterruptedException e) {
1491                    LOG.warn("Failed to aquire lock", e);
1492                }
1493            }
1494        }
1495
1496        @Override
1497        public void releaseLocks() {
1498            if (this.locked.compareAndSet(true, false)) {
1499                message.decrementReferenceCount();
1500                store.releaseLocalAsyncLock();
1501                globalTopicSemaphore.release();
1502            }
1503        }
1504
1505        /**
1506         * add a key
1507         *
1508         * @param key
1509         * @return true if all acknowledgements received
1510         */
1511        public boolean addSubscriptionKey(String key) {
1512            synchronized (this.subscriptionKeys) {
1513                this.subscriptionKeys.add(key);
1514            }
1515            return this.subscriptionKeys.size() >= this.subscriptionCount;
1516        }
1517
1518        @Override
1519        public void run() {
1520            this.store.doneTasks++;
1521            try {
1522                if (this.done.compareAndSet(false, true)) {
1523                    this.topicStore.addMessage(context, message);
1524                    // apply any acks we have
1525                    synchronized (this.subscriptionKeys) {
1526                        for (String key : this.subscriptionKeys) {
1527                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1528
1529                        }
1530                    }
1531                    removeTopicTask(this.topicStore, this.message.getMessageId());
1532                    this.future.complete();
1533                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1534                    System.err.println(this.store.dest.getName() + " cancelled: "
1535                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1536                    this.store.canceledTasks = this.store.doneTasks = 0;
1537                }
1538            } catch (Exception e) {
1539                this.future.setException(e);
1540            }
1541        }
1542    }
1543
1544    public class StoreTaskExecutor extends ThreadPoolExecutor {
1545
1546        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1547            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1548        }
1549
1550        @Override
1551        protected void afterExecute(Runnable runnable, Throwable throwable) {
1552            super.afterExecute(runnable, throwable);
1553
1554            if (runnable instanceof StoreTask) {
1555               ((StoreTask)runnable).releaseLocks();
1556            }
1557        }
1558    }
1559
1560    @Override
1561    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1562        return new JobSchedulerStoreImpl();
1563    }
1564}