001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ConsumerInfo; 043import org.apache.activemq.command.ExceptionResponse; 044import org.apache.activemq.command.Message; 045import org.apache.activemq.command.MessageAck; 046import org.apache.activemq.command.MessageId; 047import org.apache.activemq.command.ProducerAck; 048import org.apache.activemq.command.ProducerInfo; 049import org.apache.activemq.command.Response; 050import org.apache.activemq.command.SubscriptionInfo; 051import org.apache.activemq.filter.MessageEvaluationContext; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.store.MessageRecoveryListener; 054import org.apache.activemq.store.TopicMessageStore; 055import org.apache.activemq.thread.Task; 056import org.apache.activemq.thread.TaskRunner; 057import org.apache.activemq.thread.TaskRunnerFactory; 058import org.apache.activemq.transaction.Synchronization; 059import org.apache.activemq.util.SubscriptionKey; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * The Topic is a destination that sends a copy of a message to every active 065 * Subscription registered. 066 */ 067public class Topic extends BaseDestination implements Task { 068 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 069 private final TopicMessageStore topicStore; 070 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 071 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 072 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 073 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 074 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 075 private final TaskRunner taskRunner; 076 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 077 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 078 @Override 079 public void run() { 080 try { 081 Topic.this.taskRunner.wakeup(); 082 } catch (InterruptedException e) { 083 } 084 }; 085 }; 086 087 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 088 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 089 super(brokerService, store, destination, parentStats); 090 this.topicStore = store; 091 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 092 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 093 } 094 095 @Override 096 public void initialize() throws Exception { 097 super.initialize(); 098 // set non default subscription recovery policy (override policyEntries) 099 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 100 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 101 setAlwaysRetroactive(true); 102 } 103 if (store != null) { 104 // AMQ-2586: Better to leave this stat at zero than to give the user 105 // misleading metrics. 106 // int messageCount = store.getMessageCount(); 107 // destinationStatistics.getMessages().setCount(messageCount); 108 store.start(); 109 } 110 } 111 112 @Override 113 public List<Subscription> getConsumers() { 114 synchronized (consumers) { 115 return new ArrayList<Subscription>(consumers); 116 } 117 } 118 119 public boolean lock(MessageReference node, LockOwner sub) { 120 return true; 121 } 122 123 @Override 124 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 125 if (!sub.getConsumerInfo().isDurable()) { 126 127 // Do a retroactive recovery if needed. 128 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 129 130 // synchronize with dispatch method so that no new messages are sent 131 // while we are recovering a subscription to avoid out of order messages. 132 dispatchLock.writeLock().lock(); 133 try { 134 boolean applyRecovery = false; 135 synchronized (consumers) { 136 if (!consumers.contains(sub)){ 137 sub.add(context, this); 138 consumers.add(sub); 139 applyRecovery=true; 140 super.addSubscription(context, sub); 141 } 142 } 143 if (applyRecovery){ 144 subscriptionRecoveryPolicy.recover(context, this, sub); 145 } 146 } finally { 147 dispatchLock.writeLock().unlock(); 148 } 149 150 } else { 151 synchronized (consumers) { 152 if (!consumers.contains(sub)){ 153 sub.add(context, this); 154 consumers.add(sub); 155 super.addSubscription(context, sub); 156 } 157 } 158 } 159 } else { 160 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 161 super.addSubscription(context, sub); 162 sub.add(context, this); 163 if(dsub.isActive()) { 164 synchronized (consumers) { 165 boolean hasSubscription = false; 166 167 if (consumers.size() == 0) { 168 hasSubscription = false; 169 } else { 170 for (Subscription currentSub : consumers) { 171 if (currentSub.getConsumerInfo().isDurable()) { 172 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 173 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 174 hasSubscription = true; 175 break; 176 } 177 } 178 } 179 } 180 181 if (!hasSubscription) { 182 consumers.add(sub); 183 } 184 } 185 } 186 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 187 } 188 } 189 190 @Override 191 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 192 if (!sub.getConsumerInfo().isDurable()) { 193 super.removeSubscription(context, sub, lastDeliveredSequenceId); 194 synchronized (consumers) { 195 consumers.remove(sub); 196 } 197 } 198 sub.remove(context, this); 199 } 200 201 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 202 if (topicStore != null) { 203 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 204 DurableTopicSubscription removed = durableSubscribers.remove(key); 205 if (removed != null) { 206 destinationStatistics.getConsumers().decrement(); 207 // deactivate and remove 208 removed.deactivate(false, 0l); 209 consumers.remove(removed); 210 } 211 } 212 } 213 214 private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) { 215 if (hasSelectorChanged(info1, info2)) { 216 return true; 217 } 218 219 return hasNoLocalChanged(info1, info2); 220 } 221 222 private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) { 223 // Prior to V11 the broker did not store the noLocal value for durable subs. 224 if (brokerService.getStoreOpenWireVersion() >= 11) { 225 if (info1.isNoLocal() ^ info2.isNoLocal()) { 226 return true; 227 } 228 } 229 230 return false; 231 } 232 233 private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { 234 if (info1.getSelector() != null ^ info2.getSelector() != null) { 235 return true; 236 } 237 238 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 239 return true; 240 } 241 242 return false; 243 } 244 245 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 246 // synchronize with dispatch method so that no new messages are sent 247 // while we are recovering a subscription to avoid out of order messages. 248 dispatchLock.writeLock().lock(); 249 try { 250 251 if (topicStore == null) { 252 return; 253 } 254 255 // Recover the durable subscription. 256 String clientId = subscription.getSubscriptionKey().getClientId(); 257 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 258 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 259 if (info != null) { 260 // Check to see if selector changed. 261 if (hasDurableSubChanged(info, subscription.getConsumerInfo())) { 262 // Need to delete the subscription 263 topicStore.deleteSubscription(clientId, subscriptionName); 264 info = null; 265 // Force a rebuild of the selector chain for the subscription otherwise 266 // the stored subscription is updated but the selector expression is not 267 // and the subscription will not behave according to the new configuration. 268 subscription.setSelector(subscription.getConsumerInfo().getSelector()); 269 synchronized (consumers) { 270 consumers.remove(subscription); 271 } 272 } else { 273 synchronized (consumers) { 274 if (!consumers.contains(subscription)) { 275 consumers.add(subscription); 276 } 277 } 278 } 279 } 280 281 // Do we need to create the subscription? 282 if (info == null) { 283 info = new SubscriptionInfo(); 284 info.setClientId(clientId); 285 info.setSelector(subscription.getConsumerInfo().getSelector()); 286 info.setSubscriptionName(subscriptionName); 287 info.setDestination(getActiveMQDestination()); 288 info.setNoLocal(subscription.getConsumerInfo().isNoLocal()); 289 // This destination is an actual destination id. 290 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 291 // This destination might be a pattern 292 synchronized (consumers) { 293 consumers.add(subscription); 294 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 295 } 296 } 297 298 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 299 msgContext.setDestination(destination); 300 if (subscription.isRecoveryRequired()) { 301 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 302 @Override 303 public boolean recoverMessage(Message message) throws Exception { 304 message.setRegionDestination(Topic.this); 305 try { 306 msgContext.setMessageReference(message); 307 if (subscription.matches(message, msgContext)) { 308 subscription.add(message); 309 } 310 } catch (IOException e) { 311 LOG.error("Failed to recover this message {}", message, e); 312 } 313 return true; 314 } 315 316 @Override 317 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 318 throw new RuntimeException("Should not be called."); 319 } 320 321 @Override 322 public boolean hasSpace() { 323 return true; 324 } 325 326 @Override 327 public boolean isDuplicate(MessageId id) { 328 return false; 329 } 330 }); 331 } 332 } finally { 333 dispatchLock.writeLock().unlock(); 334 } 335 } 336 337 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 338 synchronized (consumers) { 339 consumers.remove(sub); 340 } 341 sub.remove(context, this, dispatched); 342 } 343 344 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 345 if (subscription.getConsumerInfo().isRetroactive()) { 346 subscriptionRecoveryPolicy.recover(context, this, subscription); 347 } 348 } 349 350 @Override 351 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 352 final ConnectionContext context = producerExchange.getConnectionContext(); 353 354 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 355 producerExchange.incrementSend(); 356 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 357 && !context.isInRecoveryMode(); 358 359 message.setRegionDestination(this); 360 361 // There is delay between the client sending it and it arriving at the 362 // destination.. it may have expired. 363 if (message.isExpired()) { 364 broker.messageExpired(context, message, null); 365 getDestinationStatistics().getExpired().increment(); 366 if (sendProducerAck) { 367 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 368 context.getConnection().dispatchAsync(ack); 369 } 370 return; 371 } 372 373 if (memoryUsage.isFull()) { 374 isFull(context, memoryUsage); 375 fastProducer(context, producerInfo); 376 377 if (isProducerFlowControl() && context.isProducerFlowControl()) { 378 379 if (warnOnProducerFlowControl) { 380 warnOnProducerFlowControl = false; 381 LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 382 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 383 } 384 385 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 386 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 387 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 388 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 389 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 390 } 391 392 // We can avoid blocking due to low usage if the producer is sending a sync message or 393 // if it is using a producer window 394 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 395 synchronized (messagesWaitingForSpace) { 396 messagesWaitingForSpace.add(new Runnable() { 397 @Override 398 public void run() { 399 try { 400 401 // While waiting for space to free up... the 402 // message may have expired. 403 if (message.isExpired()) { 404 broker.messageExpired(context, message, null); 405 getDestinationStatistics().getExpired().increment(); 406 } else { 407 doMessageSend(producerExchange, message); 408 } 409 410 if (sendProducerAck) { 411 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 412 .getSize()); 413 context.getConnection().dispatchAsync(ack); 414 } else { 415 Response response = new Response(); 416 response.setCorrelationId(message.getCommandId()); 417 context.getConnection().dispatchAsync(response); 418 } 419 420 } catch (Exception e) { 421 if (!sendProducerAck && !context.isInRecoveryMode()) { 422 ExceptionResponse response = new ExceptionResponse(e); 423 response.setCorrelationId(message.getCommandId()); 424 context.getConnection().dispatchAsync(response); 425 } 426 } 427 } 428 }); 429 430 registerCallbackForNotFullNotification(); 431 context.setDontSendReponse(true); 432 return; 433 } 434 435 } else { 436 // Producer flow control cannot be used, so we have do the flow control 437 // at the broker by blocking this thread until there is space available. 438 439 if (memoryUsage.isFull()) { 440 if (context.isInTransaction()) { 441 442 int count = 0; 443 while (!memoryUsage.waitForSpace(1000)) { 444 if (context.getStopping().get()) { 445 throw new IOException("Connection closed, send aborted."); 446 } 447 if (count > 2 && context.isInTransaction()) { 448 count = 0; 449 int size = context.getTransaction().size(); 450 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 451 } 452 count++; 453 } 454 } else { 455 waitForSpace( 456 context, 457 producerExchange, 458 memoryUsage, 459 "Usage Manager Memory Usage limit reached. Stopping producer (" 460 + message.getProducerId() 461 + ") to prevent flooding " 462 + getActiveMQDestination().getQualifiedName() 463 + "." 464 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 465 } 466 } 467 468 // The usage manager could have delayed us by the time 469 // we unblock the message could have expired.. 470 if (message.isExpired()) { 471 getDestinationStatistics().getExpired().increment(); 472 LOG.debug("Expired message: {}", message); 473 return; 474 } 475 } 476 } 477 } 478 479 doMessageSend(producerExchange, message); 480 messageDelivered(context, message); 481 if (sendProducerAck) { 482 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 483 context.getConnection().dispatchAsync(ack); 484 } 485 } 486 487 /** 488 * do send the message - this needs to be synchronized to ensure messages 489 * are stored AND dispatched in the right order 490 * 491 * @param producerExchange 492 * @param message 493 * @throws IOException 494 * @throws Exception 495 */ 496 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 497 throws IOException, Exception { 498 final ConnectionContext context = producerExchange.getConnectionContext(); 499 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 500 Future<Object> result = null; 501 502 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 503 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 504 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 505 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 506 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 507 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 508 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 509 throw new javax.jms.ResourceAllocationException(logMessage); 510 } 511 512 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 513 } 514 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 515 516 if (isReduceMemoryFootprint()) { 517 message.clearMarshalledState(); 518 } 519 } 520 521 message.incrementReferenceCount(); 522 523 if (context.isInTransaction()) { 524 context.getTransaction().addSynchronization(new Synchronization() { 525 @Override 526 public void afterCommit() throws Exception { 527 // It could take while before we receive the commit 528 // operation.. by that time the message could have 529 // expired.. 530 if (message.isExpired()) { 531 if (broker.isExpired(message)) { 532 getDestinationStatistics().getExpired().increment(); 533 broker.messageExpired(context, message, null); 534 } 535 message.decrementReferenceCount(); 536 return; 537 } 538 try { 539 dispatch(context, message); 540 } finally { 541 message.decrementReferenceCount(); 542 } 543 } 544 545 @Override 546 public void afterRollback() throws Exception { 547 message.decrementReferenceCount(); 548 } 549 }); 550 551 } else { 552 try { 553 dispatch(context, message); 554 } finally { 555 message.decrementReferenceCount(); 556 } 557 } 558 559 if (result != null && !result.isCancelled()) { 560 try { 561 result.get(); 562 } catch (CancellationException e) { 563 // ignore - the task has been cancelled if the message 564 // has already been deleted 565 } 566 } 567 } 568 569 private boolean canOptimizeOutPersistence() { 570 return durableSubscribers.size() == 0; 571 } 572 573 @Override 574 public String toString() { 575 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 576 } 577 578 @Override 579 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 580 final MessageReference node) throws IOException { 581 if (topicStore != null && node.isPersistent()) { 582 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 583 SubscriptionKey key = dsub.getSubscriptionKey(); 584 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 585 convertToNonRangedAck(ack, node)); 586 } 587 messageConsumed(context, node); 588 } 589 590 @Override 591 public void gc() { 592 } 593 594 public Message loadMessage(MessageId messageId) throws IOException { 595 return topicStore != null ? topicStore.getMessage(messageId) : null; 596 } 597 598 @Override 599 public void start() throws Exception { 600 this.subscriptionRecoveryPolicy.start(); 601 if (memoryUsage != null) { 602 memoryUsage.start(); 603 } 604 605 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 606 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 607 } 608 } 609 610 @Override 611 public void stop() throws Exception { 612 if (taskRunner != null) { 613 taskRunner.shutdown(); 614 } 615 this.subscriptionRecoveryPolicy.stop(); 616 if (memoryUsage != null) { 617 memoryUsage.stop(); 618 } 619 if (this.topicStore != null) { 620 this.topicStore.stop(); 621 } 622 623 scheduler.cancel(expireMessagesTask); 624 } 625 626 @Override 627 public Message[] browse() { 628 final List<Message> result = new ArrayList<Message>(); 629 doBrowse(result, getMaxBrowsePageSize()); 630 return result.toArray(new Message[result.size()]); 631 } 632 633 private void doBrowse(final List<Message> browseList, final int max) { 634 try { 635 if (topicStore != null) { 636 final List<Message> toExpire = new ArrayList<Message>(); 637 topicStore.recover(new MessageRecoveryListener() { 638 @Override 639 public boolean recoverMessage(Message message) throws Exception { 640 if (message.isExpired()) { 641 toExpire.add(message); 642 } 643 browseList.add(message); 644 return true; 645 } 646 647 @Override 648 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 649 return true; 650 } 651 652 @Override 653 public boolean hasSpace() { 654 return browseList.size() < max; 655 } 656 657 @Override 658 public boolean isDuplicate(MessageId id) { 659 return false; 660 } 661 }); 662 final ConnectionContext connectionContext = createConnectionContext(); 663 for (Message message : toExpire) { 664 for (DurableTopicSubscription sub : durableSubscribers.values()) { 665 if (!sub.isActive()) { 666 message.setRegionDestination(this); 667 messageExpired(connectionContext, sub, message); 668 } 669 } 670 } 671 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 672 if (msgs != null) { 673 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 674 browseList.add(msgs[i]); 675 } 676 } 677 } 678 } catch (Throwable e) { 679 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 680 } 681 } 682 683 @Override 684 public boolean iterate() { 685 synchronized (messagesWaitingForSpace) { 686 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 687 Runnable op = messagesWaitingForSpace.removeFirst(); 688 op.run(); 689 } 690 691 if (!messagesWaitingForSpace.isEmpty()) { 692 registerCallbackForNotFullNotification(); 693 } 694 } 695 return false; 696 } 697 698 private void registerCallbackForNotFullNotification() { 699 // If the usage manager is not full, then the task will not 700 // get called.. 701 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 702 // so call it directly here. 703 sendMessagesWaitingForSpaceTask.run(); 704 } 705 } 706 707 // Properties 708 // ------------------------------------------------------------------------- 709 710 public DispatchPolicy getDispatchPolicy() { 711 return dispatchPolicy; 712 } 713 714 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 715 this.dispatchPolicy = dispatchPolicy; 716 } 717 718 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 719 return subscriptionRecoveryPolicy; 720 } 721 722 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 723 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 724 // allow users to combine retained message policy with other ActiveMQ policies 725 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 726 policy.setWrapped(recoveryPolicy); 727 } else { 728 this.subscriptionRecoveryPolicy = recoveryPolicy; 729 } 730 } 731 732 // Implementation methods 733 // ------------------------------------------------------------------------- 734 735 @Override 736 public final void wakeup() { 737 } 738 739 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 740 // AMQ-2586: Better to leave this stat at zero than to give the user 741 // misleading metrics. 742 // destinationStatistics.getMessages().increment(); 743 destinationStatistics.getEnqueues().increment(); 744 destinationStatistics.getMessageSize().addSize(message.getSize()); 745 MessageEvaluationContext msgContext = null; 746 747 dispatchLock.readLock().lock(); 748 try { 749 if (!subscriptionRecoveryPolicy.add(context, message)) { 750 return; 751 } 752 synchronized (consumers) { 753 if (consumers.isEmpty()) { 754 onMessageWithNoConsumers(context, message); 755 return; 756 } 757 } 758 msgContext = context.getMessageEvaluationContext(); 759 msgContext.setDestination(destination); 760 msgContext.setMessageReference(message); 761 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 762 onMessageWithNoConsumers(context, message); 763 } 764 765 } finally { 766 dispatchLock.readLock().unlock(); 767 if (msgContext != null) { 768 msgContext.clear(); 769 } 770 } 771 } 772 773 private final Runnable expireMessagesTask = new Runnable() { 774 @Override 775 public void run() { 776 List<Message> browsedMessages = new InsertionCountList<Message>(); 777 doBrowse(browsedMessages, getMaxExpirePageSize()); 778 } 779 }; 780 781 @Override 782 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 783 broker.messageExpired(context, reference, subs); 784 // AMQ-2586: Better to leave this stat at zero than to give the user 785 // misleading metrics. 786 // destinationStatistics.getMessages().decrement(); 787 destinationStatistics.getExpired().increment(); 788 MessageAck ack = new MessageAck(); 789 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 790 ack.setDestination(destination); 791 ack.setMessageID(reference.getMessageId()); 792 try { 793 if (subs instanceof DurableTopicSubscription) { 794 ((DurableTopicSubscription)subs).removePending(reference); 795 } 796 acknowledge(context, subs, ack, reference); 797 } catch (Exception e) { 798 LOG.error("Failed to remove expired Message from the store ", e); 799 } 800 } 801 802 @Override 803 protected Logger getLog() { 804 return LOG; 805 } 806 807 protected boolean isOptimizeStorage(){ 808 boolean result = false; 809 810 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 811 result = true; 812 for (DurableTopicSubscription s : durableSubscribers.values()) { 813 if (s.isActive()== false){ 814 result = false; 815 break; 816 } 817 if (s.getPrefetchSize()==0){ 818 result = false; 819 break; 820 } 821 if (s.isSlowConsumer()){ 822 result = false; 823 break; 824 } 825 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 826 result = false; 827 break; 828 } 829 } 830 } 831 return result; 832 } 833 834 /** 835 * force a reread of the store - after transaction recovery completion 836 */ 837 @Override 838 public void clearPendingMessages() { 839 dispatchLock.readLock().lock(); 840 try { 841 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 842 clearPendingAndDispatch(durableTopicSubscription); 843 } 844 } finally { 845 dispatchLock.readLock().unlock(); 846 } 847 } 848 849 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 850 synchronized (durableTopicSubscription.pendingLock) { 851 durableTopicSubscription.pending.clear(); 852 try { 853 durableTopicSubscription.dispatchPending(); 854 } catch (IOException exception) { 855 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 856 durableTopicSubscription, 857 destination, 858 durableTopicSubscription.pending }, exception); 859 } 860 } 861 } 862 863 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 864 return durableSubscribers; 865 } 866}