001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InterruptedIOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.OutputStream;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.Collections;
034import java.util.Date;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.LinkedHashMap;
039import java.util.LinkedHashSet;
040import java.util.List;
041import java.util.Map;
042import java.util.Map.Entry;
043import java.util.Set;
044import java.util.SortedSet;
045import java.util.Stack;
046import java.util.TreeMap;
047import java.util.TreeSet;
048import java.util.concurrent.atomic.AtomicBoolean;
049import java.util.concurrent.atomic.AtomicLong;
050import java.util.concurrent.locks.ReentrantReadWriteLock;
051
052import org.apache.activemq.ActiveMQMessageAuditNoSync;
053import org.apache.activemq.broker.BrokerService;
054import org.apache.activemq.broker.BrokerServiceAware;
055import org.apache.activemq.command.MessageAck;
056import org.apache.activemq.command.SubscriptionInfo;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
060import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
061import org.apache.activemq.store.kahadb.data.KahaDestination;
062import org.apache.activemq.store.kahadb.data.KahaEntryType;
063import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
064import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
065import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
066import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
067import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
068import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
069import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
070import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
071import org.apache.activemq.util.Callback;
072import org.apache.activemq.util.IOHelper;
073import org.apache.activemq.util.ServiceStopper;
074import org.apache.activemq.util.ServiceSupport;
075import org.apache.kahadb.index.BTreeIndex;
076import org.apache.kahadb.index.BTreeVisitor;
077import org.apache.kahadb.index.ListIndex;
078import org.apache.kahadb.journal.DataFile;
079import org.apache.kahadb.journal.Journal;
080import org.apache.kahadb.journal.Location;
081import org.apache.kahadb.page.Page;
082import org.apache.kahadb.page.PageFile;
083import org.apache.kahadb.page.Transaction;
084import org.apache.kahadb.util.ByteSequence;
085import org.apache.kahadb.util.DataByteArrayInputStream;
086import org.apache.kahadb.util.DataByteArrayOutputStream;
087import org.apache.kahadb.util.LocationMarshaller;
088import org.apache.kahadb.util.LockFile;
089import org.apache.kahadb.util.LongMarshaller;
090import org.apache.kahadb.util.Marshaller;
091import org.apache.kahadb.util.Sequence;
092import org.apache.kahadb.util.SequenceSet;
093import org.apache.kahadb.util.StringMarshaller;
094import org.apache.kahadb.util.VariableMarshaller;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097
098public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099
100    protected BrokerService brokerService;
101
102    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
103    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
104    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
105    protected static final Buffer UNMATCHED;
106    static {
107        UNMATCHED = new Buffer(new byte[]{});
108    }
109    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
110    private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111
112    static final int CLOSED_STATE = 1;
113    static final int OPEN_STATE = 2;
114    static final long NOT_ACKED = -1;
115
116    static final int VERSION = 4;
117
118    protected class Metadata {
119        protected Page<Metadata> page;
120        protected int state;
121        protected BTreeIndex<String, StoredDestination> destinations;
122        protected Location lastUpdate;
123        protected Location firstInProgressTransactionLocation;
124        protected Location producerSequenceIdTrackerLocation = null;
125        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126        protected int version = VERSION;
127        public void read(DataInput is) throws IOException {
128            state = is.readInt();
129            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130            if (is.readBoolean()) {
131                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132            } else {
133                lastUpdate = null;
134            }
135            if (is.readBoolean()) {
136                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137            } else {
138                firstInProgressTransactionLocation = null;
139            }
140            try {
141                if (is.readBoolean()) {
142                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143                } else {
144                    producerSequenceIdTrackerLocation = null;
145                }
146            } catch (EOFException expectedOnUpgrade) {
147            }
148            try {
149               version = is.readInt();
150            } catch (EOFException expectedOnUpgrade) {
151                version=1;
152            }
153            LOG.info("KahaDB is version " + version);
154        }
155
156        public void write(DataOutput os) throws IOException {
157            os.writeInt(state);
158            os.writeLong(destinations.getPageId());
159
160            if (lastUpdate != null) {
161                os.writeBoolean(true);
162                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163            } else {
164                os.writeBoolean(false);
165            }
166
167            if (firstInProgressTransactionLocation != null) {
168                os.writeBoolean(true);
169                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170            } else {
171                os.writeBoolean(false);
172            }
173
174            if (producerSequenceIdTrackerLocation != null) {
175                os.writeBoolean(true);
176                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177            } else {
178                os.writeBoolean(false);
179            }
180            os.writeInt(VERSION);
181        }
182    }
183
184    class MetadataMarshaller extends VariableMarshaller<Metadata> {
185        public Metadata readPayload(DataInput dataIn) throws IOException {
186            Metadata rc = new Metadata();
187            rc.read(dataIn);
188            return rc;
189        }
190
191        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
192            object.write(dataOut);
193        }
194    }
195
    // Index page file; page 0 holds the Metadata record (see loadPageFile()).
    // open() nulls it after a failed load so it is re-created for recovery.
    protected PageFile pageFile;
    // Journal of store commands; started in open(), closed in close().
    protected Journal journal;
    // Root index record; replaced by the loaded copy in loadPageFile() and reset in close().
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    // When true, lock() makes a single attempt instead of waiting for the lock.
    protected boolean failIfDatabaseIsLocked;

    // When true, load() deletes the journal and page file before opening, then clears this flag.
    protected boolean deleteAllMessages;
    // Store directory; the "lock" file is created here by lock().
    protected File directory = DEFAULT_DIRECTORY;
    // Background worker started by startCheckpoint(); guarded by checkpointThreadLock.
    protected Thread checkpointThread;
    // NOTE(review): the following journal/size settings are used outside this part of the file.
    protected boolean enableJournalDiskSyncs=true;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong storeSize = new AtomicLong(0);
    // Checkpoint/cleanup periods in milliseconds (compared against System.currentTimeMillis()
    // deltas); a value of 0 disables that activity, and both 0 disables the worker thread.
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;

    // Set by open(), cleared by close(); the checkpoint worker loops while it is true.
    protected AtomicBoolean opened = new AtomicBoolean();
    // Held lock on <directory>/lock; acquired by lock(), released in close().
    private LockFile lockFile;
    // When true, recoverIndex() drops index entries that reference missing/corrupt
    // journal data instead of failing with an IOException.
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    // When true, recoverIndex() also treats data past each file's length and any
    // corrupted blocks reported by the journal as missing.
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = false;
    // Pause between lock attempts in lock(), in milliseconds
    // (presumably returned by getDatabaseLockedWaitDelay() — confirm).
    private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
    protected boolean forceRecoverIndex = false;
    // Monitor guarding creation of, and joining on, checkpointThread.
    private final Object checkpointThreadLock = new Object();
    private boolean rewriteOnRedelivery = false;
    // When true, open() archives a corrupt index instead of deleting it.
    private boolean archiveCorruptedIndex = false;
    // NOTE(review): the following index settings are applied outside this part of the file.
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
234
235    public MessageDatabase() {
236    }
237
238    @Override
239    public void doStart() throws Exception {
240        load();
241    }
242
243    @Override
244    public void doStop(ServiceStopper stopper) throws Exception {
245        unload();
246    }
247
248    private void loadPageFile() throws IOException {
249        this.indexLock.writeLock().lock();
250        try {
251            final PageFile pageFile = getPageFile();
252            pageFile.load();
253            pageFile.tx().execute(new Transaction.Closure<IOException>() {
254                public void execute(Transaction tx) throws IOException {
255                    if (pageFile.getPageCount() == 0) {
256                        // First time this is created.. Initialize the metadata
257                        Page<Metadata> page = tx.allocate();
258                        assert page.getPageId() == 0;
259                        page.set(metadata);
260                        metadata.page = page;
261                        metadata.state = CLOSED_STATE;
262                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
263
264                        tx.store(metadata.page, metadataMarshaller, true);
265                    } else {
266                        Page<Metadata> page = tx.load(0, metadataMarshaller);
267                        metadata = page.get();
268                        metadata.page = page;
269                    }
270                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
271                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
272                    metadata.destinations.load(tx);
273                }
274            });
275            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
276            // Perhaps we should just keep an index of file
277            storedDestinations.clear();
278            pageFile.tx().execute(new Transaction.Closure<IOException>() {
279                public void execute(Transaction tx) throws IOException {
280                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
281                        Entry<String, StoredDestination> entry = iterator.next();
282                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
283                        storedDestinations.put(entry.getKey(), sd);
284                    }
285                }
286            });
287            pageFile.flush();
288        } finally {
289            this.indexLock.writeLock().unlock();
290        }
291    }
292
293    private void startCheckpoint() {
294        if (checkpointInterval == 0 &&  cleanupInterval == 0) {
295            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
296            return;
297        }
298        synchronized (checkpointThreadLock) {
299            boolean start = false;
300            if (checkpointThread == null) {
301                start = true;
302            } else if (!checkpointThread.isAlive()) {
303                start = true;
304                LOG.info("KahaDB: Recovering checkpoint thread after death");
305            }
306            if (start) {
307                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
308                    @Override
309                    public void run() {
310                        try {
311                            long lastCleanup = System.currentTimeMillis();
312                            long lastCheckpoint = System.currentTimeMillis();
313                            // Sleep for a short time so we can periodically check
314                            // to see if we need to exit this thread.
315                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
316                            while (opened.get()) {
317                                Thread.sleep(sleepTime);
318                                long now = System.currentTimeMillis();
319                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
320                                    checkpointCleanup(true);
321                                    lastCleanup = now;
322                                    lastCheckpoint = now;
323                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
324                                    checkpointCleanup(false);
325                                    lastCheckpoint = now;
326                                }
327                            }
328                        } catch (InterruptedException e) {
329                            // Looks like someone really wants us to exit this thread...
330                        } catch (IOException ioe) {
331                            LOG.error("Checkpoint failed", ioe);
332                            brokerService.handleIOException(ioe);
333                        }
334                    }
335                };
336
337                checkpointThread.setDaemon(true);
338                checkpointThread.start();
339            }
340        }
341    }
342
343    public void open() throws IOException {
344        if( opened.compareAndSet(false, true) ) {
345            getJournal().start();
346            try {
347                loadPageFile();
348            } catch (Throwable t) {
349                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
350                if (LOG.isDebugEnabled()) {
351                    LOG.debug("Index load failure", t);
352                }
353                // try to recover index
354                try {
355                    pageFile.unload();
356                } catch (Exception ignore) {}
357                if (archiveCorruptedIndex) {
358                    pageFile.archive();
359                } else {
360                    pageFile.delete();
361                }
362                metadata = new Metadata();
363                pageFile = null;
364                loadPageFile();
365            }
366            startCheckpoint();
367            recover();
368        }
369    }
370
371    private void lock() throws IOException {
372
373        if (lockFile == null) {
374            File lockFileName = new File(directory, "lock");
375            lockFile = new LockFile(lockFileName, true);
376            if (failIfDatabaseIsLocked) {
377                lockFile.lock();
378            } else {
379                boolean locked = false;
380                while ((!isStopped()) && (!isStopping())) {
381                    try {
382                        lockFile.lock();
383                        locked = true;
384                        break;
385                    } catch (IOException e) {
386                        LOG.info("Database "
387                                + lockFileName
388                                + " is locked... waiting "
389                                + (getDatabaseLockedWaitDelay() / 1000)
390                                + " seconds for the database to be unlocked. Reason: "
391                                + e);
392                        try {
393                            Thread.sleep(getDatabaseLockedWaitDelay());
394                        } catch (InterruptedException e1) {
395                        }
396                    }
397                }
398                if (!locked) {
399                    throw new IOException("attempt to obtain lock aborted due to shutdown");
400                }
401            }
402        }
403    }
404
405    // for testing
406    public LockFile getLockFile() {
407        return lockFile;
408    }
409
410    public void load() throws IOException {
411        this.indexLock.writeLock().lock();
412        try {
413            lock();
414            if (deleteAllMessages) {
415                getJournal().start();
416                getJournal().delete();
417                getJournal().close();
418                journal = null;
419                getPageFile().delete();
420                LOG.info("Persistence store purged.");
421                deleteAllMessages = false;
422            }
423
424            open();
425            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
426        } finally {
427            this.indexLock.writeLock().unlock();
428        }
429    }
430
431    public void close() throws IOException, InterruptedException {
432        if( opened.compareAndSet(true, false)) {
433            try {
434                this.indexLock.writeLock().lock();
435                try {
436                    if (metadata.page != null) {
437                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
438                            public void execute(Transaction tx) throws IOException {
439                                checkpointUpdate(tx, true);
440                            }
441                        });
442                    }
443                    pageFile.unload();
444                    metadata = new Metadata();
445                } finally {
446                    this.indexLock.writeLock().unlock();
447                }
448                journal.close();
449                synchronized (checkpointThreadLock) {
450                    if (checkpointThread != null) {
451                        checkpointThread.join();
452                    }
453                }
454            } finally {
455                lockFile.unlock();
456                lockFile=null;
457            }
458        }
459    }
460
461    public void unload() throws IOException, InterruptedException {
462        this.indexLock.writeLock().lock();
463        try {
464            if( pageFile != null && pageFile.isLoaded() ) {
465                metadata.state = CLOSED_STATE;
466                metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
467
468                if (metadata.page != null) {
469                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
470                        public void execute(Transaction tx) throws IOException {
471                            tx.store(metadata.page, metadataMarshaller, true);
472                        }
473                    });
474                }
475            }
476        } finally {
477            this.indexLock.writeLock().unlock();
478        }
479        close();
480    }
481
482    // public for testing
483    @SuppressWarnings("rawtypes")
484    public Location getFirstInProgressTxLocation() {
485        Location l = null;
486        synchronized (inflightTransactions) {
487            if (!inflightTransactions.isEmpty()) {
488                for (List<Operation> ops : inflightTransactions.values()) {
489                    if (!ops.isEmpty()) {
490                        l = ops.get(0).getLocation();
491                        break;
492                    }
493                }
494            }
495            if (!preparedTransactions.isEmpty()) {
496                for (List<Operation> ops : preparedTransactions.values()) {
497                    if (!ops.isEmpty()) {
498                        Location t = ops.get(0).getLocation();
499                        if (l==null || t.compareTo(l) <= 0) {
500                            l = t;
501                        }
502                        break;
503                    }
504                }
505            }
506        }
507        return l;
508    }
509
    /**
     * Replays the journal into the index, then reconciles the index and rolls back
     * leftover local transactions. Runs entirely under the index write lock.
     *
     * Replay starts at the earlier of the position after the stored producer audit
     * ({@code recoverProducerAudit()}) and the in-doubt position
     * ({@code getRecoveryPosition()}). It runs until the journal has no next location.
     * Afterwards {@code recoverIndex} removes index entries the journal cannot back.
     * Finally, any recovered in-flight local transactions are rolled back by storing
     * a {@code KahaRollbackCommand} for each.
     *
     * @throws IOException if reading the journal, updating the index or storing a rollback fails
     * @throws IllegalStateException declared by the signature; its source is not visible here
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();
            Location producerAuditPosition = recoverProducerAudit();
            Location lastIndoubtPosition = getRecoveryPosition();

            // Start from whichever of the two positions comes first (null means "none").
            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the journal ...");
                while (recoveryPosition != null) {
                    JournalCommand<?> message = load(recoveryPosition);
                    // lastUpdate tracks the command being replayed, set before processing it.
                    metadata.lastUpdate = recoveryPosition;
                    // NOTE(review): lastIndoubtPosition presumably lets process() distinguish
                    // commands already reflected in the index from ones that must be re-applied.
                    process(message, recoveryPosition, lastIndoubtPosition);
                    redoCounter++;
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    // Progress every 100000 entries; the logged position is the NEXT location.
                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
                     }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions
            // The ids are collected first and rolled back afterwards. Presumably storing a
            // rollback removes the entry from inflightTransactions, which would break a
            // live iteration — verify.
            Set<TransactionId> toRollback = new HashSet<TransactionId>();
            synchronized (inflightTransactions) {
                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                    TransactionId id = it.next();
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    }
                }
                for (TransactionId tx: toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
574
575    @SuppressWarnings("unused")
576    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
577        return TransactionIdConversion.convertToLocal(tx);
578    }
579
580    private Location minimum(Location producerAuditPosition,
581            Location lastIndoubtPosition) {
582        Location min = null;
583        if (producerAuditPosition != null) {
584            min = producerAuditPosition;
585            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
586                min = lastIndoubtPosition;
587            }
588        } else {
589            min = lastIndoubtPosition;
590        }
591        return min;
592    }
593
594    private Location recoverProducerAudit() throws IOException {
595        if (metadata.producerSequenceIdTrackerLocation != null) {
596            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
597            try {
598                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
599                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
600                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
601            } catch (Exception e) {
602                LOG.warn("Cannot recover message audit", e);
603                return journal.getNextLocation(null);
604            }
605        } else {
606            // got no audit stored so got to recreate via replay from start of the journal
607            return journal.getNextLocation(null);
608        }
609    }
610
611    protected void recoverIndex(Transaction tx) throws IOException {
612        long start = System.currentTimeMillis();
613        // It is possible index updates got applied before the journal updates..
614        // in that case we need to removed references to messages that are not in the journal
615        final Location lastAppendLocation = journal.getLastAppendLocation();
616        long undoCounter=0;
617
618        // Go through all the destinations to see if they have messages past the lastAppendLocation
619        for (StoredDestination sd : storedDestinations.values()) {
620
621            final ArrayList<Long> matches = new ArrayList<Long>();
622            // Find all the Locations that are >= than the last Append Location.
623            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
624                @Override
625                protected void matched(Location key, Long value) {
626                    matches.add(value);
627                }
628            });
629
630            for (Long sequenceId : matches) {
631                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
632                sd.locationIndex.remove(tx, keys.location);
633                sd.messageIdIndex.remove(tx, keys.messageId);
634                metadata.producerSequenceIdTracker.rollback(keys.messageId);
635                undoCounter++;
636                // TODO: do we need to modify the ack positions for the pub sub case?
637            }
638        }
639
640        if( undoCounter > 0 ) {
641            // The rolledback operations are basically in flight journal writes.  To avoid getting
642            // these the end user should do sync writes to the journal.
643            if (LOG.isInfoEnabled()) {
644                long end = System.currentTimeMillis();
645                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
646            }
647        }
648
649        undoCounter = 0;
650        start = System.currentTimeMillis();
651
652        // Lets be extra paranoid here and verify that all the datafiles being referenced
653        // by the indexes still exists.
654
655        final SequenceSet ss = new SequenceSet();
656        for (StoredDestination sd : storedDestinations.values()) {
657            // Use a visitor to cut down the number of pages that we load
658            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
659                int last=-1;
660
661                public boolean isInterestedInKeysBetween(Location first, Location second) {
662                    if( first==null ) {
663                        return !ss.contains(0, second.getDataFileId());
664                    } else if( second==null ) {
665                        return true;
666                    } else {
667                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
668                    }
669                }
670
671                public void visit(List<Location> keys, List<Long> values) {
672                    for (Location l : keys) {
673                        int fileId = l.getDataFileId();
674                        if( last != fileId ) {
675                            ss.add(fileId);
676                            last = fileId;
677                        }
678                    }
679                }
680
681            });
682        }
683        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
684        while (!ss.isEmpty()) {
685            missingJournalFiles.add((int) ss.removeFirst());
686        }
687        missingJournalFiles.removeAll(journal.getFileMap().keySet());
688
689        if (!missingJournalFiles.isEmpty()) {
690            if (LOG.isInfoEnabled()) {
691                LOG.info("Some journal files are missing: " + missingJournalFiles);
692            }
693        }
694
695        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
696        for (Integer missing : missingJournalFiles) {
697            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
698        }
699
700        if (checkForCorruptJournalFiles) {
701            Collection<DataFile> dataFiles = journal.getFileMap().values();
702            for (DataFile dataFile : dataFiles) {
703                int id = dataFile.getDataFileId();
704                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
705                Sequence seq = dataFile.getCorruptedBlocks().getHead();
706                while (seq != null) {
707                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
708                    seq = seq.getNext();
709                }
710            }
711        }
712
713        if (!missingPredicates.isEmpty()) {
714            for (StoredDestination sd : storedDestinations.values()) {
715
716                final ArrayList<Long> matches = new ArrayList<Long>();
717                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
718                    @Override
719                    protected void matched(Location key, Long value) {
720                        matches.add(value);
721                    }
722                });
723
724                // If somes message references are affected by the missing data files...
725                if (!matches.isEmpty()) {
726
727                    // We either 'gracefully' recover dropping the missing messages or
728                    // we error out.
729                    if( ignoreMissingJournalfiles ) {
730                        // Update the index to remove the references to the missing data
731                        for (Long sequenceId : matches) {
732                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
733                            sd.locationIndex.remove(tx, keys.location);
734                            sd.messageIdIndex.remove(tx, keys.messageId);
735                            undoCounter++;
736                            // TODO: do we need to modify the ack positions for the pub sub case?
737                        }
738
739                    } else {
740                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
741                    }
742                }
743            }
744        }
745
746        if( undoCounter > 0 ) {
747            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
748            // should do sync writes to the journal.
749            if (LOG.isInfoEnabled()) {
750                long end = System.currentTimeMillis();
751                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
752            }
753        }
754    }
755
756    private Location nextRecoveryPosition;
757    private Location lastRecoveryPosition;
758
759    public void incrementalRecover() throws IOException {
760        this.indexLock.writeLock().lock();
761        try {
762            if( nextRecoveryPosition == null ) {
763                if( lastRecoveryPosition==null ) {
764                    nextRecoveryPosition = getRecoveryPosition();
765                } else {
766                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
767                }
768            }
769            while (nextRecoveryPosition != null) {
770                lastRecoveryPosition = nextRecoveryPosition;
771                metadata.lastUpdate = lastRecoveryPosition;
772                JournalCommand<?> message = load(lastRecoveryPosition);
773                process(message, lastRecoveryPosition, (Runnable)null);
774                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
775            }
776        } finally {
777            this.indexLock.writeLock().unlock();
778        }
779    }
780
781    public Location getLastUpdatePosition() throws IOException {
782        return metadata.lastUpdate;
783    }
784
785    private Location getRecoveryPosition() throws IOException {
786
787        if (!this.forceRecoverIndex) {
788
789            // If we need to recover the transactions..
790            if (metadata.firstInProgressTransactionLocation != null) {
791                return metadata.firstInProgressTransactionLocation;
792            }
793
794            // Perhaps there were no transactions...
795            if( metadata.lastUpdate!=null) {
796                // Start replay at the record after the last one recorded in the index file.
797                return journal.getNextLocation(metadata.lastUpdate);
798            }
799        }
800        // This loads the first position.
801        return journal.getNextLocation(null);
802    }
803
804    protected void checkpointCleanup(final boolean cleanup) throws IOException {
805        long start;
806        this.indexLock.writeLock().lock();
807        try {
808            start = System.currentTimeMillis();
809            if( !opened.get() ) {
810                return;
811            }
812            pageFile.tx().execute(new Transaction.Closure<IOException>() {
813                public void execute(Transaction tx) throws IOException {
814                    checkpointUpdate(tx, cleanup);
815                }
816            });
817        } finally {
818            this.indexLock.writeLock().unlock();
819        }
820
821        long end = System.currentTimeMillis();
822        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
823            if (LOG.isInfoEnabled()) {
824                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
825            }
826        }
827    }
828
829    public void checkpoint(Callback closure) throws Exception {
830        this.indexLock.writeLock().lock();
831        try {
832            pageFile.tx().execute(new Transaction.Closure<IOException>() {
833                public void execute(Transaction tx) throws IOException {
834                    checkpointUpdate(tx, false);
835                }
836            });
837            closure.execute();
838        } finally {
839            this.indexLock.writeLock().unlock();
840        }
841    }
842
843    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
844        int size = data.serializedSizeFramed();
845        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
846        os.writeByte(data.type().getNumber());
847        data.writeFramed(os);
848        return os.toByteSequence();
849    }
850
851    // /////////////////////////////////////////////////////////////////
852    // Methods call by the broker to update and query the store.
853    // /////////////////////////////////////////////////////////////////
854    public Location store(JournalCommand<?> data) throws IOException {
855        return store(data, false, null,null);
856    }
857
858    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
859        return store(data, false, null,null, onJournalStoreComplete);
860    }
861
862    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
863        return store(data, sync, before, after, null);
864    }
865
866    /**
867     * All updated are are funneled through this method. The updates are converted
868     * to a JournalMessage which is logged to the journal and then the data from
869     * the JournalMessage is used to update the index just like it would be done
870     * during a recovery process.
871     */
872    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
873        if (before != null) {
874            before.run();
875        }
876        try {
877            ByteSequence sequence = toByteSequence(data);
878            long start = System.currentTimeMillis();
879            Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
880            long start2 = System.currentTimeMillis();
881            process(data, location, after);
882            long end = System.currentTimeMillis();
883            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
884                if (LOG.isInfoEnabled()) {
885                    LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
886                }
887            }
888
889            if (after != null) {
890                Runnable afterCompletion = null;
891                synchronized (orderedTransactionAfters) {
892                    if (!orderedTransactionAfters.empty()) {
893                        afterCompletion = orderedTransactionAfters.pop();
894                    }
895                }
896                if (afterCompletion != null) {
897                    afterCompletion.run();
898                } else {
899                    // non persistent message case
900                    after.run();
901                }
902            }
903
904            if (checkpointThread != null && !checkpointThread.isAlive()) {
905                startCheckpoint();
906            }
907            return location;
908        } catch (IOException ioe) {
909            LOG.error("KahaDB failed to store to Journal", ioe);
910            brokerService.handleIOException(ioe);
911            throw ioe;
912        }
913    }
914
915    /**
916     * Loads a previously stored JournalMessage
917     *
918     * @param location
919     * @return
920     * @throws IOException
921     */
922    public JournalCommand<?> load(Location location) throws IOException {
923        long start = System.currentTimeMillis();
924        ByteSequence data = journal.read(location);
925        long end = System.currentTimeMillis();
926        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
927            if (LOG.isInfoEnabled()) {
928                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
929            }
930        }
931        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
932        byte readByte = is.readByte();
933        KahaEntryType type = KahaEntryType.valueOf(readByte);
934        if( type == null ) {
935            throw new IOException("Could not load journal record. Invalid location: "+location);
936        }
937        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
938        message.mergeFramed(is);
939        return message;
940    }
941
942    /**
943     * do minimal recovery till we reach the last inDoubtLocation
944     * @param data
945     * @param location
946     * @param inDoubtlocation
947     * @throws IOException
948     */
949    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
950        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
951            process(data, location, (Runnable) null);
952        } else {
953            // just recover producer audit
954            data.visit(new Visitor() {
955                public void visit(KahaAddMessageCommand command) throws IOException {
956                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
957                }
958            });
959        }
960    }
961
962    // /////////////////////////////////////////////////////////////////
963    // Journaled record processing methods. Once the record is journaled,
964    // these methods handle applying the index updates. These may be called
965    // from the recovery method too so they need to be idempotent
966    // /////////////////////////////////////////////////////////////////
967
968    void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
969        data.visit(new Visitor() {
970            @Override
971            public void visit(KahaAddMessageCommand command) throws IOException {
972                process(command, location);
973            }
974
975            @Override
976            public void visit(KahaRemoveMessageCommand command) throws IOException {
977                process(command, location);
978            }
979
980            @Override
981            public void visit(KahaPrepareCommand command) throws IOException {
982                process(command, location);
983            }
984
985            @Override
986            public void visit(KahaCommitCommand command) throws IOException {
987                process(command, location, after);
988            }
989
990            @Override
991            public void visit(KahaRollbackCommand command) throws IOException {
992                process(command, location);
993            }
994
995            @Override
996            public void visit(KahaRemoveDestinationCommand command) throws IOException {
997                process(command, location);
998            }
999
1000            @Override
1001            public void visit(KahaSubscriptionCommand command) throws IOException {
1002                process(command, location);
1003            }
1004
1005            @Override
1006            public void visit(KahaProducerAuditCommand command) throws IOException {
1007                processLocation(location);
1008            }
1009
1010            @Override
1011            public void visit(KahaTraceCommand command) {
1012                processLocation(location);
1013            }
1014        });
1015    }
1016
1017    @SuppressWarnings("rawtypes")
1018    protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
1019        if (command.hasTransactionInfo()) {
1020            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1021            inflightTx.add(new AddOpperation(command, location));
1022        } else {
1023            this.indexLock.writeLock().lock();
1024            try {
1025                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1026                    public void execute(Transaction tx) throws IOException {
1027                        upadateIndex(tx, command, location);
1028                    }
1029                });
1030            } finally {
1031                this.indexLock.writeLock().unlock();
1032            }
1033        }
1034    }
1035
1036    @SuppressWarnings("rawtypes")
1037    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1038        if (command.hasTransactionInfo()) {
1039           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1040           inflightTx.add(new RemoveOpperation(command, location));
1041        } else {
1042            this.indexLock.writeLock().lock();
1043            try {
1044                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1045                    public void execute(Transaction tx) throws IOException {
1046                        updateIndex(tx, command, location);
1047                    }
1048                });
1049            } finally {
1050                this.indexLock.writeLock().unlock();
1051            }
1052        }
1053    }
1054
1055    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1056        this.indexLock.writeLock().lock();
1057        try {
1058            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1059                public void execute(Transaction tx) throws IOException {
1060                    updateIndex(tx, command, location);
1061                }
1062            });
1063        } finally {
1064            this.indexLock.writeLock().unlock();
1065        }
1066    }
1067
1068    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1069        this.indexLock.writeLock().lock();
1070        try {
1071            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1072                public void execute(Transaction tx) throws IOException {
1073                    updateIndex(tx, command, location);
1074                }
1075            });
1076        } finally {
1077            this.indexLock.writeLock().unlock();
1078        }
1079    }
1080
1081    protected void processLocation(final Location location) {
1082        this.indexLock.writeLock().lock();
1083        try {
1084            metadata.lastUpdate = location;
1085        } finally {
1086            this.indexLock.writeLock().unlock();
1087        }
1088    }
1089
1090    private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1091    private void push(Runnable after) {
1092        if (after != null) {
1093            synchronized (orderedTransactionAfters) {
1094                orderedTransactionAfters.push(after);
1095            }
1096        }
1097    }
1098
    /**
     * Applies a transaction commit to the index.
     * <p>
     * The operations recorded for the transaction are taken out of
     * {@code inflightTransactions}, or out of {@code preparedTransactions} if not found
     * there. They are executed in order inside a single page-file transaction while the
     * index write lock is held. {@code metadata.lastUpdate} is then set to {@code location},
     * and {@code after} is pushed onto {@code orderedTransactionAfters} for the caller to pop.
     * If no operations are recorded for the transaction, the index is untouched,
     * {@code lastUpdate} is not changed and {@code after} (if any) is run immediately.
     *
     * @param command  the journaled commit record
     * @param location journal location of the commit record
     * @param after    optional completion callback; may be null
     * @throws IOException if applying an operation to the index fails
     */
    @SuppressWarnings("rawtypes")
    protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> inflightTx;
        synchronized (inflightTransactions) {
            inflightTx = inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = preparedTransactions.remove(key);
            }
        }
        if (inflightTx == null) {
            if (after != null) {
                // since we don't push this after and we may find another, lets run it now
                // NOTE(review): store(...) still pops orderedTransactionAfters afterwards and,
                // if the stack is empty, runs its own `after` again -- i.e. this callback can
                // run twice. Confirm the callbacks are idempotent.
                after.run();
            }
            return;
        }

        // Final copy so the anonymous Closure below can capture it.
        final List<Operation> messagingTx = inflightTx;
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                    }
                }
            });
            metadata.lastUpdate = location;
            push(after);
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1133
1134    @SuppressWarnings("rawtypes")
1135    protected void process(KahaPrepareCommand command, Location location) {
1136        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1137        synchronized (inflightTransactions) {
1138            List<Operation> tx = inflightTransactions.remove(key);
1139            if (tx != null) {
1140                preparedTransactions.put(key, tx);
1141            }
1142        }
1143    }
1144
1145    @SuppressWarnings("rawtypes")
1146    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1147        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1148        List<Operation> updates = null;
1149        synchronized (inflightTransactions) {
1150            updates = inflightTransactions.remove(key);
1151            if (updates == null) {
1152                updates = preparedTransactions.remove(key);
1153            }
1154        }
1155        if (isRewriteOnRedelivery()) {
1156            persistRedeliveryCount(updates);
1157        }
1158    }
1159
1160    @SuppressWarnings("rawtypes")
1161    private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
1162        if (updates != null) {
1163            for (Operation operation : updates) {
1164                operation.getCommand().visit(new Visitor() {
1165                    @Override
1166                    public void visit(KahaRemoveMessageCommand command) throws IOException {
1167                        incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1168                    }
1169                });
1170            }
1171        }
1172    }
1173
1174   abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1175
1176    // /////////////////////////////////////////////////////////////////
1177    // These methods do the actual index updates.
1178    // /////////////////////////////////////////////////////////////////
1179
    // Guards index state: the writers in this class take its write lock around page-file
    // transactions and updates to metadata.lastUpdate.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Data file ids that checkpointUpdate(tx, true) excludes from its GC candidate set.
    // NOTE(review): confirm where this set is populated.
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1182
    /**
     * Indexes a message add for the command's destination.
     * <p>
     * If the destination has a subscriptions index that is empty, nothing is changed and
     * the method returns early. In that case neither the producer audit nor
     * {@code metadata.lastUpdate} is touched. Otherwise a new id is taken from
     * {@code orderIndex} and the message is entered in {@code locationIndex},
     * {@code messageIdIndex} and {@code orderIndex}. If subscriptions exist, an ack
     * location is also added for it.
     * <p>
     * If {@code location} was already indexed (a replay), the previous id is restored and
     * nothing else is changed. If the message id was already indexed, the add is rejected
     * as a duplicate: the previous id mapping is restored, the location entry is removed,
     * and {@code rollbackStatsOnDuplicate} is called. In every non-early-return path the
     * message id is recorded in the producer audit and {@code metadata.lastUpdate} is set
     * to {@code location}.
     * <p>
     * Called from {@code process(KahaAddMessageCommand, Location)} for commands without
     * transaction info. (The method name's misspelling is kept for existing callers.)
     *
     * @param tx       page-file transaction the index changes are made in
     * @param command  the add-message command
     * @param location journal location of the command
     * @throws IOException if an index operation fails
     */
    void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
            return;
        }

        // Add the message.
        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
        // NOTE(review): the id obtained here is not reused in the replay/duplicate branches below.
        long id = sd.orderIndex.getNextMessageId(priority);
        Long previous = sd.locationIndex.put(tx, location, id);
        if (previous == null) {
            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
            if (previous == null) {
                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                    addAckLocationForNewMessage(tx, sd, id);
                }
            } else {
                // The message ID is already indexed, so the broker asked us to store a
                // duplicate message. Reject it: restore the previous id mapping, drop the
                // location entry just added, and log a warning.
                LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                rollbackStatsOnDuplicate(command.getDestination());
            }
        } else {
            // Restore the previous value: this looks like a redo of a previously added
            // message. We must not assign it a new id, or the other indexes would be wrong.
            sd.locationIndex.put(tx, location, previous);
        }
        // record this id in any event, initial send or recovery
        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
        metadata.lastUpdate = location;
    }

    /**
     * Subclass hook called by {@code upadateIndex} after a duplicate message add has been
     * rejected. NOTE(review): presumably undoes statistics recorded for the rejected message
     * in {@code commandDestination}; confirm against implementations.
     */
    abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1227
    /**
     * Applies a message removal/acknowledgement to the index of the command's destination.
     * <p>
     * Without a subscription key, the message is removed from {@code messageIdIndex},
     * {@code orderIndex} and {@code locationIndex}. The ack's journal file is then recorded
     * as referencing the message's journal file via
     * {@code recordAckMessageReferenceLocation}. With a subscription key, the subscription's
     * entry in {@code subscriptionAcks} is updated (unless the ack is {@code UNMATCHED}) and
     * {@code removeAckLocation} is called, which handles deleting messages no longer
     * referenced. Missing entries are only logged at DEBUG. In every case
     * {@code metadata.lastUpdate} is set to {@code ackLocation}.
     *
     * @param tx          page-file transaction the index changes are made in
     * @param command     the remove/ack command
     * @param ackLocation journal location of {@code command}
     * @throws IOException if an index operation fails
     */
    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {

            // In the queue case we just remove the message from the index..
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                }  else if (LOG.isDebugEnabled()) {
                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("message not found in sequence id index: " + command.getMessageId());
            }
        } else {
            // In the topic case we need to remove the message once it's been acked
            // by all the subs
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

            // Make sure it's a valid message id...
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                if (command.getAck() != UNMATCHED) {
                    // The result of get() is discarded; the call appears to be made only so
                    // that lastGetPriority() reports this message's priority.
                    // NOTE(review): if the message is missing from orderIndex, the priority
                    // read here may be stale -- confirm.
                    sd.orderIndex.get(tx, sequence);
                    byte priority = sd.orderIndex.lastGetPriority();
                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                }
                // The following method handles deleting un-referenced messages.
                removeAckLocation(tx, sd, subscriptionKey, sequence);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
            }

        }
        metadata.lastUpdate = ackLocation;
    }
1267
1268    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1269    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1270        Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1271        if (referenceFileIds == null) {
1272            referenceFileIds = new HashSet<Integer>();
1273            referenceFileIds.add(messageLocation.getDataFileId());
1274            ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1275        } else {
1276            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1277            if (!referenceFileIds.contains(id)) {
1278                referenceFileIds.add(id);
1279            }
1280        }
1281    }
1282
    /**
     * Removes a destination and all of its index structures.
     * <p>
     * The order index is removed. Then {@code locationIndex} and {@code messageIdIndex}
     * are cleared and unloaded, and the page each reports via {@code getPageId()} is freed.
     * When the destination has a subscriptions index, the same is done for
     * {@code subscriptions} and {@code subscriptionAcks}, and for {@code ackPositions}
     * (freeing its head page). Finally the destination is removed from the
     * {@code storedDestinations} map and from {@code metadata.destinations}.
     * <p>
     * This does not modify {@code metadata.lastUpdate}; {@code location} is unused.
     *
     * @param tx       page-file transaction the index changes are made in
     * @param command  the remove-destination command
     * @param location journal location of the command (unused)
     * @throws IOException if an index operation fails
     */
    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.remove(tx);

        sd.locationIndex.clear(tx);
        sd.locationIndex.unload(tx);
        tx.free(sd.locationIndex.getPageId());

        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());

        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());

            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());

            sd.ackPositions.clear(tx);
            sd.ackPositions.unload(tx);
            tx.free(sd.ackPositions.getHeadPageId());
        }

        String key = key(command.getDestination());
        storedDestinations.remove(key);
        metadata.destinations.remove(tx, key);
    }
1313
1314    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1315        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1316        final String subscriptionKey = command.getSubscriptionKey();
1317
1318        // If set then we are creating it.. otherwise we are destroying the sub
1319        if (command.hasSubscriptionInfo()) {
1320            sd.subscriptions.put(tx, subscriptionKey, command);
1321            long ackLocation=NOT_ACKED;
1322            if (!command.getRetroactive()) {
1323                ackLocation = sd.orderIndex.nextMessageId-1;
1324            } else {
1325                addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
1326            }
1327            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1328            sd.subscriptionCache.add(subscriptionKey);
1329        } else {
1330            // delete the sub...
1331            sd.subscriptions.remove(tx, subscriptionKey);
1332            sd.subscriptionAcks.remove(tx, subscriptionKey);
1333            sd.subscriptionCache.remove(subscriptionKey);
1334            removeAckLocationsForSub(tx, sd, subscriptionKey);
1335
1336            if (sd.subscriptions.isEmpty(tx)) {
1337                sd.messageIdIndex.clear(tx);
1338                sd.locationIndex.clear(tx);
1339                sd.orderIndex.clear(tx);
1340            }
1341        }
1342    }
1343
1344    /**
1345     * @param tx
1346     * @throws IOException
1347     */
1348    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1349        LOG.debug("Checkpoint started.");
1350
1351        // reflect last update exclusive of current checkpoint
1352        Location firstTxLocation = metadata.lastUpdate;
1353
1354        metadata.state = OPEN_STATE;
1355        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1356        metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1357        tx.store(metadata.page, metadataMarshaller, true);
1358        pageFile.flush();
1359
1360        if( cleanup ) {
1361
1362            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1363            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1364
1365            if (LOG.isTraceEnabled()) {
1366                LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1367            }
1368
1369            // Don't GC files under replication
1370            if( journalFilesBeingReplicated!=null ) {
1371                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1372            }
1373
1374            if (metadata.producerSequenceIdTrackerLocation != null) {
1375                gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1376            }
1377
1378            // Don't GC files after the first in progress tx
1379            if( metadata.firstInProgressTransactionLocation!=null ) {
1380                if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1381                    firstTxLocation = metadata.firstInProgressTransactionLocation;
1382                }
1383            }
1384
1385            if( firstTxLocation!=null ) {
1386                while( !gcCandidateSet.isEmpty() ) {
1387                    Integer last = gcCandidateSet.last();
1388                    if( last >= firstTxLocation.getDataFileId() ) {
1389                        gcCandidateSet.remove(last);
1390                    } else {
1391                        break;
1392                    }
1393                }
1394                if (LOG.isTraceEnabled()) {
1395                    LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1396                }
1397            }
1398
1399            // Go through all the destinations to see if any of them can remove GC candidates.
1400            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1401                if( gcCandidateSet.isEmpty() ) {
1402                    break;
1403                }
1404
1405                // Use a visitor to cut down the number of pages that we load
1406                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1407                    int last=-1;
1408                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1409                        if( first==null ) {
1410                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1411                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1412                                subset.remove(second.getDataFileId());
1413                            }
1414                            return !subset.isEmpty();
1415                        } else if( second==null ) {
1416                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1417                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1418                                subset.remove(first.getDataFileId());
1419                            }
1420                            return !subset.isEmpty();
1421                        } else {
1422                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1423                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1424                                subset.remove(first.getDataFileId());
1425                            }
1426                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1427                                subset.remove(second.getDataFileId());
1428                            }
1429                            return !subset.isEmpty();
1430                        }
1431                    }
1432
1433                    public void visit(List<Location> keys, List<Long> values) {
1434                        for (Location l : keys) {
1435                            int fileId = l.getDataFileId();
1436                            if( last != fileId ) {
1437                                gcCandidateSet.remove(fileId);
1438                                last = fileId;
1439                            }
1440                        }
1441                    }
1442                });
1443                if (LOG.isTraceEnabled()) {
1444                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1445                }
1446            }
1447
1448            // check we are not deleting file with ack for in-use journal files
1449            if (LOG.isTraceEnabled()) {
1450                LOG.trace("gc candidates: " + gcCandidateSet);
1451            }
1452            final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1453            Iterator<Integer> candidates = gcCandidateSet.iterator();
1454            while (candidates.hasNext()) {
1455                Integer candidate = candidates.next();
1456                Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1457                if (referencedFileIds != null) {
1458                    for (Integer referencedFileId : referencedFileIds) {
1459                        if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1460                            // active file that is not targeted for deletion is referenced so don't delete
1461                            candidates.remove();
1462                            break;
1463                        }
1464                    }
1465                    if (gcCandidateSet.contains(candidate)) {
1466                        ackMessageFileMap.remove(candidate);
1467                    } else {
1468                        if (LOG.isTraceEnabled()) {
1469                            LOG.trace("not removing data file: " + candidate
1470                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1471                        }
1472                    }
1473                }
1474            }
1475
1476            if (!gcCandidateSet.isEmpty()) {
1477                if (LOG.isDebugEnabled()) {
1478                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1479                }
1480                journal.removeDataFiles(gcCandidateSet);
1481            }
1482        }
1483
1484        LOG.debug("Checkpoint done.");
1485    }
1486
1487    final Runnable nullCompletionCallback = new Runnable() {
1488        @Override
1489        public void run() {
1490        }
1491    };
1492    private Location checkpointProducerAudit() throws IOException {
1493        ByteArrayOutputStream baos = new ByteArrayOutputStream();
1494        ObjectOutputStream oout = new ObjectOutputStream(baos);
1495        oout.writeObject(metadata.producerSequenceIdTracker);
1496        oout.flush();
1497        oout.close();
1498        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1499        Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1500        try {
1501            location.getLatch().await();
1502        } catch (InterruptedException e) {
1503            throw new InterruptedIOException(e.toString());
1504        }
1505        return location;
1506    }
1507
1508    public HashSet<Integer> getJournalFilesBeingReplicated() {
1509        return journalFilesBeingReplicated;
1510    }
1511
1512    // /////////////////////////////////////////////////////////////////
1513    // StoredDestination related implementation methods.
1514    // /////////////////////////////////////////////////////////////////
1515
    // In-memory cache of loaded destinations, keyed by key(KahaDestination), i.e.
    // "<type number>:<name>". Populated lazily by getStoredDestination().
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1517
    /**
     * Plain holder for a subscription's state: its SubscriptionInfo plus last-ack and cursor
     * positions. No behavior is attached here.
     */
    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        // NOTE(review): presumably id/location of the last acked message — confirm against usages
        String lastAckId;
        Location lastAckLocation;
        // NOTE(review): presumably the subscription's current read position — confirm
        Location cursor;
    }
1524
1525    static class MessageKeys {
1526        final String messageId;
1527        final Location location;
1528
1529        public MessageKeys(String messageId, Location location) {
1530            this.messageId=messageId;
1531            this.location=location;
1532        }
1533
1534        @Override
1535        public String toString() {
1536            return "["+messageId+","+location+"]";
1537        }
1538    }
1539
1540    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1541        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1542
1543        public MessageKeys readPayload(DataInput dataIn) throws IOException {
1544            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1545        }
1546
1547        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1548            dataOut.writeUTF(object.messageId);
1549            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1550        }
1551    }
1552
1553    class LastAck {
1554        long lastAckedSequence;
1555        byte priority;
1556
1557        public LastAck(LastAck source) {
1558            this.lastAckedSequence = source.lastAckedSequence;
1559            this.priority = source.priority;
1560        }
1561
1562        public LastAck() {
1563            this.priority = MessageOrderIndex.HI;
1564        }
1565
1566        public LastAck(long ackLocation) {
1567            this.lastAckedSequence = ackLocation;
1568            this.priority = MessageOrderIndex.LO;
1569        }
1570
1571        public LastAck(long ackLocation, byte priority) {
1572            this.lastAckedSequence = ackLocation;
1573            this.priority = priority;
1574        }
1575
1576        public String toString() {
1577            return "[" + lastAckedSequence + ":" + priority + "]";
1578        }
1579    }
1580
1581    protected class LastAckMarshaller implements Marshaller<LastAck> {
1582
1583        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1584            dataOut.writeLong(object.lastAckedSequence);
1585            dataOut.writeByte(object.priority);
1586        }
1587
1588        public LastAck readPayload(DataInput dataIn) throws IOException {
1589            LastAck lastAcked = new LastAck();
1590            lastAcked.lastAckedSequence = dataIn.readLong();
1591            if (metadata.version >= 3) {
1592                lastAcked.priority = dataIn.readByte();
1593            }
1594            return lastAcked;
1595        }
1596
1597        public int getFixedSize() {
1598            return 9;
1599        }
1600
1601        public LastAck deepCopy(LastAck source) {
1602            return new LastAck(source);
1603        }
1604
1605        public boolean isDeepCopySupported() {
1606            return true;
1607        }
1608    }
1609
    /**
     * Index handles for one destination plus transient (non-persisted) bookkeeping. The index
     * page ids are persisted through StoredDestinationMarshaller.
     */
    class StoredDestination {

        // Message order index (default, low and high priority B-trees keyed by sequence).
        MessageOrderIndex orderIndex = new MessageOrderIndex();
        // Location -> Long; NOTE(review): value presumably the message sequence — confirm.
        BTreeIndex<Location, Long> locationIndex;
        // Message id -> Long; NOTE(review): value presumably the message sequence — confirm.
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        // Subscription key -> subscription command.
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        // Subscription key -> last acknowledged sequence/priority.
        BTreeIndex<String, LastAck> subscriptionAcks;
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        // Subscription key -> set of sequences still pending ack for that subscription.
        ListIndex<String, SequenceSet> ackPositions;

        // Transient data used to track which Messages are no longer needed.
        // Sequence -> number of subscriptions still referencing it (rebuilt on load).
        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
        // Subscription keys in insertion order (LinkedHashSet), rebuilt on load.
        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
    }
1626
1627    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1628
1629        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1630            final StoredDestination value = new StoredDestination();
1631            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1632            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1633            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1634
1635            if (dataIn.readBoolean()) {
1636                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1637                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1638                if (metadata.version >= 4) {
1639                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1640                } else {
1641                    // upgrade
1642                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1643                        public void execute(Transaction tx) throws IOException {
1644                            BTreeIndex<Long, HashSet<String>> oldAckPositions =
1645                                new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1646                            oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1647                            oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1648                            oldAckPositions.load(tx);
1649
1650                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1651
1652                            // Do the initial build of the data in memory before writing into the store
1653                            // based Ack Positions List to avoid a lot of disk thrashing.
1654                            Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1655                            while (iterator.hasNext()) {
1656                                Entry<Long, HashSet<String>> entry = iterator.next();
1657
1658                                for(String subKey : entry.getValue()) {
1659                                    SequenceSet pendingAcks = temp.get(subKey);
1660                                    if (pendingAcks == null) {
1661                                        pendingAcks = new SequenceSet();
1662                                        temp.put(subKey, pendingAcks);
1663                                    }
1664
1665                                    pendingAcks.add(entry.getKey());
1666                                }
1667                            }
1668
1669                            // Now move the pending messages to ack data into the store backed
1670                            // structure.
1671                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1672                            for(String subscriptionKey : temp.keySet()) {
1673                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1674                            }
1675
1676                        }
1677                    });
1678                }
1679            }
1680            if (metadata.version >= 2) {
1681                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1682                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1683            } else {
1684                    // upgrade
1685                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1686                        public void execute(Transaction tx) throws IOException {
1687                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1688                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1689                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1690                            value.orderIndex.lowPriorityIndex.load(tx);
1691
1692                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1693                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1694                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1695                            value.orderIndex.highPriorityIndex.load(tx);
1696                        }
1697                    });
1698            }
1699
1700            return value;
1701        }
1702
1703        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1704            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1705            dataOut.writeLong(value.locationIndex.getPageId());
1706            dataOut.writeLong(value.messageIdIndex.getPageId());
1707            if (value.subscriptions != null) {
1708                dataOut.writeBoolean(true);
1709                dataOut.writeLong(value.subscriptions.getPageId());
1710                dataOut.writeLong(value.subscriptionAcks.getPageId());
1711                dataOut.writeLong(value.ackPositions.getHeadPageId());
1712            } else {
1713                dataOut.writeBoolean(false);
1714            }
1715            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1716            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1717        }
1718    }
1719
1720    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1721        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1722
1723        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1724            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1725            rc.mergeFramed((InputStream)dataIn);
1726            return rc;
1727        }
1728
1729        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1730            object.writeFramed((OutputStream)dataOut);
1731        }
1732    }
1733
1734    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1735        String key = key(destination);
1736        StoredDestination rc = storedDestinations.get(key);
1737        if (rc == null) {
1738            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1739            rc = loadStoredDestination(tx, key, topic);
1740            // Cache it. We may want to remove/unload destinations from the
1741            // cache that are not used for a while
1742            // to reduce memory usage.
1743            storedDestinations.put(key, rc);
1744        }
1745        return rc;
1746    }
1747
1748    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1749        String key = key(destination);
1750        StoredDestination rc = storedDestinations.get(key);
1751        if (rc == null && metadata.destinations.containsKey(tx, key)) {
1752            rc = getStoredDestination(destination, tx);
1753        }
1754        return rc;
1755    }
1756
1757    /**
1758     * @param tx
1759     * @param key
1760     * @param topic
1761     * @return
1762     * @throws IOException
1763     */
1764    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1765        // Try to load the existing indexes..
1766        StoredDestination rc = metadata.destinations.get(tx, key);
1767        if (rc == null) {
1768            // Brand new destination.. allocate indexes for it.
1769            rc = new StoredDestination();
1770            rc.orderIndex.allocate(tx);
1771            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1772            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1773
1774            if (topic) {
1775                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1776                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1777                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1778            }
1779            metadata.destinations.put(tx, key, rc);
1780        }
1781
1782        // Configure the marshalers and load.
1783        rc.orderIndex.load(tx);
1784
1785        // Figure out the next key using the last entry in the destination.
1786        rc.orderIndex.configureLast(tx);
1787
1788        rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
1789        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1790        rc.locationIndex.load(tx);
1791
1792        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1793        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1794        rc.messageIdIndex.load(tx);
1795
1796        // If it was a topic...
1797        if (topic) {
1798
1799            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1800            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1801            rc.subscriptions.load(tx);
1802
1803            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1804            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1805            rc.subscriptionAcks.load(tx);
1806
1807            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1808            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1809            rc.ackPositions.load(tx);
1810
1811            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1812
1813            if (metadata.version < 3) {
1814
1815                // on upgrade need to fill ackLocation with available messages past last ack
1816                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1817                    Entry<String, LastAck> entry = iterator.next();
1818                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1819                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1820                        Long sequence = orderIterator.next().getKey();
1821                        addAckLocation(tx, rc, sequence, entry.getKey());
1822                    }
1823                    // modify so it is upgraded
1824                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1825                }
1826            }
1827
1828            // Configure the message references index
1829            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1830            while (subscriptions.hasNext()) {
1831                Entry<String, SequenceSet> subscription = subscriptions.next();
1832                SequenceSet pendingAcks = subscription.getValue();
1833                if (pendingAcks != null && !pendingAcks.isEmpty()) {
1834                    Long lastPendingAck = pendingAcks.getTail().getLast();
1835                    for(Long sequenceId : pendingAcks) {
1836                        Long current = rc.messageReferences.get(sequenceId);
1837                        if (current == null) {
1838                            current = new Long(0);
1839                        }
1840
1841                        // We always add a trailing empty entry for the next position to start from
1842                        // so we need to ensure we don't count that as a message reference on reload.
1843                        if (!sequenceId.equals(lastPendingAck)) {
1844                            current = current.longValue() + 1;
1845                        }
1846
1847                        rc.messageReferences.put(sequenceId, current);
1848                    }
1849                }
1850            }
1851
1852            // Configure the subscription cache
1853            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1854                Entry<String, LastAck> entry = iterator.next();
1855                rc.subscriptionCache.add(entry.getKey());
1856            }
1857
1858            if (rc.orderIndex.nextMessageId == 0) {
1859                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1860                if (!rc.subscriptionAcks.isEmpty(tx)) {
1861                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1862                        Entry<String, LastAck> entry = iterator.next();
1863                        rc.orderIndex.nextMessageId =
1864                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1865                    }
1866                }
1867            } else {
1868                // update based on ackPositions for unmatched, last entry is always the next
1869                if (!rc.messageReferences.isEmpty()) {
1870                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1871                    rc.orderIndex.nextMessageId =
1872                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1873                }
1874            }
1875        }
1876
1877        if (metadata.version < VERSION) {
1878            // store again after upgrade
1879            metadata.destinations.put(tx, key, rc);
1880        }
1881        return rc;
1882    }
1883
1884    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1885        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1886        if (sequences == null) {
1887            sequences = new SequenceSet();
1888            sequences.add(messageSequence);
1889            sd.ackPositions.add(tx, subscriptionKey, sequences);
1890        } else {
1891            sequences.add(messageSequence);
1892            sd.ackPositions.put(tx, subscriptionKey, sequences);
1893        }
1894
1895        Long count = sd.messageReferences.get(messageSequence);
1896        if (count == null) {
1897            count = Long.valueOf(0L);
1898        }
1899        count = count.longValue() + 1;
1900        sd.messageReferences.put(messageSequence, count);
1901    }
1902
1903    // new sub is interested in potentially all existing messages
1904    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1905        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1906        if (sequences == null) {
1907            sequences = new SequenceSet();
1908            sequences.add(messageSequence);
1909            sd.ackPositions.add(tx, subscriptionKey, sequences);
1910        } else {
1911            sequences.add(messageSequence);
1912            sd.ackPositions.put(tx, subscriptionKey, sequences);
1913        }
1914
1915        Long count = sd.messageReferences.get(messageSequence);
1916        if (count == null) {
1917            count = Long.valueOf(0L);
1918        }
1919        count = count.longValue() + 1;
1920        sd.messageReferences.put(messageSequence, count);
1921    }
1922
1923    // on a new message add, all existing subs are interested in this message
1924    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1925        for(String subscriptionKey : sd.subscriptionCache) {
1926            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1927            if (sequences == null) {
1928                sequences = new SequenceSet();
1929                sequences.add(new Sequence(messageSequence, messageSequence + 1));
1930                sd.ackPositions.add(tx, subscriptionKey, sequences);
1931            } else {
1932                sequences.add(new Sequence(messageSequence, messageSequence + 1));
1933                sd.ackPositions.put(tx, subscriptionKey, sequences);
1934            }
1935
1936            Long count = sd.messageReferences.get(messageSequence);
1937            if (count == null) {
1938                count = Long.valueOf(0L);
1939            }
1940            count = count.longValue() + 1;
1941            sd.messageReferences.put(messageSequence, count);
1942            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1943        }
1944    }
1945
1946    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1947        if (!sd.ackPositions.isEmpty(tx)) {
1948            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
1949            if (sequences == null || sequences.isEmpty()) {
1950                return;
1951            }
1952
1953            ArrayList<Long> unreferenced = new ArrayList<Long>();
1954
1955            for(Long sequenceId : sequences) {
1956                Long references = sd.messageReferences.get(sequenceId);
1957                if (references != null) {
1958                    references = references.longValue() - 1;
1959
1960                    if (references.longValue() > 0) {
1961                        sd.messageReferences.put(sequenceId, references);
1962                    } else {
1963                        sd.messageReferences.remove(sequenceId);
1964                        unreferenced.add(sequenceId);
1965                    }
1966                }
1967            }
1968
1969            for(Long sequenceId : unreferenced) {
1970                // Find all the entries that need to get deleted.
1971                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1972                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1973
1974                // Do the actual deletes.
1975                for (Entry<Long, MessageKeys> entry : deletes) {
1976                    sd.locationIndex.remove(tx, entry.getValue().location);
1977                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1978                    sd.orderIndex.remove(tx, entry.getKey());
1979                }
1980            }
1981        }
1982    }
1983
1984    /**
1985     * @param tx
1986     * @param sd
1987     * @param subscriptionKey
1988     * @param messageSequence
1989     * @throws IOException
1990     */
1991    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
1992        // Remove the sub from the previous location set..
1993        if (messageSequence != null) {
1994            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
1995            if (range != null && !range.isEmpty()) {
1996                range.remove(messageSequence);
1997                if (!range.isEmpty()) {
1998                    sd.ackPositions.put(tx, subscriptionKey, range);
1999                } else {
2000                    sd.ackPositions.remove(tx, subscriptionKey);
2001                }
2002
2003                // Check if the message is reference by any other subscription.
2004                Long count = sd.messageReferences.get(messageSequence);
2005                if (count != null){
2006                long references = count.longValue() - 1;
2007                    if (references > 0) {
2008                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2009                        return;
2010                    } else {
2011                        sd.messageReferences.remove(messageSequence);
2012                    }
2013                }
2014
2015                // Find all the entries that need to get deleted.
2016                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2017                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2018
2019                // Do the actual deletes.
2020                for (Entry<Long, MessageKeys> entry : deletes) {
2021                    sd.locationIndex.remove(tx, entry.getValue().location);
2022                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2023                    sd.orderIndex.remove(tx, entry.getKey());
2024                }
2025            }
2026        }
2027    }
2028
2029    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2030        return sd.subscriptionAcks.get(tx, subscriptionKey);
2031    }
2032
2033    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2034        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2035        if (messageSequences != null) {
2036            long result = messageSequences.rangeSize();
2037            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2038            return result > 0 ? result - 1 : 0;
2039        }
2040
2041        return 0;
2042    }
2043
2044    private String key(KahaDestination destination) {
2045        return destination.getType().getNumber() + ":" + destination.getName();
2046    }
2047
2048    // /////////////////////////////////////////////////////////////////
2049    // Transaction related implementation methods.
2050    // /////////////////////////////////////////////////////////////////
    // Operations recorded per in-flight transaction, in insertion order; populated by
    // getInflightTx, which guards access with synchronized (inflightTransactions).
    @SuppressWarnings("rawtypes")
    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // NOTE(review): presumably operations of transactions that have been prepared and await
    // commit/rollback — confirm against usages.
    @SuppressWarnings("rawtypes")
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // String forms of the last message ids of recovered prepared acks; maintained under the
    // index write lock by trackRecoveredAcks / forgetRecoveredAcks.
    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2056
2057    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2058    // till then they are skipped by the store.
2059    // 'at most once' XA guarantee
2060    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2061        this.indexLock.writeLock().lock();
2062        try {
2063            for (MessageAck ack : acks) {
2064                ackedAndPrepared.add(ack.getLastMessageId().toString());
2065            }
2066        } finally {
2067            this.indexLock.writeLock().unlock();
2068        }
2069    }
2070
2071    public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2072        if (acks != null) {
2073            this.indexLock.writeLock().lock();
2074            try {
2075                for (MessageAck ack : acks) {
2076                    ackedAndPrepared.remove(ack.getLastMessageId().toString());
2077                }
2078            } finally {
2079                this.indexLock.writeLock().unlock();
2080            }
2081        }
2082    }
2083
2084    @SuppressWarnings("rawtypes")
2085    private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2086        TransactionId key = TransactionIdConversion.convert(info);
2087        List<Operation> tx;
2088        synchronized (inflightTransactions) {
2089            tx = inflightTransactions.get(key);
2090            if (tx == null) {
2091                tx = Collections.synchronizedList(new ArrayList<Operation>());
2092                inflightTransactions.put(key, tx);
2093            }
2094        }
2095        return tx;
2096    }
2097
2098    @SuppressWarnings("unused")
2099    private TransactionId key(KahaTransactionInfo transactionInfo) {
2100        return TransactionIdConversion.convert(transactionInfo);
2101    }
2102
2103    abstract class Operation <T extends JournalCommand<T>> {
2104        final T command;
2105        final Location location;
2106
2107        public Operation(T command, Location location) {
2108            this.command = command;
2109            this.location = location;
2110        }
2111
2112        public Location getLocation() {
2113            return location;
2114        }
2115
2116        public T getCommand() {
2117            return command;
2118        }
2119
2120        abstract public void execute(Transaction tx) throws IOException;
2121    }
2122
2123    class AddOpperation extends Operation<KahaAddMessageCommand> {
2124
2125        public AddOpperation(KahaAddMessageCommand command, Location location) {
2126            super(command, location);
2127        }
2128
2129        @Override
2130        public void execute(Transaction tx) throws IOException {
2131            upadateIndex(tx, command, location);
2132        }
2133
2134    }
2135
2136    class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2137
2138        public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2139            super(command, location);
2140        }
2141
2142        @Override
2143        public void execute(Transaction tx) throws IOException {
2144            updateIndex(tx, command, location);
2145        }
2146    }
2147
2148    // /////////////////////////////////////////////////////////////////
2149    // Initialization related implementation methods.
2150    // /////////////////////////////////////////////////////////////////
2151
2152    private PageFile createPageFile() {
2153        PageFile index = new PageFile(directory, "db");
2154        index.setEnableWriteThread(isEnableIndexWriteAsync());
2155        index.setWriteBatchSize(getIndexWriteBatchSize());
2156        index.setPageCacheSize(indexCacheSize);
2157        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2158        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2159        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2160        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2161        index.setEnablePageCaching(isEnableIndexPageCaching());
2162        return index;
2163    }
2164
2165    private Journal createJournal() throws IOException {
2166        Journal manager = new Journal();
2167        manager.setDirectory(directory);
2168        manager.setMaxFileLength(getJournalMaxFileLength());
2169        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2170        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2171        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2172        manager.setArchiveDataLogs(isArchiveDataLogs());
2173        manager.setSizeAccumulator(storeSize);
2174        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2175        if (getDirectoryArchive() != null) {
2176            IOHelper.mkdirs(getDirectoryArchive());
2177            manager.setDirectoryArchive(getDirectoryArchive());
2178        }
2179        return manager;
2180    }
2181
2182    public int getJournalMaxWriteBatchSize() {
2183        return journalMaxWriteBatchSize;
2184    }
2185
2186    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2187        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2188    }
2189
2190    public File getDirectory() {
2191        return directory;
2192    }
2193
2194    public void setDirectory(File directory) {
2195        this.directory = directory;
2196    }
2197
2198    public boolean isDeleteAllMessages() {
2199        return deleteAllMessages;
2200    }
2201
2202    public void setDeleteAllMessages(boolean deleteAllMessages) {
2203        this.deleteAllMessages = deleteAllMessages;
2204    }
2205
2206    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2207        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2208    }
2209
2210    public int getIndexWriteBatchSize() {
2211        return setIndexWriteBatchSize;
2212    }
2213
2214    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2215        this.enableIndexWriteAsync = enableIndexWriteAsync;
2216    }
2217
2218    boolean isEnableIndexWriteAsync() {
2219        return enableIndexWriteAsync;
2220    }
2221
2222    public boolean isEnableJournalDiskSyncs() {
2223        return enableJournalDiskSyncs;
2224    }
2225
2226    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2227        this.enableJournalDiskSyncs = syncWrites;
2228    }
2229
2230    public long getCheckpointInterval() {
2231        return checkpointInterval;
2232    }
2233
2234    public void setCheckpointInterval(long checkpointInterval) {
2235        this.checkpointInterval = checkpointInterval;
2236    }
2237
2238    public long getCleanupInterval() {
2239        return cleanupInterval;
2240    }
2241
2242    public void setCleanupInterval(long cleanupInterval) {
2243        this.cleanupInterval = cleanupInterval;
2244    }
2245
2246    public void setJournalMaxFileLength(int journalMaxFileLength) {
2247        this.journalMaxFileLength = journalMaxFileLength;
2248    }
2249
2250    public int getJournalMaxFileLength() {
2251        return journalMaxFileLength;
2252    }
2253
2254    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2255        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2256    }
2257
2258    public int getMaxFailoverProducersToTrack() {
2259        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2260    }
2261
2262    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2263        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2264    }
2265
2266    public int getFailoverProducersAuditDepth() {
2267        return this.metadata.producerSequenceIdTracker.getAuditDepth();
2268    }
2269
2270    public PageFile getPageFile() {
2271        if (pageFile == null) {
2272            pageFile = createPageFile();
2273        }
2274        return pageFile;
2275    }
2276
2277    public Journal getJournal() throws IOException {
2278        if (journal == null) {
2279            journal = createJournal();
2280        }
2281        return journal;
2282    }
2283
2284    public boolean isFailIfDatabaseIsLocked() {
2285        return failIfDatabaseIsLocked;
2286    }
2287
2288    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2289        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2290    }
2291
2292    public boolean isIgnoreMissingJournalfiles() {
2293        return ignoreMissingJournalfiles;
2294    }
2295
2296    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2297        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2298    }
2299
2300    public int getIndexCacheSize() {
2301        return indexCacheSize;
2302    }
2303
2304    public void setIndexCacheSize(int indexCacheSize) {
2305        this.indexCacheSize = indexCacheSize;
2306    }
2307
2308    public boolean isCheckForCorruptJournalFiles() {
2309        return checkForCorruptJournalFiles;
2310    }
2311
2312    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2313        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2314    }
2315
2316    public boolean isChecksumJournalFiles() {
2317        return checksumJournalFiles;
2318    }
2319
2320    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2321        this.checksumJournalFiles = checksumJournalFiles;
2322    }
2323
2324    public void setBrokerService(BrokerService brokerService) {
2325        this.brokerService = brokerService;
2326    }
2327
2328    /**
2329     * @return the archiveDataLogs
2330     */
2331    public boolean isArchiveDataLogs() {
2332        return this.archiveDataLogs;
2333    }
2334
2335    /**
2336     * @param archiveDataLogs the archiveDataLogs to set
2337     */
2338    public void setArchiveDataLogs(boolean archiveDataLogs) {
2339        this.archiveDataLogs = archiveDataLogs;
2340    }
2341
2342    /**
2343     * @return the directoryArchive
2344     */
2345    public File getDirectoryArchive() {
2346        return this.directoryArchive;
2347    }
2348
2349    /**
2350     * @param directoryArchive the directoryArchive to set
2351     */
2352    public void setDirectoryArchive(File directoryArchive) {
2353        this.directoryArchive = directoryArchive;
2354    }
2355
2356    /**
2357     * @return the databaseLockedWaitDelay
2358     */
2359    public int getDatabaseLockedWaitDelay() {
2360        return this.databaseLockedWaitDelay;
2361    }
2362
2363    /**
2364     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2365     */
2366    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2367        this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2368    }
2369
2370    public boolean isRewriteOnRedelivery() {
2371        return rewriteOnRedelivery;
2372    }
2373
2374    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
2375        this.rewriteOnRedelivery = rewriteOnRedelivery;
2376    }
2377
2378    public boolean isArchiveCorruptedIndex() {
2379        return archiveCorruptedIndex;
2380    }
2381
2382    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2383        this.archiveCorruptedIndex = archiveCorruptedIndex;
2384    }
2385
2386    public float getIndexLFUEvictionFactor() {
2387        return indexLFUEvictionFactor;
2388    }
2389
2390    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2391        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2392    }
2393
2394    public boolean isUseIndexLFRUEviction() {
2395        return useIndexLFRUEviction;
2396    }
2397
2398    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2399        this.useIndexLFRUEviction = useIndexLFRUEviction;
2400    }
2401
2402    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2403        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2404    }
2405
2406    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2407        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2408    }
2409
2410    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2411        this.enableIndexPageCaching = enableIndexPageCaching;
2412    }
2413
2414    public boolean isEnableIndexDiskSyncs() {
2415        return enableIndexDiskSyncs;
2416    }
2417
2418    public boolean isEnableIndexRecoveryFile() {
2419        return enableIndexRecoveryFile;
2420    }
2421
2422    public boolean isEnableIndexPageCaching() {
2423        return enableIndexPageCaching;
2424    }
2425
2426    // /////////////////////////////////////////////////////////////////
2427    // Internal conversion methods.
2428    // /////////////////////////////////////////////////////////////////
2429
2430    class MessageOrderCursor{
2431        long defaultCursorPosition;
2432        long lowPriorityCursorPosition;
2433        long highPriorityCursorPosition;
2434        MessageOrderCursor(){
2435        }
2436
2437        MessageOrderCursor(long position){
2438            this.defaultCursorPosition=position;
2439            this.lowPriorityCursorPosition=position;
2440            this.highPriorityCursorPosition=position;
2441        }
2442
2443        MessageOrderCursor(MessageOrderCursor other){
2444            this.defaultCursorPosition=other.defaultCursorPosition;
2445            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2446            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2447        }
2448
2449        MessageOrderCursor copy() {
2450            return new MessageOrderCursor(this);
2451        }
2452
2453        void reset() {
2454            this.defaultCursorPosition=0;
2455            this.highPriorityCursorPosition=0;
2456            this.lowPriorityCursorPosition=0;
2457        }
2458
2459        void increment() {
2460            if (defaultCursorPosition!=0) {
2461                defaultCursorPosition++;
2462            }
2463            if (highPriorityCursorPosition!=0) {
2464                highPriorityCursorPosition++;
2465            }
2466            if (lowPriorityCursorPosition!=0) {
2467                lowPriorityCursorPosition++;
2468            }
2469        }
2470
2471        public String toString() {
2472           return "MessageOrderCursor:[def:" + defaultCursorPosition
2473                   + ", low:" + lowPriorityCursorPosition
2474                   + ", high:" +  highPriorityCursorPosition + "]";
2475        }
2476
2477        public void sync(MessageOrderCursor other) {
2478            this.defaultCursorPosition=other.defaultCursorPosition;
2479            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2480            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2481        }
2482    }
2483
2484    class MessageOrderIndex {
2485        static final byte HI = 9;
2486        static final byte LO = 0;
2487        static final byte DEF = 4;
2488
2489        long nextMessageId;
2490        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2491        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2492        BTreeIndex<Long, MessageKeys> highPriorityIndex;
2493        MessageOrderCursor cursor = new MessageOrderCursor();
2494        Long lastDefaultKey;
2495        Long lastHighKey;
2496        Long lastLowKey;
2497        byte lastGetPriority;
2498
2499        MessageKeys remove(Transaction tx, Long key) throws IOException {
2500            MessageKeys result = defaultPriorityIndex.remove(tx, key);
2501            if (result == null && highPriorityIndex!=null) {
2502                result = highPriorityIndex.remove(tx, key);
2503                if (result ==null && lowPriorityIndex!=null) {
2504                    result = lowPriorityIndex.remove(tx, key);
2505                }
2506            }
2507            return result;
2508        }
2509
2510        void load(Transaction tx) throws IOException {
2511            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2512            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2513            defaultPriorityIndex.load(tx);
2514            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2515            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2516            lowPriorityIndex.load(tx);
2517            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2518            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2519            highPriorityIndex.load(tx);
2520        }
2521
2522        void allocate(Transaction tx) throws IOException {
2523            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2524            if (metadata.version >= 2) {
2525                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2526                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2527            }
2528        }
2529
2530        void configureLast(Transaction tx) throws IOException {
2531            // Figure out the next key using the last entry in the destination.
2532            if (highPriorityIndex != null) {
2533                Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2534                if (lastEntry != null) {
2535                    nextMessageId = lastEntry.getKey() + 1;
2536                } else {
2537                    lastEntry = defaultPriorityIndex.getLast(tx);
2538                    if (lastEntry != null) {
2539                        nextMessageId = lastEntry.getKey() + 1;
2540                    } else {
2541                        lastEntry = lowPriorityIndex.getLast(tx);
2542                        if (lastEntry != null) {
2543                            nextMessageId = lastEntry.getKey() + 1;
2544                        }
2545                    }
2546                }
2547            } else {
2548                Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2549                if (lastEntry != null) {
2550                    nextMessageId = lastEntry.getKey() + 1;
2551                }
2552            }
2553        }
2554
2555        void clear(Transaction tx) throws IOException {
2556            this.remove(tx);
2557            this.resetCursorPosition();
2558            this.allocate(tx);
2559            this.load(tx);
2560            this.configureLast(tx);
2561        }
2562
2563        void remove(Transaction tx) throws IOException {
2564            defaultPriorityIndex.clear(tx);
2565            defaultPriorityIndex.unload(tx);
2566            tx.free(defaultPriorityIndex.getPageId());
2567            if (lowPriorityIndex != null) {
2568                lowPriorityIndex.clear(tx);
2569                lowPriorityIndex.unload(tx);
2570
2571                tx.free(lowPriorityIndex.getPageId());
2572            }
2573            if (highPriorityIndex != null) {
2574                highPriorityIndex.clear(tx);
2575                highPriorityIndex.unload(tx);
2576                tx.free(highPriorityIndex.getPageId());
2577            }
2578        }
2579
2580        void resetCursorPosition() {
2581            this.cursor.reset();
2582            lastDefaultKey = null;
2583            lastHighKey = null;
2584            lastLowKey = null;
2585        }
2586
2587        void setBatch(Transaction tx, Long sequence) throws IOException {
2588            if (sequence != null) {
2589                Long nextPosition = new Long(sequence.longValue() + 1);
2590                if (defaultPriorityIndex.containsKey(tx, sequence)) {
2591                    lastDefaultKey = sequence;
2592                    cursor.defaultCursorPosition = nextPosition.longValue();
2593                } else if (highPriorityIndex != null) {
2594                    if (highPriorityIndex.containsKey(tx, sequence)) {
2595                        lastHighKey = sequence;
2596                        cursor.highPriorityCursorPosition = nextPosition.longValue();
2597                    } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2598                        lastLowKey = sequence;
2599                        cursor.lowPriorityCursorPosition = nextPosition.longValue();
2600                    }
2601                } else {
2602                    LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2603                    lastDefaultKey = sequence;
2604                    cursor.defaultCursorPosition = nextPosition.longValue();
2605                }
2606            }
2607        }
2608
2609        void setBatch(Transaction tx, LastAck last) throws IOException {
2610            setBatch(tx, last.lastAckedSequence);
2611            if (cursor.defaultCursorPosition == 0
2612                    && cursor.highPriorityCursorPosition == 0
2613                    && cursor.lowPriorityCursorPosition == 0) {
2614                long next = last.lastAckedSequence + 1;
2615                switch (last.priority) {
2616                    case DEF:
2617                        cursor.defaultCursorPosition = next;
2618                        cursor.highPriorityCursorPosition = next;
2619                        break;
2620                    case HI:
2621                        cursor.highPriorityCursorPosition = next;
2622                        break;
2623                    case LO:
2624                        cursor.lowPriorityCursorPosition = next;
2625                        cursor.defaultCursorPosition = next;
2626                        cursor.highPriorityCursorPosition = next;
2627                        break;
2628                }
2629            }
2630        }
2631
2632        void stoppedIterating() {
2633            if (lastDefaultKey!=null) {
2634                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2635            }
2636            if (lastHighKey!=null) {
2637                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2638            }
2639            if (lastLowKey!=null) {
2640                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2641            }
2642            lastDefaultKey = null;
2643            lastHighKey = null;
2644            lastLowKey = null;
2645        }
2646
2647        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2648                throws IOException {
2649            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2650                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2651            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2652                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2653            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2654                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2655            }
2656        }
2657
2658        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2659                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2660
2661            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2662            deletes.add(iterator.next());
2663        }
2664
2665        long getNextMessageId(int priority) {
2666            return nextMessageId++;
2667        }
2668
2669        MessageKeys get(Transaction tx, Long key) throws IOException {
2670            MessageKeys result = defaultPriorityIndex.get(tx, key);
2671            if (result == null) {
2672                result = highPriorityIndex.get(tx, key);
2673                if (result == null) {
2674                    result = lowPriorityIndex.get(tx, key);
2675                    lastGetPriority = LO;
2676                } else {
2677                    lastGetPriority = HI;
2678                }
2679            } else {
2680                lastGetPriority = DEF;
2681            }
2682            return result;
2683        }
2684
2685        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2686            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2687                return defaultPriorityIndex.put(tx, key, value);
2688            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2689                return highPriorityIndex.put(tx, key, value);
2690            } else {
2691                return lowPriorityIndex.put(tx, key, value);
2692            }
2693        }
2694
2695        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2696            return new MessageOrderIterator(tx,cursor);
2697        }
2698
2699        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2700            return new MessageOrderIterator(tx,m);
2701        }
2702
2703        public byte lastGetPriority() {
2704            return lastGetPriority;
2705        }
2706
2707        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2708            Iterator<Entry<Long, MessageKeys>>currentIterator;
2709            final Iterator<Entry<Long, MessageKeys>>highIterator;
2710            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2711            final Iterator<Entry<Long, MessageKeys>>lowIterator;
2712
2713            MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2714                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2715                if (highPriorityIndex != null) {
2716                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2717                } else {
2718                    this.highIterator = null;
2719                }
2720                if (lowPriorityIndex != null) {
2721                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2722                } else {
2723                    this.lowIterator = null;
2724                }
2725            }
2726
2727            public boolean hasNext() {
2728                if (currentIterator == null) {
2729                    if (highIterator != null) {
2730                        if (highIterator.hasNext()) {
2731                            currentIterator = highIterator;
2732                            return currentIterator.hasNext();
2733                        }
2734                        if (defaultIterator.hasNext()) {
2735                            currentIterator = defaultIterator;
2736                            return currentIterator.hasNext();
2737                        }
2738                        if (lowIterator.hasNext()) {
2739                            currentIterator = lowIterator;
2740                            return currentIterator.hasNext();
2741                        }
2742                        return false;
2743                    } else {
2744                        currentIterator = defaultIterator;
2745                        return currentIterator.hasNext();
2746                    }
2747                }
2748                if (highIterator != null) {
2749                    if (currentIterator.hasNext()) {
2750                        return true;
2751                    }
2752                    if (currentIterator == highIterator) {
2753                        if (defaultIterator.hasNext()) {
2754                            currentIterator = defaultIterator;
2755                            return currentIterator.hasNext();
2756                        }
2757                        if (lowIterator.hasNext()) {
2758                            currentIterator = lowIterator;
2759                            return currentIterator.hasNext();
2760                        }
2761                        return false;
2762                    }
2763
2764                    if (currentIterator == defaultIterator) {
2765                        if (lowIterator.hasNext()) {
2766                            currentIterator = lowIterator;
2767                            return currentIterator.hasNext();
2768                        }
2769                        return false;
2770                    }
2771                }
2772                return currentIterator.hasNext();
2773            }
2774
2775            public Entry<Long, MessageKeys> next() {
2776                Entry<Long, MessageKeys> result = currentIterator.next();
2777                if (result != null) {
2778                    Long key = result.getKey();
2779                    if (highIterator != null) {
2780                        if (currentIterator == defaultIterator) {
2781                            lastDefaultKey = key;
2782                        } else if (currentIterator == highIterator) {
2783                            lastHighKey = key;
2784                        } else {
2785                            lastLowKey = key;
2786                        }
2787                    } else {
2788                        lastDefaultKey = key;
2789                    }
2790                }
2791                return result;
2792            }
2793
2794            public void remove() {
2795                throw new UnsupportedOperationException();
2796            }
2797
2798        }
2799    }
2800
2801    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2802        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2803
2804        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2805            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2806            ObjectOutputStream oout = new ObjectOutputStream(baos);
2807            oout.writeObject(object);
2808            oout.flush();
2809            oout.close();
2810            byte[] data = baos.toByteArray();
2811            dataOut.writeInt(data.length);
2812            dataOut.write(data);
2813        }
2814
2815        @SuppressWarnings("unchecked")
2816        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2817            int dataLen = dataIn.readInt();
2818            byte[] data = new byte[dataLen];
2819            dataIn.readFully(data);
2820            ByteArrayInputStream bais = new ByteArrayInputStream(data);
2821            ObjectInputStream oin = new ObjectInputStream(bais);
2822            try {
2823                return (HashSet<String>) oin.readObject();
2824            } catch (ClassNotFoundException cfe) {
2825                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2826                ioe.initCause(cfe);
2827                throw ioe;
2828            }
2829        }
2830    }
2831}