/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PrefetchSubscription} for a durable topic consumer.
 * <p>
 * The subscription is identified by a {@link SubscriptionKey} (client id plus
 * subscription name) and can be switched between an active and an inactive
 * state via {@link #activate} and {@link #deactivate}. Pending messages are
 * held in a {@link StoreDurableSubscriberCursor}. The subscription registers
 * itself as a {@link UsageListener} on the memory usage while it is active so
 * that dispatching can be retried when usage drops.
 */
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {

    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);

    /**
     * Number of times each message was still in the dispatched list when the
     * subscription was deactivated; copied onto the next dispatch as the
     * redelivery counter and dropped once the message is acknowledged.
     */
    private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();

    /** Destinations this subscription has been added to, keyed by their ActiveMQ destination. */
    private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();

    private final SubscriptionKey subscriptionKey;

    /** Set at construction and overwritten by every call to {@link #deactivate}. */
    private boolean keepDurableSubsActive;

    private final AtomicBoolean active = new AtomicBoolean();

    /** Wall-clock time of the last deactivation, or {@code -1} when never deactivated or currently active. */
    private final AtomicLong offlineTimestamp = new AtomicLong(-1);

    /**
     * Creates an inactive durable subscription backed by a
     * {@link StoreDurableSubscriberCursor}.
     *
     * @param broker the broker handed to the superclass and to the cursor
     * @param usageManager the system usage applied to the pending cursor
     * @param context the connection context; its client id forms part of the subscription key
     * @param info the consumer info; supplies the subscription name and prefetch size
     * @param keepDurableSubsActive initial value of the keep-active flag
     * @throws JMSException propagated from the superclass constructor
     */
    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
            throws JMSException {
        super(broker, usageManager, context, info);
        this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
        this.pending.setSystemUsage(usageManager);
        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        this.keepDurableSubsActive = keepDurableSubsActive;
        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
    }

    /**
     * @return {@code true} while the subscription is activated
     */
    public final boolean isActive() {
        return active.get();
    }

    /**
     * @return the timestamp recorded at the last deactivation, or {@code -1}
     */
    public final long getOfflineTimestamp() {
        return offlineTimestamp.get();
    }

    /**
     * Overrides the recorded offline timestamp.
     *
     * @param timestamp the value to store
     */
    public void setOfflineTimestamp(long timestamp) {
        offlineTimestamp.set(timestamp);
    }

    /**
     * An inactive subscription always reports itself as full; otherwise the
     * superclass decides.
     */
    @Override
    public boolean isFull() {
        return !active.get() || super.isFull();
    }

    /**
     * Intentionally a no-op for durable topic subscriptions.
     */
    @Override
    public void gc() {
    }

    /**
     * store will have a pending ack for all durables, irrespective of the
     * selector so we need to ack if node is un-matched
     *
     * @param node the message reference that did not match this subscription
     * @throws IOException if the region destination fails to acknowledge
     */
    @Override
    public void unmatched(MessageReference node) throws IOException {
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
        ack.setMessageID(node.getMessageId());
        Destination regionDestination = (Destination) node.getRegionDestination();
        regionDestination.acknowledge(this.getContext(), this, ack, node);
    }

    /**
     * Intentionally a no-op: the batch size is not adjusted per dispatch here.
     */
    @Override
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        // statically configured via maxPageSize
    }

    /**
     * Registers this subscription with a destination.
     * <p>
     * The superclass registration happens only if the destination is not
     * already known to it. The durable-specific part runs once per ActiveMQ
     * destination: when the subscription is active (or kept active) the topic
     * is activated and the current pending size is added to the enqueue
     * statistic; otherwise, if the destination has a message store, the
     * stored message count for this subscription is added instead.
     *
     * @param context the connection context passed on to the superclass and the topic
     * @param destination the destination to add; cast to {@link Topic} when activating
     * @throws Exception on failure; a store {@link IOException} is rethrown as a
     *         {@link JMSException} with the original linked
     */
    @Override
    public void add(ConnectionContext context, Destination destination) throws Exception {
        if (!destinations.contains(destination)) {
            super.add(context, destination);
        }
        // do it just once per destination; putIfAbsent makes the check and the
        // registration a single atomic step on the concurrent map
        if (durableDestinations.putIfAbsent(destination.getActiveMQDestination(), destination) != null) {
            return;
        }

        if (active.get() || keepDurableSubsActive) {
            Topic topic = (Topic) destination;
            topic.activate(context, this);
            getSubscriptionStatistics().getEnqueues().add(pending.size());
        } else if (destination.getMessageStore() != null) {
            TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
            try {
                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
            } catch (IOException e) {
                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
                jmsEx.setLinkedException(e);
                throw jmsEx;
            }
        }
        dispatchPending();
    }

    /**
     * Reports whether the pending cursor holds nothing for the given topic.
     *
     * @param topic the topic to query the pending cursor for
     * @return the result of {@code pending.isEmpty(topic)}
     */
    // used by RetainedMessageSubscriptionRecoveryPolicy
    public boolean isEmpty(Topic topic) {
        return pending.isEmpty(topic);
    }

    /**
     * Activates the subscription for a (re)connecting consumer. Does nothing
     * if the subscription is already active.
     * <p>
     * Unless the subscription was being kept active, every known destination
     * is re-added and its topic activated, and the destination policy entry
     * for the consumer's destination (if any) is re-applied. The pending
     * cursor is then (re)started when it is not started or the subscription
     * was not kept active, and retroactive messages are recovered for topics
     * that are always retroactive or when the consumer asks for it. Finally
     * the subscription is flagged active, pending messages are dispatched and
     * this object is registered as a memory usage listener.
     *
     * @param memoryManager the system usage applied to the pending cursor on (re)start
     * @param context the new connection context
     * @param info the new consumer info
     * @param regionBroker used to look up the destination policy
     * @throws Exception on failure of any of the delegated operations
     */
    public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
        if (!active.get()) {
            this.context = context;
            this.info = info;

            LOG.debug("Activating {}", this);
            if (!keepDurableSubsActive) {
                for (Destination destination : durableDestinations.values()) {
                    Topic topic = (Topic) destination;
                    add(context, topic);
                    topic.activate(context, this);
                }

                // On Activation we should update the configuration based on our new consumer info.
                ActiveMQDestination dest = this.info.getDestination();
                if (dest != null && regionBroker.getDestinationPolicy() != null) {
                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
                    if (entry != null) {
                        entry.configure(broker, usageManager, this);
                    }
                }
            }

            synchronized (pendingLock) {
                if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) {
                    pending.setSystemUsage(memoryManager);
                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                    pending.setMaxAuditDepth(getMaxAuditDepth());
                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
                    pending.start();
                }
                // use recovery policy every time sub is activated for retroactive topics and consumers
                for (Destination destination : durableDestinations.values()) {
                    Topic topic = (Topic) destination;
                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
                        topic.recoverRetroactiveMessages(context, this);
                    }
                }
            }
            this.active.set(true);
            this.offlineTimestamp.set(-1);
            dispatchPending();
            this.usageManager.getMemoryUsage().addUsageListener(this);
        }
    }

    /**
     * Deactivates the subscription when its consumer goes away.
     * <p>
     * The subscription is flagged inactive, the offline timestamp is set to
     * the current time and the memory usage listener is removed. Under the
     * pending and dispatch locks, every message still in the dispatched list
     * has its reference count decremented and, depending on
     * {@code lastDeliveredSequenceId}, its redelivery count bumped. When the
     * subscription is kept active and the cursor is transient, those messages
     * are pushed back to the front of the pending cursor. When it is not kept
     * active, the cursor is stopped, a transient cursor is drained, and each
     * topic is deactivated after the locks are released.
     *
     * @param keepDurableSubsActive whether the subscription keeps receiving
     *        messages while offline; replaces the stored flag
     * @param lastDeliveredSequenceId {@code 0} marks every dispatched message
     *        as redelivered; a positive value marks only messages whose broker
     *        sequence id is not greater than it; a negative value marks none
     * @throws Exception on failure of any of the delegated operations
     */
    public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
        LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
        active.set(false);
        this.keepDurableSubsActive = keepDurableSubsActive;
        offlineTimestamp.set(System.currentTimeMillis());
        usageManager.getMemoryUsage().removeUsageListener(this);

        List<Topic> topicsToDeactivate = new ArrayList<Topic>();
        List<MessageReference> savedDispatched = null;

        synchronized (pendingLock) {
            if (!keepDurableSubsActive) {
                pending.stop();
            }

            synchronized (dispatchLock) {
                for (Destination destination : durableDestinations.values()) {
                    Topic topic = (Topic) destination;
                    if (!keepDurableSubsActive) {
                        topicsToDeactivate.add(topic);
                    } else {
                        // NOTE(review): the full dispatched size is subtracted from every
                        // destination's inflight count, not a per-destination share -
                        // confirm this is intended when there are several destinations.
                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
                    }
                }

                // Before we add these back to pending they need to be in producer order not
                // dispatch order so we can add them to the front of the pending list.
                Collections.reverse(dispatched);

                for (final MessageReference node : dispatched) {
                    // Mark the dispatched messages as redelivered for next time.
                    if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
                        Integer count = redeliveredMessages.get(node.getMessageId());
                        if (count != null) {
                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
                        } else {
                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
                        }
                    }
                    if (keepDurableSubsActive && pending.isTransient()) {
                        pending.addMessageFirst(node);
                        pending.rollback(node.getMessageId());
                    }
                    // createMessageDispatch increments on remove from pending for dispatch
                    node.decrementReferenceCount();
                }

                // Snapshot the dispatched list before clearing it; the topics are
                // deactivated with it once both locks have been released.
                if (!topicsToDeactivate.isEmpty()) {
                    savedDispatched = new ArrayList<MessageReference>(dispatched);
                }
                dispatched.clear();
                getSubscriptionStatistics().getInflightMessageSize().reset();
            }
            if (!keepDurableSubsActive && pending.isTransient()) {
                try {
                    pending.reset();
                    while (pending.hasNext()) {
                        MessageReference node = pending.next();
                        node.decrementReferenceCount();
                        pending.remove();
                    }
                } finally {
                    pending.release();
                }
            }
        }
        for (Topic topic : topicsToDeactivate) {
            topic.deactivate(context, this, savedDispatched);
        }
        prefetchExtension.set(0);
    }

    /**
     * Builds the dispatch via the superclass and, for anything other than
     * {@link QueueMessageReference#NULL_MESSAGE}, takes a reference on the
     * node and applies the recorded redelivery count, if there is one.
     *
     * @param node the message reference being dispatched
     * @param message the message being dispatched
     * @return the dispatch command created by the superclass
     */
    @Override
    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        MessageDispatch md = super.createMessageDispatch(node, message);
        if (node != QueueMessageReference.NULL_MESSAGE) {
            node.incrementReferenceCount();
            Integer count = redeliveredMessages.get(node.getMessageId());
            if (count != null) {
                md.setRedeliveryCounter(count.intValue());
            }
        }
        return md;
    }

    /**
     * Accepts a message only while the subscription is active or is being
     * kept active; otherwise the message is silently ignored here.
     *
     * @param node the message reference to add
     * @throws Exception propagated from the superclass
     */
    @Override
    public void add(MessageReference node) throws Exception {
        if (!active.get() && !keepDurableSubsActive) {
            return;
        }
        super.add(node);
    }

    /**
     * Dispatches pending messages only while the subscription is active.
     *
     * @throws IOException propagated from the superclass
     */
    @Override
    public void dispatchPending() throws IOException {
        if (isActive()) {
            super.dispatchPending();
        }
    }

    /**
     * Removes a message reference from the pending cursor.
     *
     * @param node the message reference to remove
     * @throws IOException declared for callers; see the cursor implementation
     */
    public void removePending(MessageReference node) throws IOException {
        // NOTE(review): unlike other cursor accesses in this class, this one does
        // not take pendingLock - confirm callers already hold it.
        pending.remove(node);
    }

    /**
     * Hands a recovered message straight to the pending cursor.
     *
     * @param message the recovered message reference
     * @throws Exception propagated from the cursor
     */
    @Override
    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
        // NOTE(review): this synchronizes on the cursor object itself, whereas the
        // rest of the class guards the cursor with pendingLock - confirm intent.
        synchronized (pending) {
            pending.addRecoveredMessage(message);
        }
    }

    /**
     * @return the superclass's pending size while active or kept active,
     *         otherwise {@code 0}
     */
    @Override
    public int getPendingQueueSize() {
        if (active.get() || keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        // TODO: need to get from store
        return 0;
    }

    /**
     * Rejects selector changes while the subscription is active.
     *
     * @param selector NOTE(review): this argument is not used; when inactive the
     *        current selector is re-applied instead - confirm this is deliberate
     * @throws InvalidSelectorException propagated from the superclass
     * @throws UnsupportedOperationException if the subscription is active
     */
    @Override
    public void setSelector(String selector) throws InvalidSelectorException {
        if (active.get()) {
            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
        } else {
            super.setSelector(getSelector());
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean canDispatch(MessageReference node) {
        return true; // let them go, our dispatchPending gates the active / inactive state.
    }

    /**
     * Acknowledges a message against its region destination, forgets its
     * redelivery count, releases the reference taken at dispatch time and
     * updates the destination's dequeue (and, for network subscriptions,
     * forward) statistics.
     *
     * @param context the connection context of the acknowledging consumer
     * @param ack the acknowledgement command
     * @param node the message reference being acknowledged
     * @throws IOException if the region destination fails to acknowledge
     */
    @Override
    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
        this.setTimeOfLastMessageAck(System.currentTimeMillis());
        Destination regionDestination = (Destination) node.getRegionDestination();
        regionDestination.acknowledge(context, this, ack, node);
        redeliveredMessages.remove(node.getMessageId());
        node.decrementReferenceCount();
        regionDestination.getDestinationStatistics().getDequeues().increment();
        if (info.isNetworkSubscription()) {
            regionDestination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
        }
    }

    /**
     * @return a one-line summary of the subscription key, consumer id, state
     *         and counters
     */
    @Override
    public synchronized String toString() {
        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
                + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
                + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
    }

    /**
     * @return the client id / subscription name key of this subscription
     */
    public SubscriptionKey getSubscriptionKey() {
        return subscriptionKey;
    }

    /**
     * Release any references that we are holding.
     * <p>
     * Decrements the reference count of every pending and every dispatched
     * message, clears both collections and resets the slow-consumer flag.
     */
    @Override
    public void destroy() {
        synchronized (pendingLock) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                }
            } finally {
                pending.release();
                pending.clear();
            }
        }
        synchronized (dispatchLock) {
            for (MessageReference node : dispatched) {
                node.decrementReferenceCount();
            }
            dispatched.clear();
        }
        setSlowConsumer(false);
    }

    /**
     * Retries dispatching when usage drops from 90 percent or above.
     *
     * @param usage the usage that changed
     * @param oldPercentUsage the previous usage percentage
     * @param newPercentUsage the new usage percentage
     */
    @Override
    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
            try {
                dispatchPending();
            } catch (IOException e) {
                LOG.warn("problem calling dispatchPending", e);
            }
        }
    }

    /**
     * @return always {@code false}
     */
    @Override
    protected boolean isDropped(MessageReference node) {
        return false;
    }

    /**
     * @return the current keep-active flag
     */
    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }
}