001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Set;
026import java.util.Map.Entry;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQQueue;
030import org.apache.activemq.command.ActiveMQTempQueue;
031import org.apache.activemq.command.ActiveMQTempTopic;
032import org.apache.activemq.command.ActiveMQTopic;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.command.ProducerId;
037import org.apache.activemq.command.SubscriptionInfo;
038import org.apache.activemq.command.TransactionId;
039import org.apache.activemq.command.XATransactionId;
040import org.apache.activemq.openwire.OpenWireFormat;
041import org.apache.activemq.protobuf.Buffer;
042import org.apache.activemq.store.AbstractMessageStore;
043import org.apache.activemq.store.MessageRecoveryListener;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.PersistenceAdapter;
046import org.apache.activemq.store.TopicMessageStore;
047import org.apache.activemq.store.TransactionRecoveryListener;
048import org.apache.activemq.store.TransactionStore;
049import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
050import org.apache.activemq.store.kahadb.data.KahaDestination;
051import org.apache.activemq.store.kahadb.data.KahaLocation;
052import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
053import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
054import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
055import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056import org.apache.activemq.usage.MemoryUsage;
057import org.apache.activemq.usage.SystemUsage;
058import org.apache.activemq.util.ByteSequence;
059import org.apache.activemq.wireformat.WireFormat;
060import org.apache.kahadb.journal.Location;
061import org.apache.kahadb.page.Transaction;
062
063public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
064
065    private final WireFormat wireFormat = new OpenWireFormat();
066
067    public void setBrokerName(String brokerName) {
068    }
069    public void setUsageManager(SystemUsage usageManager) {
070    }
071
072    public TransactionStore createTransactionStore() throws IOException {
073        return new TransactionStore(){
074            
075            public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
076                if (preCommit != null) {
077                    preCommit.run();
078                }
079                processCommit(txid);
080                if (postCommit != null) {
081                    postCommit.run();
082                }
083            }
084            public void prepare(TransactionId txid) throws IOException {
085                processPrepare(txid);
086            }
087            public void rollback(TransactionId txid) throws IOException {
088                processRollback(txid);
089            }
090            public void recover(TransactionRecoveryListener listener) throws IOException {
091                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
092                    XATransactionId xid = (XATransactionId)entry.getKey();
093                    ArrayList<Message> messageList = new ArrayList<Message>();
094                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
095                    
096                    for (Operation op : entry.getValue()) {
097                        if( op.getClass() == AddOpperation.class ) {
098                            AddOpperation addOp = (AddOpperation)op;
099                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
100                            messageList.add(msg);
101                        } else {
102                            RemoveOpperation rmOp = (RemoveOpperation)op;
103                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
104                            ackList.add(ack);
105                        }
106                    }
107                    
108                    Message[] addedMessages = new Message[messageList.size()];
109                    MessageAck[] acks = new MessageAck[ackList.size()];
110                    messageList.toArray(addedMessages);
111                    ackList.toArray(acks);
112                    listener.recover(xid, addedMessages, acks);
113                }
114            }
115            public void start() throws Exception {
116            }
117            public void stop() throws Exception {
118            }
119        };
120    }
121
122    public class KahaDBMessageStore extends AbstractMessageStore {
123        protected KahaDestination dest;
124
125        public KahaDBMessageStore(ActiveMQDestination destination) {
126            super(destination);
127            this.dest = convert( destination );
128        }
129
130        @Override
131        public ActiveMQDestination getDestination() {
132            return destination;
133        }
134
135        public void addMessage(ConnectionContext context, Message message) throws IOException {
136            KahaAddMessageCommand command = new KahaAddMessageCommand();
137            command.setDestination(dest);
138            command.setMessageId(message.getMessageId().toString());
139            processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
140        }
141        
142        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
143            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
144            command.setDestination(dest);
145            command.setMessageId(ack.getLastMessageId().toString());
146            processRemove(command, ack.getTransactionId());
147        }
148
149        public void removeAllMessages(ConnectionContext context) throws IOException {
150            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
151            command.setDestination(dest);
152            process(command);
153        }
154
155        public Message getMessage(MessageId identity) throws IOException {
156            final String key = identity.toString();
157            
158            // Hopefully one day the page file supports concurrent read operations... but for now we must
159            // externally synchronize...
160            ByteSequence data;
161            synchronized(indexMutex) {
162                data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
163                    public ByteSequence execute(Transaction tx) throws IOException {
164                        StoredDestination sd = getStoredDestination(dest, tx);
165                        Long sequence = sd.messageIdIndex.get(tx, key);
166                        if( sequence ==null ) {
167                            return null;
168                        }
169                        return sd.orderIndex.get(tx, sequence).data;
170                    }
171                });
172            }
173            if( data == null ) {
174                return null;
175            }
176            
177            Message msg = (Message)wireFormat.unmarshal( data );
178                        return msg;
179        }
180        
181        public int getMessageCount() throws IOException {
182            synchronized(indexMutex) {
183                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
184                    public Integer execute(Transaction tx) throws IOException {
185                        // Iterate through all index entries to get a count of messages in the destination.
186                        StoredDestination sd = getStoredDestination(dest, tx);
187                        int rc=0;
188                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
189                            iterator.next();
190                            rc++;
191                        }
192                        return rc;
193                    }
194                });
195            }
196        }
197
198        public void recover(final MessageRecoveryListener listener) throws Exception {
199            synchronized(indexMutex) {
200                pageFile.tx().execute(new Transaction.Closure<Exception>(){
201                    public void execute(Transaction tx) throws Exception {
202                        StoredDestination sd = getStoredDestination(dest, tx);
203                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
204                            Entry<Long, MessageRecord> entry = iterator.next();
205                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
206                        }
207                    }
208                });
209            }
210        }
211
212        long cursorPos=0;
213        
214        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
215            synchronized(indexMutex) {
216                pageFile.tx().execute(new Transaction.Closure<Exception>(){
217                    public void execute(Transaction tx) throws Exception {
218                        StoredDestination sd = getStoredDestination(dest, tx);
219                        Entry<Long, MessageRecord> entry=null;
220                        int counter = 0;
221                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
222                            entry = iterator.next();
223                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
224                            counter++;
225                            if( counter >= maxReturned ) {
226                                break;
227                            }
228                        }
229                        if( entry!=null ) {
230                            cursorPos = entry.getKey()+1;
231                        }
232                    }
233                });
234            }
235        }
236
237        public void resetBatching() {
238            cursorPos=0;
239        }
240
241        
242        @Override
243        public void setBatch(MessageId identity) throws IOException {
244            final String key = identity.toString();
245            
246            // Hopefully one day the page file supports concurrent read operations... but for now we must
247            // externally synchronize...
248            Long location;
249            synchronized(indexMutex) {
250                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
251                    public Long execute(Transaction tx) throws IOException {
252                        StoredDestination sd = getStoredDestination(dest, tx);
253                        return sd.messageIdIndex.get(tx, key);
254                    }
255                });
256            }
257            if( location!=null ) {
258                cursorPos=location+1;
259            }
260            
261        }
262
263        @Override
264        public void setMemoryUsage(MemoryUsage memoeyUSage) {
265        }
266        @Override
267        public void start() throws Exception {
268        }
269        @Override
270        public void stop() throws Exception {
271        }
272        
273    }
274        
275    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
276        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
277            super(destination);
278        }
279        
280        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
281                                MessageId messageId, MessageAck ack) throws IOException {
282            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
283            command.setDestination(dest);
284            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
285            command.setMessageId(messageId.toString());
286            // We are not passed a transaction info.. so we can't participate in a transaction.
287            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
288            // to pass back to the XA recover method.
289            // command.setTransactionInfo();
290            processRemove(command, null);
291        }
292
293        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
294            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
295            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
296            command.setDestination(dest);
297            command.setSubscriptionKey(subscriptionKey);
298            command.setRetroactive(retroactive);
299            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
300            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
301            process(command);
302        }
303
304        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
305            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
306            command.setDestination(dest);
307            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
308            process(command);
309        }
310
311        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
312            
313            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
314            synchronized(indexMutex) {
315                pageFile.tx().execute(new Transaction.Closure<IOException>(){
316                    public void execute(Transaction tx) throws IOException {
317                        StoredDestination sd = getStoredDestination(dest, tx);
318                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
319                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
320                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
321                            subscriptions.add(info);
322
323                        }
324                    }
325                });
326            }
327            
328            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
329            subscriptions.toArray(rc);
330            return rc;
331        }
332
333        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
334            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
335            synchronized(indexMutex) {
336                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
337                    public SubscriptionInfo execute(Transaction tx) throws IOException {
338                        StoredDestination sd = getStoredDestination(dest, tx);
339                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
340                        if( command ==null ) {
341                            return null;
342                        }
343                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
344                    }
345                });
346            }
347        }
348       
349        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
350            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
351            synchronized(indexMutex) {
352                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
353                    public Integer execute(Transaction tx) throws IOException {
354                        StoredDestination sd = getStoredDestination(dest, tx);
355                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
356                        if ( cursorPos==null ) {
357                            // The subscription might not exist.
358                            return 0;
359                        }
360                        cursorPos += 1;
361                        
362                        int counter = 0;
363                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
364                            iterator.next();
365                            counter++;
366                        }
367                        return counter;
368                    }
369                });
370            }        
371        }
372
373        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
374            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
375            synchronized(indexMutex) {
376                pageFile.tx().execute(new Transaction.Closure<Exception>(){
377                    public void execute(Transaction tx) throws Exception {
378                        StoredDestination sd = getStoredDestination(dest, tx);
379                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
380                        cursorPos += 1;
381                        
382                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
383                            Entry<Long, MessageRecord> entry = iterator.next();
384                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
385                        }
386                    }
387                });
388            }
389        }
390
391        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
392            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
393            synchronized(indexMutex) {
394                pageFile.tx().execute(new Transaction.Closure<Exception>(){
395                    public void execute(Transaction tx) throws Exception {
396                        StoredDestination sd = getStoredDestination(dest, tx);
397                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
398                        if( cursorPos == null ) {
399                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
400                            cursorPos += 1;
401                        }
402                        
403                        Entry<Long, MessageRecord> entry=null;
404                        int counter = 0;
405                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
406                            entry = iterator.next();
407                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
408                            counter++;
409                            if( counter >= maxReturned ) {
410                                break;
411                            }
412                        }
413                        if( entry!=null ) {
414                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
415                        }
416                    }
417                });
418            }
419        }
420
421        public void resetBatching(String clientId, String subscriptionName) {
422            try {
423                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
424                synchronized(indexMutex) {
425                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
426                        public void execute(Transaction tx) throws IOException {
427                            StoredDestination sd = getStoredDestination(dest, tx);
428                            sd.subscriptionCursors.remove(subscriptionKey);
429                        }
430                    });
431                }
432            } catch (IOException e) {
433                throw new RuntimeException(e);
434            }
435        }
436    }
437
438    String subscriptionKey(String clientId, String subscriptionName){
439        return clientId+":"+subscriptionName;
440    }
441    
442    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
443        return new KahaDBMessageStore(destination);
444    }
445
446    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
447        return new KahaDBTopicMessageStore(destination);
448    }
449
450    /**
451     * Cleanup method to remove any state associated with the given destination.
452     * This method does not stop the message store (it might not be cached).
453     *
454     * @param destination Destination to forget
455     */
456    public void removeQueueMessageStore(ActiveMQQueue destination) {
457    }
458
459    /**
460     * Cleanup method to remove any state associated with the given destination
461     * This method does not stop the message store (it might not be cached).
462     *
463     * @param destination Destination to forget
464     */
465    public void removeTopicMessageStore(ActiveMQTopic destination) {
466    }
467
468    public void deleteAllMessages() throws IOException {
469    }
470    
471    
472    public Set<ActiveMQDestination> getDestinations() {
473        try {
474            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
475            synchronized(indexMutex) {
476                pageFile.tx().execute(new Transaction.Closure<IOException>(){
477                    public void execute(Transaction tx) throws IOException {
478                        for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
479                            Entry<String, StoredDestination> entry = iterator.next();
480                            rc.add(convert(entry.getKey()));
481                        }
482                    }
483                });
484            }
485            return rc;
486        } catch (IOException e) {
487            throw new RuntimeException(e);
488        }
489    }
490    
491    public long getLastMessageBrokerSequenceId() throws IOException {
492        return 0;
493    }
494    
495    public long size() {
496        if ( !started.get() ) {
497            return 0;
498        }
499        try {
500            return pageFile.getDiskSize();
501        } catch (IOException e) {
502            throw new RuntimeException(e);
503        }
504    }
505
506    public void beginTransaction(ConnectionContext context) throws IOException {
507        throw new IOException("Not yet implemented.");
508    }
509    public void commitTransaction(ConnectionContext context) throws IOException {
510        throw new IOException("Not yet implemented.");
511    }
512    public void rollbackTransaction(ConnectionContext context) throws IOException {
513        throw new IOException("Not yet implemented.");
514    }
515    
516    public void checkpoint(boolean sync) throws IOException {
517    }    
518
519    ///////////////////////////////////////////////////////////////////
520    // Internal conversion methods.
521    ///////////////////////////////////////////////////////////////////
522    
523
524    
525    KahaLocation convert(Location location) {
526        KahaLocation rc = new KahaLocation();
527        rc.setLogId(location.getDataFileId());
528        rc.setOffset(location.getOffset());
529        return rc;
530    }
531    
532    KahaDestination convert(ActiveMQDestination dest) {
533        KahaDestination rc = new KahaDestination();
534        rc.setName(dest.getPhysicalName());
535        switch( dest.getDestinationType() ) {
536        case ActiveMQDestination.QUEUE_TYPE:
537            rc.setType(DestinationType.QUEUE);
538            return rc;
539        case ActiveMQDestination.TOPIC_TYPE:
540            rc.setType(DestinationType.TOPIC);
541            return rc;
542        case ActiveMQDestination.TEMP_QUEUE_TYPE:
543            rc.setType(DestinationType.TEMP_QUEUE);
544            return rc;
545        case ActiveMQDestination.TEMP_TOPIC_TYPE:
546            rc.setType(DestinationType.TEMP_TOPIC);
547            return rc;
548        default:
549            return null;
550        }
551    }
552
553    ActiveMQDestination convert(String dest) {
554        int p = dest.indexOf(":");
555        if( p<0 ) {
556            throw new IllegalArgumentException("Not in the valid destination format");
557        }
558        int type = Integer.parseInt(dest.substring(0, p));
559        String name = dest.substring(p+1);
560        
561        switch( KahaDestination.DestinationType.valueOf(type) ) {
562        case QUEUE:
563            return new ActiveMQQueue(name);
564        case TOPIC:
565            return new ActiveMQTopic(name);
566        case TEMP_QUEUE:
567            return new ActiveMQTempQueue(name);
568        case TEMP_TOPIC:
569            return new ActiveMQTempTopic(name);
570        default:    
571            throw new IllegalArgumentException("Not in the valid destination format");
572        }
573    }
574    
575    public long getLastProducerSequenceId(ProducerId id) {
576        return -1;
577    }
578        
579}