001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.SubscriptionInfo;
032import org.apache.activemq.store.MessageRecoveryListener;
033import org.apache.activemq.store.MessageStoreStatistics;
034import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
035import org.apache.activemq.store.TopicMessageStore;
036import org.apache.activemq.util.LRUCache;
037import org.apache.activemq.util.SubscriptionKey;
038
039/**
040 *
041 */
042public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
043
044    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
045    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
046    private final Map<MessageId, Message> originalMessageTable;
047
048    public MemoryTopicMessageStore(ActiveMQDestination destination) {
049        this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
050
051        //Set the messageStoreStatistics after the super class is initialized so that the stats can be
052        //properly updated on cache eviction
053        MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
054        cache.setMessageStoreStatistics(messageStoreStatistics);
055    }
056
057    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
058        super(destination, messageTable);
059        this.subscriberDatabase = subscriberDatabase;
060        this.topicSubMap = makeSubMap();
061        //this is only necessary so that messageStoreStatistics can be set if necessary
062        //We need the original reference since messageTable is wrapped in a synchronized map in the parent class
063        this.originalMessageTable = messageTable;
064    }
065
066    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
067        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
068    }
069
070    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
071        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
072    }
073
074    @Override
075    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
076        super.addMessage(context, message);
077        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
078            MemoryTopicSub sub = i.next();
079            sub.addMessage(message.getMessageId(), message);
080        }
081    }
082
083    @Override
084    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
085                                         MessageId messageId, MessageAck ack) throws IOException {
086        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
087        MemoryTopicSub sub = topicSubMap.get(key);
088        if (sub != null) {
089            sub.removeMessage(messageId);
090        }
091    }
092
093    @Override
094    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
095        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
096    }
097
098    @Override
099    public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
100        SubscriptionKey key = new SubscriptionKey(info);
101        MemoryTopicSub sub = new MemoryTopicSub();
102        topicSubMap.put(key, sub);
103        if (retroactive) {
104            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
105                Map.Entry entry = (Entry)i.next();
106                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
107            }
108        }
109        subscriberDatabase.put(key, info);
110    }
111
112    @Override
113    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
114        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
115        subscriberDatabase.remove(key);
116        topicSubMap.remove(key);
117    }
118
119    @Override
120    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
121        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
122        if (sub != null) {
123            sub.recoverSubscription(listener);
124        }
125    }
126
127    @Override
128    public synchronized void delete() {
129        super.delete();
130        subscriberDatabase.clear();
131        topicSubMap.clear();
132    }
133
134    @Override
135    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
136        return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
137    }
138
139    @Override
140    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
141        int result = 0;
142        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
143        if (sub != null) {
144            result = sub.size();
145        }
146        return result;
147    }
148
149    @Override
150    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
151        long result = 0;
152        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
153        if (sub != null) {
154            result = sub.messageSize();
155        }
156        return result;
157    }
158
159    @Override
160    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
161        MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
162        if (sub != null) {
163            sub.recoverNextMessages(maxReturned, listener);
164        }
165    }
166
167    @Override
168    public void resetBatching(String clientId, String subscriptionName) {
169        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
170        if (sub != null) {
171            sub.resetBatching();
172        }
173    }
174
175    //Disabled for the memory store, can be enabled later if necessary
176    private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
177
178    @Override
179    public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
180        return stats;
181    }
182
183    /**
184     * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
185     * when computing the message store statistics.
186     *
187     */
188    private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
189        private static final long serialVersionUID = -342098639681884413L;
190        private MessageStoreStatistics messageStoreStatistics;
191
192        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
193                float loadFactor, boolean accessOrder) {
194            super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
195        }
196
197        public void setMessageStoreStatistics(
198                MessageStoreStatistics messageStoreStatistics) {
199            this.messageStoreStatistics = messageStoreStatistics;
200        }
201
202        @Override
203        protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
204            decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
205        }
206    }
207}