001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.Collections; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.Map.Entry; 025 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.Message; 029import org.apache.activemq.command.MessageAck; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.command.SubscriptionInfo; 032import org.apache.activemq.store.MessageRecoveryListener; 033import org.apache.activemq.store.MessageStoreStatistics; 034import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 035import org.apache.activemq.store.TopicMessageStore; 036import org.apache.activemq.util.LRUCache; 037import org.apache.activemq.util.SubscriptionKey; 038 039/** 040 * 041 */ 042public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore { 043 044 private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase; 045 private Map<SubscriptionKey, MemoryTopicSub> topicSubMap; 046 private final Map<MessageId, Message> originalMessageTable; 047 048 public MemoryTopicMessageStore(ActiveMQDestination destination) { 049 this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap()); 050 051 //Set the messageStoreStatistics after the super class is initialized so that the stats can be 052 //properly updated on cache eviction 053 MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable; 054 cache.setMessageStoreStatistics(messageStoreStatistics); 055 } 056 057 public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) { 058 super(destination, messageTable); 059 this.subscriberDatabase = subscriberDatabase; 060 this.topicSubMap = makeSubMap(); 061 //this is only necessary so that messageStoreStatistics can be set if necessary 062 //We need the original reference since messageTable is wrapped in a synchronized map in the parent class 063 this.originalMessageTable = messageTable; 064 } 065 066 protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() { 067 return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>()); 068 } 069 070 protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() { 071 return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>()); 072 } 073 074 @Override 075 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 076 super.addMessage(context, message); 077 for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) { 078 MemoryTopicSub sub = i.next(); 079 sub.addMessage(message.getMessageId(), message); 080 } 081 } 082 083 @Override 084 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 085 MessageId messageId, MessageAck ack) throws IOException { 086 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 087 MemoryTopicSub sub = topicSubMap.get(key); 088 if (sub != null) { 089 sub.removeMessage(messageId); 090 } 091 } 092 093 @Override 094 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 095 return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName)); 096 } 097 098 @Override 099 public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException { 100 SubscriptionKey key = new SubscriptionKey(info); 101 MemoryTopicSub sub = new MemoryTopicSub(); 102 topicSubMap.put(key, sub); 103 if (retroactive) { 104 for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) { 105 Map.Entry entry = (Entry)i.next(); 106 sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue()); 107 } 108 } 109 subscriberDatabase.put(key, info); 110 } 111 112 @Override 113 public synchronized void deleteSubscription(String clientId, String subscriptionName) { 114 org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 115 subscriberDatabase.remove(key); 116 topicSubMap.remove(key); 117 } 118 119 @Override 120 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 121 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 122 if (sub != null) { 123 sub.recoverSubscription(listener); 124 } 125 } 126 127 @Override 128 public synchronized void delete() { 129 super.delete(); 130 subscriberDatabase.clear(); 131 topicSubMap.clear(); 132 } 133 134 @Override 135 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 136 return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); 137 } 138 139 @Override 140 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException { 141 int result = 0; 142 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); 143 if (sub != null) { 144 result = sub.size(); 145 } 146 return result; 147 } 148 149 @Override 150 public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException { 151 long result = 0; 152 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); 153 if (sub != null) { 154 result = sub.messageSize(); 155 } 156 return result; 157 } 158 159 @Override 160 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { 161 MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 162 if (sub != null) { 163 sub.recoverNextMessages(maxReturned, listener); 164 } 165 } 166 167 @Override 168 public void resetBatching(String clientId, String subscriptionName) { 169 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 170 if (sub != null) { 171 sub.resetBatching(); 172 } 173 } 174 175 //Disabled for the memory store, can be enabled later if necessary 176 private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); 177 178 @Override 179 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 180 return stats; 181 } 182 183 /** 184 * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions 185 * when computing the message store statistics. 186 * 187 */ 188 private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> { 189 private static final long serialVersionUID = -342098639681884413L; 190 private MessageStoreStatistics messageStoreStatistics; 191 192 public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize, 193 float loadFactor, boolean accessOrder) { 194 super(initialCapacity, maximumCacheSize, loadFactor, accessOrder); 195 } 196 197 public void setMessageStoreStatistics( 198 MessageStoreStatistics messageStoreStatistics) { 199 this.messageStoreStatistics = messageStoreStatistics; 200 } 201 202 @Override 203 protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) { 204 decMessageStoreStatistics(messageStoreStatistics, eldest.getValue()); 205 } 206 } 207}