001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.broker.ConnectionContext;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.ActiveMQQueue;
024import org.apache.activemq.command.ActiveMQTopic;
025import org.apache.activemq.command.LocalTransactionId;
026import org.apache.activemq.command.ProducerId;
027import org.apache.activemq.command.TransactionId;
028import org.apache.activemq.command.XATransactionId;
029import org.apache.activemq.protobuf.Buffer;
030import org.apache.activemq.store.MessageStore;
031import org.apache.activemq.store.PersistenceAdapter;
032import org.apache.activemq.store.TopicMessageStore;
033import org.apache.activemq.store.TransactionStore;
034import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
035import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
036import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
037import org.apache.activemq.usage.SystemUsage;
038
039import java.io.File;
040import java.io.IOException;
041import java.util.Set;
042
043/**
044 * An implementation of {@link PersistenceAdapter} designed for use with
045 * KahaDB - Embedded Lightweight Non-Relational Database
046 * 
047 * @org.apache.xbean.XBean element="kahaDB"
048 * 
049 */
050public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
051    private final KahaDBStore letter = new KahaDBStore();
052
053    /**
054     * @param context
055     * @throws IOException
056     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
057     */
058    public void beginTransaction(ConnectionContext context) throws IOException {
059        this.letter.beginTransaction(context);
060    }
061
062    /**
063     * @param sync
064     * @throws IOException
065     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
066     */
067    public void checkpoint(boolean sync) throws IOException {
068        this.letter.checkpoint(sync);
069    }
070
071    /**
072     * @param context
073     * @throws IOException
074     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
075     */
076    public void commitTransaction(ConnectionContext context) throws IOException {
077        this.letter.commitTransaction(context);
078    }
079
080    /**
081     * @param destination
082     * @return MessageStore
083     * @throws IOException
084     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
085     */
086    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
087        return this.letter.createQueueMessageStore(destination);
088    }
089
090    /**
091     * @param destination
092     * @return TopicMessageStore
093     * @throws IOException
094     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
095     */
096    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
097        return this.letter.createTopicMessageStore(destination);
098    }
099
100    /**
101     * @return TransactionStore
102     * @throws IOException
103     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
104     */
105    public TransactionStore createTransactionStore() throws IOException {
106        return this.letter.createTransactionStore();
107    }
108
109    /**
110     * @throws IOException
111     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
112     */
113    public void deleteAllMessages() throws IOException {
114        this.letter.deleteAllMessages();
115    }
116
117    /**
118     * @return destinations
119     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
120     */
121    public Set<ActiveMQDestination> getDestinations() {
122        return this.letter.getDestinations();
123    }
124
125    /**
126     * @return lastMessageBrokerSequenceId
127     * @throws IOException
128     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
129     */
130    public long getLastMessageBrokerSequenceId() throws IOException {
131        return this.letter.getLastMessageBrokerSequenceId();
132    }
133
134    public long getLastProducerSequenceId(ProducerId id) throws IOException {
135        return this.letter.getLastProducerSequenceId(id);
136    }
137
138    /**
139     * @param destination
140     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
141     */
142    public void removeQueueMessageStore(ActiveMQQueue destination) {
143        this.letter.removeQueueMessageStore(destination);
144    }
145
146    /**
147     * @param destination
148     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
149     */
150    public void removeTopicMessageStore(ActiveMQTopic destination) {
151        this.letter.removeTopicMessageStore(destination);
152    }
153
154    /**
155     * @param context
156     * @throws IOException
157     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
158     */
159    public void rollbackTransaction(ConnectionContext context) throws IOException {
160        this.letter.rollbackTransaction(context);
161    }
162
163    /**
164     * @param brokerName
165     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
166     */
167    public void setBrokerName(String brokerName) {
168        this.letter.setBrokerName(brokerName);
169    }
170
171    /**
172     * @param usageManager
173     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
174     */
175    public void setUsageManager(SystemUsage usageManager) {
176        this.letter.setUsageManager(usageManager);
177    }
178
179    /**
180     * @return the size of the store
181     * @see org.apache.activemq.store.PersistenceAdapter#size()
182     */
183    public long size() {
184        return this.letter.size();
185    }
186
187    /**
188     * @throws Exception
189     * @see org.apache.activemq.Service#start()
190     */
191    public void start() throws Exception {
192        this.letter.start();
193    }
194
195    /**
196     * @throws Exception
197     * @see org.apache.activemq.Service#stop()
198     */
199    public void stop() throws Exception {
200        this.letter.stop();
201    }
202
203    /**
204     * Get the journalMaxFileLength
205     * 
206     * @return the journalMaxFileLength
207     */
208    public int getJournalMaxFileLength() {
209        return this.letter.getJournalMaxFileLength();
210    }
211
212    /**
213     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
214     * be used
215     * 
216     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
217     */
218    public void setJournalMaxFileLength(int journalMaxFileLength) {
219        this.letter.setJournalMaxFileLength(journalMaxFileLength);
220    }
221
222    /**
223     * Set the max number of producers (LRU cache) to track for duplicate sends
224     */
225    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
226        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
227    }
228    
229    public int getMaxFailoverProducersToTrack() {
230        return this.letter.getMaxFailoverProducersToTrack();
231    }
232
233    /**
234     * set the audit window depth for duplicate suppression (should exceed the max transaction
235     * batch)
236     */
237    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
238        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
239    }
240    
241    public int getFailoverProducersAuditDepth() {
242        return this.letter.getFailoverProducersAuditDepth();
243    }
244    
245    /**
246     * Get the checkpointInterval
247     * 
248     * @return the checkpointInterval
249     */
250    public long getCheckpointInterval() {
251        return this.letter.getCheckpointInterval();
252    }
253
254    /**
255     * Set the checkpointInterval
256     * 
257     * @param checkpointInterval
258     *            the checkpointInterval to set
259     */
260    public void setCheckpointInterval(long checkpointInterval) {
261        this.letter.setCheckpointInterval(checkpointInterval);
262    }
263
264    /**
265     * Get the cleanupInterval
266     * 
267     * @return the cleanupInterval
268     */
269    public long getCleanupInterval() {
270        return this.letter.getCleanupInterval();
271    }
272
273    /**
274     * Set the cleanupInterval
275     * 
276     * @param cleanupInterval
277     *            the cleanupInterval to set
278     */
279    public void setCleanupInterval(long cleanupInterval) {
280        this.letter.setCleanupInterval(cleanupInterval);
281    }
282
283    /**
284     * Get the indexWriteBatchSize
285     * 
286     * @return the indexWriteBatchSize
287     */
288    public int getIndexWriteBatchSize() {
289        return this.letter.getIndexWriteBatchSize();
290    }
291
292    /**
293     * Set the indexWriteBatchSize
294     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
295     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
296     * @param indexWriteBatchSize
297     *            the indexWriteBatchSize to set
298     */
299    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
300        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
301    }
302
303    /**
304     * Get the journalMaxWriteBatchSize
305     * 
306     * @return the journalMaxWriteBatchSize
307     */
308    public int getJournalMaxWriteBatchSize() {
309        return this.letter.getJournalMaxWriteBatchSize();
310    }
311
312    /**
313     * Set the journalMaxWriteBatchSize
314     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
315     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
316     * @param journalMaxWriteBatchSize
317     *            the journalMaxWriteBatchSize to set
318     */
319    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
320        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
321    }
322
323    /**
324     * Get the enableIndexWriteAsync
325     * 
326     * @return the enableIndexWriteAsync
327     */
328    public boolean isEnableIndexWriteAsync() {
329        return this.letter.isEnableIndexWriteAsync();
330    }
331
332    /**
333     * Set the enableIndexWriteAsync
334     * 
335     * @param enableIndexWriteAsync
336     *            the enableIndexWriteAsync to set
337     */
338    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
339        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
340    }
341
342    /**
343     * Get the directory
344     * 
345     * @return the directory
346     */
347    public File getDirectory() {
348        return this.letter.getDirectory();
349    }
350
351    /**
352     * @param dir
353     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
354     */
355    public void setDirectory(File dir) {
356        this.letter.setDirectory(dir);
357    }
358
359    /**
360     * Get the enableJournalDiskSyncs
361     * 
362     * @return the enableJournalDiskSyncs
363     */
364    public boolean isEnableJournalDiskSyncs() {
365        return this.letter.isEnableJournalDiskSyncs();
366    }
367
368    /**
369     * Set the enableJournalDiskSyncs
370     * 
371     * @param enableJournalDiskSyncs
372     *            the enableJournalDiskSyncs to set
373     */
374    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
375        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
376    }
377
378    /**
379     * Get the indexCacheSize
380     * 
381     * @return the indexCacheSize
382     */
383    public int getIndexCacheSize() {
384        return this.letter.getIndexCacheSize();
385    }
386
387    /**
388     * Set the indexCacheSize
389     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
390     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
391     * @param indexCacheSize
392     *            the indexCacheSize to set
393     */
394    public void setIndexCacheSize(int indexCacheSize) {
395        this.letter.setIndexCacheSize(indexCacheSize);
396    }
397
398    /**
399     * Get the ignoreMissingJournalfiles
400     * 
401     * @return the ignoreMissingJournalfiles
402     */
403    public boolean isIgnoreMissingJournalfiles() {
404        return this.letter.isIgnoreMissingJournalfiles();
405    }
406
407    /**
408     * Set the ignoreMissingJournalfiles
409     * 
410     * @param ignoreMissingJournalfiles
411     *            the ignoreMissingJournalfiles to set
412     */
413    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
414        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
415    }
416
417    public boolean isChecksumJournalFiles() {
418        return letter.isChecksumJournalFiles();
419    }
420
421    public boolean isCheckForCorruptJournalFiles() {
422        return letter.isCheckForCorruptJournalFiles();
423    }
424
425    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
426        letter.setChecksumJournalFiles(checksumJournalFiles);
427    }
428
429    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
430        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
431    }
432
433    public void setBrokerService(BrokerService brokerService) {
434        letter.setBrokerService(brokerService);
435    }
436
437    public boolean isArchiveDataLogs() {
438        return letter.isArchiveDataLogs();
439    }
440
441    public void setArchiveDataLogs(boolean archiveDataLogs) {
442        letter.setArchiveDataLogs(archiveDataLogs);
443    }
444
445    public File getDirectoryArchive() {
446        return letter.getDirectoryArchive();
447    }
448
449    public void setDirectoryArchive(File directoryArchive) {
450        letter.setDirectoryArchive(directoryArchive);
451    }
452
453    public boolean isConcurrentStoreAndDispatchQueues() {
454        return letter.isConcurrentStoreAndDispatchQueues();
455    }
456
457    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
458        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
459    }
460
461    public boolean isConcurrentStoreAndDispatchTopics() {
462        return letter.isConcurrentStoreAndDispatchTopics();
463    }
464
465    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
466        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
467    }
468
469    public int getMaxAsyncJobs() {
470        return letter.getMaxAsyncJobs();
471    }
472    /**
473     * @param maxAsyncJobs
474     *            the maxAsyncJobs to set
475     */
476    public void setMaxAsyncJobs(int maxAsyncJobs) {
477        letter.setMaxAsyncJobs(maxAsyncJobs);
478    }
479    
480    /**
481     * @return the databaseLockedWaitDelay
482     */
483    public int getDatabaseLockedWaitDelay() {
484        return letter.getDatabaseLockedWaitDelay();
485    }
486
487    /**
488     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
489     */
490    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
491       letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
492    }
493
494    public boolean getForceRecoverIndex() {
495        return letter.getForceRecoverIndex();
496    }
497
498    public void setForceRecoverIndex(boolean forceRecoverIndex) {
499        letter.setForceRecoverIndex(forceRecoverIndex);
500    }
501
502    public boolean isArchiveCorruptedIndex() {
503        return letter.isArchiveCorruptedIndex();
504    }
505
506    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
507        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
508    }
509
510    /**
511     * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
512     * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
513     */
514    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
515        letter.setRewriteOnRedelivery(rewriteOnRedelivery);
516    }
517
518    public boolean isRewriteOnRedelivery() {
519        return letter.isRewriteOnRedelivery();
520    }
521
522    public float getIndexLFUEvictionFactor() {
523        return letter.getIndexLFUEvictionFactor();
524    }
525
526    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
527        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
528    }
529
530    public boolean isUseIndexLFRUEviction() {
531        return letter.isUseIndexLFRUEviction();
532    }
533
534    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
535        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
536    }
537
538    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
539        letter.setEnableIndexDiskSyncs(diskSyncs);
540    }
541
542    public boolean isEnableIndexDiskSyncs() {
543        return letter.isEnableIndexDiskSyncs();
544    }
545
546    public void setEnableIndexRecoveryFile(boolean enable) {
547        letter.setEnableIndexRecoveryFile(enable);
548    }
549
550    public boolean  isEnableIndexRecoveryFile() {
551        return letter.isEnableIndexRecoveryFile();
552    }
553
554    public void setEnableIndexPageCaching(boolean enable) {
555        letter.setEnableIndexPageCaching(enable);
556    }
557
558    public boolean isEnableIndexPageCaching() {
559        return letter.isEnableIndexPageCaching();
560    }
561
562    public KahaDBStore getStore() {
563        return letter;
564    }
565
566    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
567        if (txid == null) {
568            return null;
569        }
570        KahaTransactionInfo rc = new KahaTransactionInfo();
571
572        if (txid.isLocalTransaction()) {
573            LocalTransactionId t = (LocalTransactionId) txid;
574            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
575            kahaTxId.setConnectionId(t.getConnectionId().getValue());
576            kahaTxId.setTransacitonId(t.getValue());
577            rc.setLocalTransacitonId(kahaTxId);
578        } else {
579            XATransactionId t = (XATransactionId) txid;
580            KahaXATransactionId kahaTxId = new KahaXATransactionId();
581            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
582            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
583            kahaTxId.setFormatId(t.getFormatId());
584            rc.setXaTransacitonId(kahaTxId);
585        }
586        return rc;
587    }
588
589    @Override
590    public String toString() {
591        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
592        return "KahaDBPersistenceAdapter[" + path + "]";
593    }
594
595}