001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import javax.jms.JMSException; 029 030import org.apache.activemq.broker.Broker; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 034import org.apache.activemq.command.ConsumerControl; 035import org.apache.activemq.command.ConsumerInfo; 036import org.apache.activemq.command.Message; 037import org.apache.activemq.command.MessageAck; 038import org.apache.activemq.command.MessageDispatch; 039import org.apache.activemq.command.MessageDispatchNotification; 040import org.apache.activemq.command.MessageId; 041import org.apache.activemq.command.MessagePull; 042import org.apache.activemq.command.Response; 043import org.apache.activemq.thread.Scheduler; 044import org.apache.activemq.transaction.Synchronization; 045import org.apache.activemq.transport.TransmitCallback; 046import org.apache.activemq.usage.SystemUsage; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A subscription that honors the pre-fetch option of the ConsumerInfo. 052 */ 053public abstract class PrefetchSubscription extends AbstractSubscription { 054 055 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 056 protected final Scheduler scheduler; 057 058 protected PendingMessageCursor pending; 059 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 060 protected final AtomicInteger prefetchExtension = new AtomicInteger(); 061 protected boolean usePrefetchExtension = true; 062 private int maxProducersToAudit=32; 063 private int maxAuditDepth=2048; 064 protected final SystemUsage usageManager; 065 protected final Object pendingLock = new Object(); 066 protected final Object dispatchLock = new Object(); 067 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 068 069 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException { 070 super(broker,context, info); 071 this.usageManager=usageManager; 072 pending = cursor; 073 try { 074 pending.start(); 075 } catch (Exception e) { 076 throw new JMSException(e.getMessage()); 077 } 078 this.scheduler = broker.getScheduler(); 079 } 080 081 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { 082 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 083 } 084 085 /** 086 * Allows a message to be pulled on demand by a client 087 */ 088 @Override 089 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 090 // The slave should not deliver pull messages. 091 // TODO: when the slave becomes a master, He should send a NULL message to all the 092 // consumers to 'wake them up' in case they were waiting for a message. 093 if (getPrefetchSize() == 0) { 094 prefetchExtension.set(pull.getQuantity()); 095 final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount(); 096 097 // Have the destination push us some messages. 098 for (Destination dest : destinations) { 099 dest.iterate(); 100 } 101 dispatchPending(); 102 103 synchronized(this) { 104 // If there was nothing dispatched.. we may need to setup a timeout. 105 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 106 // immediate timeout used by receiveNoWait() 107 if (pull.getTimeout() == -1) { 108 // Null message indicates the pull is done or did not have pending. 109 prefetchExtension.set(1); 110 add(QueueMessageReference.NULL_MESSAGE); 111 dispatchPending(); 112 } 113 if (pull.getTimeout() > 0) { 114 scheduler.executeAfterDelay(new Runnable() { 115 @Override 116 public void run() { 117 pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone()); 118 } 119 }, pull.getTimeout()); 120 } 121 } 122 } 123 } 124 return null; 125 } 126 127 /** 128 * Occurs when a pull times out. If nothing has been dispatched since the 129 * timeout was setup, then send the NULL message. 130 */ 131 final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { 132 synchronized (pendingLock) { 133 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) { 134 try { 135 prefetchExtension.set(1); 136 add(QueueMessageReference.NULL_MESSAGE); 137 dispatchPending(); 138 } catch (Exception e) { 139 context.getConnection().serviceException(e); 140 } finally { 141 prefetchExtension.set(0); 142 } 143 } 144 } 145 } 146 147 @Override 148 public void add(MessageReference node) throws Exception { 149 synchronized (pendingLock) { 150 // The destination may have just been removed... 151 if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) { 152 // perhaps we should inform the caller that we are no longer valid to dispatch to? 153 return; 154 } 155 156 // Don't increment for the pullTimeout control message. 157 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { 158 getSubscriptionStatistics().getEnqueues().increment(); 159 } 160 pending.addMessageLast(node); 161 } 162 dispatchPending(); 163 } 164 165 @Override 166 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 167 synchronized(pendingLock) { 168 try { 169 pending.reset(); 170 while (pending.hasNext()) { 171 MessageReference node = pending.next(); 172 node.decrementReferenceCount(); 173 if (node.getMessageId().equals(mdn.getMessageId())) { 174 // Synchronize between dispatched list and removal of messages from pending list 175 // related to remove subscription action 176 synchronized(dispatchLock) { 177 pending.remove(); 178 createMessageDispatch(node, node.getMessage()); 179 dispatched.add(node); 180 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 181 onDispatch(node, node.getMessage()); 182 } 183 return; 184 } 185 } 186 } finally { 187 pending.release(); 188 } 189 } 190 throw new JMSException( 191 "Slave broker out of sync with master: Dispatched message (" 192 + mdn.getMessageId() + ") was not in the pending list for " 193 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 194 } 195 196 @Override 197 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 198 // Handle the standard acknowledgment case. 199 boolean callDispatchMatched = false; 200 Destination destination = null; 201 202 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 203 // suppress unexpected ack exception in this expected case 204 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack); 205 return; 206 } 207 208 LOG.trace("ack: {}", ack); 209 210 synchronized(dispatchLock) { 211 if (ack.isStandardAck()) { 212 // First check if the ack matches the dispatched. When using failover this might 213 // not be the case. We don't ever want to ack the wrong messages. 214 assertAckMatchesDispatched(ack); 215 216 // Acknowledge all dispatched messages up till the message id of 217 // the acknowledgment. 218 boolean inAckRange = false; 219 List<MessageReference> removeList = new ArrayList<MessageReference>(); 220 for (final MessageReference node : dispatched) { 221 MessageId messageId = node.getMessageId(); 222 if (ack.getFirstMessageId() == null 223 || ack.getFirstMessageId().equals(messageId)) { 224 inAckRange = true; 225 } 226 if (inAckRange) { 227 // Don't remove the nodes until we are committed. 228 if (!context.isInTransaction()) { 229 getSubscriptionStatistics().getDequeues().increment(); 230 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 231 removeList.add(node); 232 } else { 233 registerRemoveSync(context, node); 234 } 235 acknowledge(context, ack, node); 236 if (ack.getLastMessageId().equals(messageId)) { 237 destination = (Destination) node.getRegionDestination(); 238 callDispatchMatched = true; 239 break; 240 } 241 } 242 } 243 for (final MessageReference node : removeList) { 244 dispatched.remove(node); 245 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 246 } 247 // this only happens after a reconnect - get an ack which is not 248 // valid 249 if (!callDispatchMatched) { 250 LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack); 251 } 252 } else if (ack.isIndividualAck()) { 253 // Message was delivered and acknowledge - but only delete the 254 // individual message 255 for (final MessageReference node : dispatched) { 256 MessageId messageId = node.getMessageId(); 257 if (ack.getLastMessageId().equals(messageId)) { 258 // Don't remove the nodes until we are committed - immediateAck option 259 if (!context.isInTransaction()) { 260 getSubscriptionStatistics().getDequeues().increment(); 261 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 262 dispatched.remove(node); 263 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 264 } else { 265 registerRemoveSync(context, node); 266 } 267 268 if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { 269 // allow transaction batch to exceed prefetch 270 while (true) { 271 int currentExtension = prefetchExtension.get(); 272 int newExtension = Math.max(currentExtension, currentExtension + 1); 273 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 274 break; 275 } 276 } 277 } 278 279 acknowledge(context, ack, node); 280 destination = (Destination) node.getRegionDestination(); 281 callDispatchMatched = true; 282 break; 283 } 284 } 285 }else if (ack.isDeliveredAck()) { 286 // Message was delivered but not acknowledged: update pre-fetch 287 // counters. 288 int index = 0; 289 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 290 final MessageReference node = iter.next(); 291 Destination nodeDest = (Destination) node.getRegionDestination(); 292 if (ack.getLastMessageId().equals(node.getMessageId())) { 293 if (usePrefetchExtension && getPrefetchSize() != 0) { 294 // allow batch to exceed prefetch 295 while (true) { 296 int currentExtension = prefetchExtension.get(); 297 int newExtension = Math.max(currentExtension, index + 1); 298 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 299 break; 300 } 301 } 302 } 303 destination = nodeDest; 304 callDispatchMatched = true; 305 break; 306 } 307 } 308 if (!callDispatchMatched) { 309 throw new JMSException( 310 "Could not correlate acknowledgment with dispatched message: " 311 + ack); 312 } 313 } else if (ack.isExpiredAck()) { 314 // Message was expired 315 int index = 0; 316 boolean inAckRange = false; 317 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 318 final MessageReference node = iter.next(); 319 Destination nodeDest = (Destination) node.getRegionDestination(); 320 MessageId messageId = node.getMessageId(); 321 if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { 322 inAckRange = true; 323 } 324 if (inAckRange) { 325 Destination regionDestination = nodeDest; 326 if (broker.isExpired(node)) { 327 regionDestination.messageExpired(context, this, node); 328 } 329 iter.remove(); 330 nodeDest.getDestinationStatistics().getInflight().decrement(); 331 332 if (ack.getLastMessageId().equals(messageId)) { 333 if (usePrefetchExtension && getPrefetchSize() != 0) { 334 // allow batch to exceed prefetch 335 while (true) { 336 int currentExtension = prefetchExtension.get(); 337 int newExtension = Math.max(currentExtension, index + 1); 338 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 339 break; 340 } 341 } 342 } 343 344 destination = (Destination) node.getRegionDestination(); 345 callDispatchMatched = true; 346 break; 347 } 348 } 349 } 350 if (!callDispatchMatched) { 351 throw new JMSException( 352 "Could not correlate expiration acknowledgment with dispatched message: " 353 + ack); 354 } 355 } else if (ack.isRedeliveredAck()) { 356 // Message was re-delivered but it was not yet considered to be 357 // a DLQ message. 358 boolean inAckRange = false; 359 for (final MessageReference node : dispatched) { 360 MessageId messageId = node.getMessageId(); 361 if (ack.getFirstMessageId() == null 362 || ack.getFirstMessageId().equals(messageId)) { 363 inAckRange = true; 364 } 365 if (inAckRange) { 366 if (ack.getLastMessageId().equals(messageId)) { 367 destination = (Destination) node.getRegionDestination(); 368 callDispatchMatched = true; 369 break; 370 } 371 } 372 } 373 if (!callDispatchMatched) { 374 throw new JMSException( 375 "Could not correlate acknowledgment with dispatched message: " 376 + ack); 377 } 378 } else if (ack.isPoisonAck()) { 379 // TODO: what if the message is already in a DLQ??? 380 // Handle the poison ACK case: we need to send the message to a 381 // DLQ 382 if (ack.isInTransaction()) { 383 throw new JMSException("Poison ack cannot be transacted: " 384 + ack); 385 } 386 int index = 0; 387 boolean inAckRange = false; 388 List<MessageReference> removeList = new ArrayList<MessageReference>(); 389 for (final MessageReference node : dispatched) { 390 MessageId messageId = node.getMessageId(); 391 if (ack.getFirstMessageId() == null 392 || ack.getFirstMessageId().equals(messageId)) { 393 inAckRange = true; 394 } 395 if (inAckRange) { 396 sendToDLQ(context, node, ack.getPoisonCause()); 397 Destination nodeDest = (Destination) node.getRegionDestination(); 398 nodeDest.getDestinationStatistics() 399 .getInflight().decrement(); 400 removeList.add(node); 401 getSubscriptionStatistics().getDequeues().increment(); 402 index++; 403 acknowledge(context, ack, node); 404 if (ack.getLastMessageId().equals(messageId)) { 405 while (true) { 406 int currentExtension = prefetchExtension.get(); 407 int newExtension = Math.max(0, currentExtension - (index + 1)); 408 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 409 break; 410 } 411 } 412 destination = nodeDest; 413 callDispatchMatched = true; 414 break; 415 } 416 } 417 } 418 for (final MessageReference node : removeList) { 419 dispatched.remove(node); 420 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 421 } 422 if (!callDispatchMatched) { 423 throw new JMSException( 424 "Could not correlate acknowledgment with dispatched message: " 425 + ack); 426 } 427 } 428 } 429 if (callDispatchMatched && destination != null) { 430 destination.wakeup(); 431 dispatchPending(); 432 433 if (pending.isEmpty()) { 434 for (Destination dest : destinations) { 435 dest.wakeup(); 436 } 437 } 438 } else { 439 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); 440 } 441 } 442 443 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 444 // setup a Synchronization to remove nodes from the 445 // dispatched list. 446 context.getTransaction().addSynchronization( 447 new Synchronization() { 448 449 @Override 450 public void beforeEnd() { 451 if (usePrefetchExtension && getPrefetchSize() != 0) { 452 while (true) { 453 int currentExtension = prefetchExtension.get(); 454 int newExtension = Math.max(0, currentExtension - 1); 455 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 456 break; 457 } 458 } 459 } 460 } 461 462 @Override 463 public void afterCommit() 464 throws Exception { 465 Destination nodeDest = (Destination) node.getRegionDestination(); 466 synchronized(dispatchLock) { 467 getSubscriptionStatistics().getDequeues().increment(); 468 dispatched.remove(node); 469 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 470 nodeDest.getDestinationStatistics().getInflight().decrement(); 471 } 472 nodeDest.wakeup(); 473 dispatchPending(); 474 } 475 476 @Override 477 public void afterRollback() throws Exception { 478 synchronized(dispatchLock) { 479 // poisionAck will decrement - otherwise still inflight on client 480 } 481 } 482 }); 483 } 484 485 /** 486 * Checks an ack versus the contents of the dispatched list. 487 * called with dispatchLock held 488 * @param ack 489 * @throws JMSException if it does not match 490 */ 491 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 492 MessageId firstAckedMsg = ack.getFirstMessageId(); 493 MessageId lastAckedMsg = ack.getLastMessageId(); 494 int checkCount = 0; 495 boolean checkFoundStart = false; 496 boolean checkFoundEnd = false; 497 for (MessageReference node : dispatched) { 498 499 if (firstAckedMsg == null) { 500 checkFoundStart = true; 501 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 502 checkFoundStart = true; 503 } 504 505 if (checkFoundStart) { 506 checkCount++; 507 } 508 509 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 510 checkFoundEnd = true; 511 break; 512 } 513 } 514 if (!checkFoundStart && firstAckedMsg != null) 515 throw new JMSException("Unmatched acknowledge: " + ack 516 + "; Could not find Message-ID " + firstAckedMsg 517 + " in dispatched-list (start of ack)"); 518 if (!checkFoundEnd && lastAckedMsg != null) 519 throw new JMSException("Unmatched acknowledge: " + ack 520 + "; Could not find Message-ID " + lastAckedMsg 521 + " in dispatched-list (end of ack)"); 522 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 523 throw new JMSException("Unmatched acknowledge: " + ack 524 + "; Expected message count (" + ack.getMessageCount() 525 + ") differs from count in dispatched-list (" + checkCount 526 + ")"); 527 } 528 } 529 530 /** 531 * 532 * @param context 533 * @param node 534 * @param poisonCause 535 * @throws IOException 536 * @throws Exception 537 */ 538 protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception { 539 broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause); 540 } 541 542 @Override 543 public int getInFlightSize() { 544 return dispatched.size(); 545 } 546 547 /** 548 * Used to determine if the broker can dispatch to the consumer. 549 * 550 * @return true if the subscription is full 551 */ 552 @Override 553 public boolean isFull() { 554 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 555 } 556 557 /** 558 * @return true when 60% or more room is left for dispatching messages 559 */ 560 @Override 561 public boolean isLowWaterMark() { 562 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 563 } 564 565 /** 566 * @return true when 10% or less room is left for dispatching messages 567 */ 568 @Override 569 public boolean isHighWaterMark() { 570 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 571 } 572 573 @Override 574 public int countBeforeFull() { 575 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 576 } 577 578 @Override 579 public int getPendingQueueSize() { 580 return pending.size(); 581 } 582 583 @Override 584 public long getPendingMessageSize() { 585 synchronized (pendingLock) { 586 return pending.messageSize(); 587 } 588 } 589 590 @Override 591 public int getDispatchedQueueSize() { 592 return dispatched.size(); 593 } 594 595 @Override 596 public long getDequeueCounter() { 597 return getSubscriptionStatistics().getDequeues().getCount(); 598 } 599 600 @Override 601 public long getDispatchedCounter() { 602 return getSubscriptionStatistics().getDispatched().getCount(); 603 } 604 605 @Override 606 public long getEnqueueCounter() { 607 return getSubscriptionStatistics().getEnqueues().getCount(); 608 } 609 610 @Override 611 public boolean isRecoveryRequired() { 612 return pending.isRecoveryRequired(); 613 } 614 615 public PendingMessageCursor getPending() { 616 return this.pending; 617 } 618 619 public void setPending(PendingMessageCursor pending) { 620 this.pending = pending; 621 if (this.pending!=null) { 622 this.pending.setSystemUsage(usageManager); 623 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 624 } 625 } 626 627 @Override 628 public void add(ConnectionContext context, Destination destination) throws Exception { 629 synchronized(pendingLock) { 630 super.add(context, destination); 631 pending.add(context, destination); 632 } 633 } 634 635 @Override 636 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 637 return remove(context, destination, dispatched); 638 } 639 640 public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { 641 LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>(); 642 synchronized(pendingLock) { 643 super.remove(context, destination); 644 // Here is a potential problem concerning Inflight stat: 645 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 646 // Except if each commit or rollback callback action comes before remove of subscriber. 647 redispatch.addAll(pending.remove(context, destination)); 648 649 if (dispatched == null) { 650 return redispatch; 651 } 652 653 // Synchronized to DispatchLock if necessary 654 if (dispatched == this.dispatched) { 655 synchronized(dispatchLock) { 656 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 657 } 658 } else { 659 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 660 } 661 } 662 663 return redispatch; 664 } 665 666 private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) { 667 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 668 for (MessageReference r : dispatched) { 669 if (r.getRegionDestination() == destination) { 670 references.add(r); 671 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); 672 } 673 } 674 redispatch.addAll(0, references); 675 destination.getDestinationStatistics().getInflight().subtract(references.size()); 676 dispatched.removeAll(references); 677 } 678 679 // made public so it can be used in MQTTProtocolConverter 680 public void dispatchPending() throws IOException { 681 List<Destination> slowConsumerTargets = null; 682 683 synchronized(pendingLock) { 684 try { 685 int numberToDispatch = countBeforeFull(); 686 if (numberToDispatch > 0) { 687 setSlowConsumer(false); 688 setPendingBatchSize(pending, numberToDispatch); 689 int count = 0; 690 pending.reset(); 691 while (pending.hasNext() && !isFull() && count < numberToDispatch) { 692 MessageReference node = pending.next(); 693 if (node == null) { 694 break; 695 } 696 697 // Synchronize between dispatched list and remove of message from pending list 698 // related to remove subscription action 699 synchronized(dispatchLock) { 700 pending.remove(); 701 if (!isDropped(node) && canDispatch(node)) { 702 703 // Message may have been sitting in the pending 704 // list a while waiting for the consumer to ak the message. 705 if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 706 //increment number to dispatch 707 numberToDispatch++; 708 if (broker.isExpired(node)) { 709 ((Destination)node.getRegionDestination()).messageExpired(context, this, node); 710 } 711 712 if (!isBrowser()) { 713 node.decrementReferenceCount(); 714 continue; 715 } 716 } 717 dispatch(node); 718 count++; 719 } 720 } 721 // decrement after dispatch has taken ownership to avoid usage jitter 722 node.decrementReferenceCount(); 723 } 724 } else if (!isSlowConsumer()) { 725 setSlowConsumer(true); 726 slowConsumerTargets = destinations; 727 } 728 } finally { 729 pending.release(); 730 } 731 } 732 733 if (slowConsumerTargets != null) { 734 for (Destination dest : slowConsumerTargets) { 735 dest.slowConsumer(context, this); 736 } 737 } 738 } 739 740 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 741 pending.setMaxBatchSize(numberToDispatch); 742 } 743 744 // called with dispatchLock held 745 protected boolean dispatch(final MessageReference node) throws IOException { 746 final Message message = node.getMessage(); 747 if (message == null) { 748 return false; 749 } 750 751 okForAckAsDispatchDone.countDown(); 752 753 MessageDispatch md = createMessageDispatch(node, message); 754 if (node != QueueMessageReference.NULL_MESSAGE) { 755 getSubscriptionStatistics().getDispatched().increment(); 756 dispatched.add(node); 757 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 758 } 759 if (getPrefetchSize() == 0) { 760 while (true) { 761 int currentExtension = prefetchExtension.get(); 762 int newExtension = Math.max(0, currentExtension - 1); 763 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 764 break; 765 } 766 } 767 } 768 if (info.isDispatchAsync()) { 769 md.setTransmitCallback(new TransmitCallback() { 770 771 @Override 772 public void onSuccess() { 773 // Since the message gets queued up in async dispatch, we don't want to 774 // decrease the reference count until it gets put on the wire. 775 onDispatch(node, message); 776 } 777 778 @Override 779 public void onFailure() { 780 Destination nodeDest = (Destination) node.getRegionDestination(); 781 if (nodeDest != null) { 782 if (node != QueueMessageReference.NULL_MESSAGE) { 783 nodeDest.getDestinationStatistics().getDispatched().increment(); 784 nodeDest.getDestinationStatistics().getInflight().increment(); 785 LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 786 } 787 } 788 if (node instanceof QueueMessageReference) { 789 ((QueueMessageReference) node).unlock(); 790 } 791 } 792 }); 793 context.getConnection().dispatchAsync(md); 794 } else { 795 context.getConnection().dispatchSync(md); 796 onDispatch(node, message); 797 } 798 return true; 799 } 800 801 protected void onDispatch(final MessageReference node, final Message message) { 802 Destination nodeDest = (Destination) node.getRegionDestination(); 803 if (nodeDest != null) { 804 if (node != QueueMessageReference.NULL_MESSAGE) { 805 nodeDest.getDestinationStatistics().getDispatched().increment(); 806 nodeDest.getDestinationStatistics().getInflight().increment(); 807 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 808 } 809 } 810 811 if (info.isDispatchAsync()) { 812 try { 813 dispatchPending(); 814 } catch (IOException e) { 815 context.getConnection().serviceExceptionAsync(e); 816 } 817 } 818 } 819 820 /** 821 * inform the MessageConsumer on the client to change it's prefetch 822 * 823 * @param newPrefetch 824 */ 825 @Override 826 public void updateConsumerPrefetch(int newPrefetch) { 827 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 828 ConsumerControl cc = new ConsumerControl(); 829 cc.setConsumerId(info.getConsumerId()); 830 cc.setPrefetch(newPrefetch); 831 context.getConnection().dispatchAsync(cc); 832 } 833 } 834 835 /** 836 * @param node 837 * @param message 838 * @return MessageDispatch 839 */ 840 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 841 MessageDispatch md = new MessageDispatch(); 842 md.setConsumerId(info.getConsumerId()); 843 844 if (node == QueueMessageReference.NULL_MESSAGE) { 845 md.setMessage(null); 846 md.setDestination(null); 847 } else { 848 Destination regionDestination = (Destination) node.getRegionDestination(); 849 md.setDestination(regionDestination.getActiveMQDestination()); 850 md.setMessage(message); 851 md.setRedeliveryCounter(node.getRedeliveryCounter()); 852 } 853 854 return md; 855 } 856 857 /** 858 * Use when a matched message is about to be dispatched to the client. 859 * 860 * @param node 861 * @return false if the message should not be dispatched to the client 862 * (another sub may have already dispatched it for example). 863 * @throws IOException 864 */ 865 protected abstract boolean canDispatch(MessageReference node) throws IOException; 866 867 protected abstract boolean isDropped(MessageReference node); 868 869 /** 870 * Used during acknowledgment to remove the message. 871 * 872 * @throws IOException 873 */ 874 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 875 876 877 public int getMaxProducersToAudit() { 878 return maxProducersToAudit; 879 } 880 881 public void setMaxProducersToAudit(int maxProducersToAudit) { 882 this.maxProducersToAudit = maxProducersToAudit; 883 if (this.pending != null) { 884 this.pending.setMaxProducersToAudit(maxProducersToAudit); 885 } 886 } 887 888 public int getMaxAuditDepth() { 889 return maxAuditDepth; 890 } 891 892 public void setMaxAuditDepth(int maxAuditDepth) { 893 this.maxAuditDepth = maxAuditDepth; 894 if (this.pending != null) { 895 this.pending.setMaxAuditDepth(maxAuditDepth); 896 } 897 } 898 899 public boolean isUsePrefetchExtension() { 900 return usePrefetchExtension; 901 } 902 903 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 904 this.usePrefetchExtension = usePrefetchExtension; 905 } 906 907 protected int getPrefetchExtension() { 908 return this.prefetchExtension.get(); 909 } 910 911 @Override 912 public void setPrefetchSize(int prefetchSize) { 913 this.info.setPrefetchSize(prefetchSize); 914 try { 915 this.dispatchPending(); 916 } catch (Exception e) { 917 LOG.trace("Caught exception during dispatch after prefetch change.", e); 918 } 919 } 920}