001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.store.IndexListener;
032import org.apache.activemq.store.MessageRecoveryListener;
033import org.apache.activemq.store.AbstractMessageStore;
034import org.apache.activemq.store.MessageStoreStatistics;
035
036/**
037 * An implementation of {@link org.apache.activemq.store.MessageStore} which
038 * uses a
039 *
040 *
041 */
042public class MemoryMessageStore extends AbstractMessageStore {
043
044    protected final Map<MessageId, Message> messageTable;
045    protected MessageId lastBatchId;
046    protected long sequenceId;
047
048    public MemoryMessageStore(ActiveMQDestination destination) {
049        this(destination, new LinkedHashMap<MessageId, Message>());
050    }
051
052    public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
053        super(destination);
054        this.messageTable = Collections.synchronizedMap(messageTable);
055    }
056
057    @Override
058    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
059        synchronized (messageTable) {
060            messageTable.put(message.getMessageId(), message);
061            incMessageStoreStatistics(getMessageStoreStatistics(), message);
062            message.incrementReferenceCount();
063            message.getMessageId().setFutureOrSequenceLong(sequenceId++);
064            if (indexListener != null) {
065                indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
066            }
067        }
068    }
069
070    // public void addMessageReference(ConnectionContext context,MessageId
071    // messageId,long expirationTime,String messageRef)
072    // throws IOException{
073    // synchronized(messageTable){
074    // messageTable.put(messageId,messageRef);
075    // }
076    // }
077
078    @Override
079    public Message getMessage(MessageId identity) throws IOException {
080        return messageTable.get(identity);
081    }
082
083    // public String getMessageReference(MessageId identity) throws IOException{
084    // return (String)messageTable.get(identity);
085    // }
086
087    @Override
088    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
089        removeMessage(ack.getLastMessageId());
090    }
091
092    public void removeMessage(MessageId msgId) throws IOException {
093        synchronized (messageTable) {
094            Message removed = messageTable.remove(msgId);
095            if( removed !=null ) {
096                removed.decrementReferenceCount();
097                decMessageStoreStatistics(getMessageStoreStatistics(), removed);
098            }
099            if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
100                lastBatchId = null;
101            }
102        }
103    }
104
105    @Override
106    public void recover(MessageRecoveryListener listener) throws Exception {
107        // the message table is a synchronizedMap - so just have to synchronize
108        // here
109        synchronized (messageTable) {
110            for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
111                Message msg = iter.next();
112                listener.recoverMessage(msg);
113            }
114        }
115    }
116
117    @Override
118    public void removeAllMessages(ConnectionContext context) throws IOException {
119        synchronized (messageTable) {
120            messageTable.clear();
121            getMessageStoreStatistics().reset();
122        }
123    }
124
125    public void delete() {
126        synchronized (messageTable) {
127            messageTable.clear();
128            getMessageStoreStatistics().reset();
129        }
130    }
131
132    @Override
133    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
134        synchronized (messageTable) {
135            boolean pastLackBatch = lastBatchId == null;
136            int count = 0;
137            for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
138                Map.Entry entry = (Entry)iter.next();
139                if (pastLackBatch) {
140                    count++;
141                    Object msg = entry.getValue();
142                    lastBatchId = (MessageId)entry.getKey();
143                    if (msg.getClass() == MessageId.class) {
144                        listener.recoverMessageReference((MessageId)msg);
145                    } else {
146                        listener.recoverMessage((Message)msg);
147                    }
148                } else {
149                    pastLackBatch = entry.getKey().equals(lastBatchId);
150                }
151            }
152        }
153    }
154
155    @Override
156    public void resetBatching() {
157        lastBatchId = null;
158    }
159
160    @Override
161    public void setBatch(MessageId messageId) {
162        lastBatchId = messageId;
163    }
164
165    @Override
166    public void updateMessage(Message message) {
167        synchronized (messageTable) {
168            Message original = messageTable.get(message.getMessageId());
169
170            //if can't be found then increment count, else remove old size
171            if (original == null) {
172                getMessageStoreStatistics().getMessageCount().increment();
173            } else {
174                getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
175            }
176            messageTable.put(message.getMessageId(), message);
177            getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
178        }
179    }
180
181    @Override
182    public void recoverMessageStoreStatistics() throws IOException {
183        synchronized (messageTable) {
184            long size = 0;
185            int count = 0;
186            for (Iterator<Message> iter = messageTable.values().iterator(); iter
187                    .hasNext();) {
188                Message msg = iter.next();
189                size += msg.getSize();
190            }
191
192            getMessageStoreStatistics().reset();
193            getMessageStoreStatistics().getMessageCount().setCount(count);
194            getMessageStoreStatistics().getMessageSize().setTotalSize(size);
195        }
196    }
197
198    protected static final void incMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
199        if (stats != null && message != null) {
200            stats.getMessageCount().increment();
201            stats.getMessageSize().addSize(message.getSize());
202        }
203    }
204
205    protected static final void decMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
206        if (stats != null && message != null) {
207            stats.getMessageCount().decrement();
208            stats.getMessageSize().addSize(-message.getSize());
209        }
210    }
211
212}