001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicInteger; 024import java.util.concurrent.atomic.AtomicLong; 025 026import javax.jms.JMSException; 027 028import org.apache.activemq.ActiveMQMessageAudit; 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 034import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 035import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 036import org.apache.activemq.command.ConsumerControl; 037import org.apache.activemq.command.ConsumerInfo; 038import org.apache.activemq.command.Message; 039import org.apache.activemq.command.MessageAck; 040import org.apache.activemq.command.MessageDispatch; 041import org.apache.activemq.command.MessageDispatchNotification; 042import org.apache.activemq.command.MessageId; 043import org.apache.activemq.command.MessagePull; 044import org.apache.activemq.command.Response; 045import org.apache.activemq.thread.Scheduler; 046import org.apache.activemq.transaction.Synchronization; 047import org.apache.activemq.transport.TransmitCallback; 048import org.apache.activemq.usage.SystemUsage; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052public class TopicSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 055 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 056 057 protected PendingMessageCursor matched; 058 protected final SystemUsage usageManager; 059 boolean singleDestination = true; 060 Destination destination; 061 private final Scheduler scheduler; 062 063 private int maximumPendingMessages = -1; 064 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 065 private int discarded; 066 private final Object matchedListMutex = new Object(); 067 private final AtomicInteger prefetchExtension = new AtomicInteger(0); 068 private int memoryUsageHighWaterMark = 95; 069 // allow duplicate suppression in a ring network of brokers 070 protected int maxProducersToAudit = 1024; 071 protected int maxAuditDepth = 1000; 072 protected boolean enableAudit = false; 073 protected ActiveMQMessageAudit audit; 074 protected boolean active = false; 075 protected boolean discarding = false; 076 077 //Used for inflight message size calculations 078 protected final Object dispatchLock = new Object(); 079 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 080 081 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 082 super(broker, context, info); 083 this.usageManager = usageManager; 084 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 085 if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) { 086 this.matched = new VMPendingMessageCursor(false); 087 } else { 088 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 089 } 090 091 this.scheduler = broker.getScheduler(); 092 } 093 094 public void init() throws Exception { 095 this.matched.setSystemUsage(usageManager); 096 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 097 this.matched.start(); 098 if (enableAudit) { 099 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 100 } 101 this.active=true; 102 } 103 104 @Override 105 public void add(MessageReference node) throws Exception { 106 if (isDuplicate(node)) { 107 return; 108 } 109 // Lets use an indirect reference so that we can associate a unique 110 // locator /w the message. 111 node = new IndirectMessageReference(node.getMessage()); 112 getSubscriptionStatistics().getEnqueues().increment(); 113 synchronized (matchedListMutex) { 114 // if this subscriber is already discarding a message, we don't want to add 115 // any more messages to it as those messages can only be advisories generated in the process, 116 // which can trigger the recursive call loop 117 if (discarding) return; 118 119 if (!isFull() && matched.isEmpty()) { 120 // if maximumPendingMessages is set we will only discard messages which 121 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 122 dispatch(node); 123 setSlowConsumer(false); 124 } else { 125 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { 126 // Slow consumers should log and set their state as such. 127 if (!isSlowConsumer()) { 128 LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); 129 setSlowConsumer(true); 130 for (Destination dest: destinations) { 131 dest.slowConsumer(getContext(), this); 132 } 133 } 134 } 135 if (maximumPendingMessages != 0) { 136 boolean warnedAboutWait = false; 137 while (active) { 138 while (matched.isFull()) { 139 if (getContext().getStopping().get()) { 140 LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); 141 getSubscriptionStatistics().getEnqueues().decrement(); 142 return; 143 } 144 if (!warnedAboutWait) { 145 LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.", 146 new Object[]{ 147 toString(), 148 matched, 149 matched.getSystemUsage().getTempUsage().getPercentUsage(), 150 matched.getSystemUsage().getMemoryUsage().getPercentUsage() 151 }); 152 warnedAboutWait = true; 153 } 154 matchedListMutex.wait(20); 155 } 156 // Temporary storage could be full - so just try to add the message 157 // see https://issues.apache.org/activemq/browse/AMQ-2475 158 if (matched.tryAddMessageLast(node, 10)) { 159 break; 160 } 161 } 162 if (maximumPendingMessages > 0) { 163 // calculate the high water mark from which point we 164 // will eagerly evict expired messages 165 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 166 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 167 max = maximumPendingMessages; 168 } 169 if (!matched.isEmpty() && matched.size() > max) { 170 removeExpiredMessages(); 171 } 172 // lets discard old messages as we are a slow consumer 173 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 174 int pageInSize = matched.size() - maximumPendingMessages; 175 // only page in a 1000 at a time - else we could blow the memory 176 pageInSize = Math.max(1000, pageInSize); 177 LinkedList<MessageReference> list = null; 178 MessageReference[] oldMessages=null; 179 synchronized(matched){ 180 list = matched.pageInList(pageInSize); 181 oldMessages = messageEvictionStrategy.evictMessages(list); 182 for (MessageReference ref : list) { 183 ref.decrementReferenceCount(); 184 } 185 } 186 int messagesToEvict = 0; 187 if (oldMessages != null){ 188 messagesToEvict = oldMessages.length; 189 for (int i = 0; i < messagesToEvict; i++) { 190 MessageReference oldMessage = oldMessages[i]; 191 discard(oldMessage); 192 } 193 } 194 // lets avoid an infinite loop if we are given a bad eviction strategy 195 // for a bad strategy lets just not evict 196 if (messagesToEvict == 0) { 197 LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{ 198 destination, messageEvictionStrategy, list.size() 199 }); 200 break; 201 } 202 } 203 } 204 dispatchMatched(); 205 } 206 } 207 } 208 } 209 210 private boolean isDuplicate(MessageReference node) { 211 boolean duplicate = false; 212 if (enableAudit && audit != null) { 213 duplicate = audit.isDuplicate(node); 214 if (LOG.isDebugEnabled()) { 215 if (duplicate) { 216 LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId()); 217 } 218 } 219 } 220 return duplicate; 221 } 222 223 /** 224 * Discard any expired messages from the matched list. Called from a 225 * synchronized block. 226 * 227 * @throws IOException 228 */ 229 protected void removeExpiredMessages() throws IOException { 230 try { 231 matched.reset(); 232 while (matched.hasNext()) { 233 MessageReference node = matched.next(); 234 node.decrementReferenceCount(); 235 if (node.isExpired()) { 236 matched.remove(); 237 getSubscriptionStatistics().getDispatched().increment(); 238 node.decrementReferenceCount(); 239 if (broker.isExpired(node)) { 240 ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); 241 broker.messageExpired(getContext(), node, this); 242 } 243 break; 244 } 245 } 246 } finally { 247 matched.release(); 248 } 249 } 250 251 @Override 252 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 253 synchronized (matchedListMutex) { 254 try { 255 matched.reset(); 256 while (matched.hasNext()) { 257 MessageReference node = matched.next(); 258 node.decrementReferenceCount(); 259 if (node.getMessageId().equals(mdn.getMessageId())) { 260 synchronized(dispatchLock) { 261 matched.remove(); 262 getSubscriptionStatistics().getDispatched().increment(); 263 dispatched.add(node); 264 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 265 node.decrementReferenceCount(); 266 } 267 break; 268 } 269 } 270 } finally { 271 matched.release(); 272 } 273 } 274 } 275 276 @Override 277 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 278 super.acknowledge(context, ack); 279 280 // Handle the standard acknowledgment case. 281 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 282 if (context.isInTransaction()) { 283 context.getTransaction().addSynchronization(new Synchronization() { 284 @Override 285 public void afterCommit() throws Exception { 286 updateStatsOnAck(ack); 287 dispatchMatched(); 288 } 289 }); 290 } else { 291 updateStatsOnAck(ack); 292 } 293 updatePrefetch(ack); 294 dispatchMatched(); 295 return; 296 } else if (ack.isDeliveredAck()) { 297 // Message was delivered but not acknowledged: update pre-fetch counters. 298 prefetchExtension.addAndGet(ack.getMessageCount()); 299 dispatchMatched(); 300 return; 301 } else if (ack.isExpiredAck()) { 302 updateStatsOnAck(ack); 303 updatePrefetch(ack); 304 dispatchMatched(); 305 return; 306 } else if (ack.isRedeliveredAck()) { 307 // nothing to do atm 308 return; 309 } 310 throw new JMSException("Invalid acknowledgment: " + ack); 311 } 312 313 @Override 314 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 315 316 // The slave should not deliver pull messages. 317 if (getPrefetchSize() == 0) { 318 319 final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount(); 320 prefetchExtension.set(pull.getQuantity()); 321 dispatchMatched(); 322 323 // If there was nothing dispatched.. we may need to setup a timeout. 324 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 325 326 // immediate timeout used by receiveNoWait() 327 if (pull.getTimeout() == -1) { 328 // Send a NULL message to signal nothing pending. 329 dispatch(null); 330 prefetchExtension.set(0); 331 } 332 333 if (pull.getTimeout() > 0) { 334 scheduler.executeAfterDelay(new Runnable() { 335 336 @Override 337 public void run() { 338 pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone()); 339 } 340 }, pull.getTimeout()); 341 } 342 } 343 } 344 return null; 345 } 346 347 /** 348 * Occurs when a pull times out. If nothing has been dispatched since the 349 * timeout was setup, then send the NULL message. 350 */ 351 private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) { 352 synchronized (matchedListMutex) { 353 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) { 354 try { 355 dispatch(null); 356 } catch (Exception e) { 357 context.getConnection().serviceException(e); 358 } finally { 359 prefetchExtension.set(0); 360 } 361 } 362 } 363 } 364 365 /** 366 * Update the statistics on message ack. 367 * @param ack 368 */ 369 private void updateStatsOnAck(final MessageAck ack) { 370 synchronized(dispatchLock) { 371 boolean inAckRange = false; 372 List<MessageReference> removeList = new ArrayList<MessageReference>(); 373 for (final MessageReference node : dispatched) { 374 MessageId messageId = node.getMessageId(); 375 if (ack.getFirstMessageId() == null 376 || ack.getFirstMessageId().equals(messageId)) { 377 inAckRange = true; 378 } 379 if (inAckRange) { 380 removeList.add(node); 381 if (ack.getLastMessageId().equals(messageId)) { 382 break; 383 } 384 } 385 } 386 387 for (final MessageReference node : removeList) { 388 dispatched.remove(node); 389 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 390 getSubscriptionStatistics().getDequeues().increment(); 391 ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); 392 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 393 if (info.isNetworkSubscription()) { 394 ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); 395 } 396 if (ack.isExpiredAck()) { 397 destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); 398 } 399 } 400 } 401 } 402 403 private void updatePrefetch(MessageAck ack) { 404 while (true) { 405 int currentExtension = prefetchExtension.get(); 406 int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); 407 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 408 break; 409 } 410 } 411 } 412 413 @Override 414 public int countBeforeFull() { 415 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); 416 } 417 418 @Override 419 public int getPendingQueueSize() { 420 return matched(); 421 } 422 423 @Override 424 public long getPendingMessageSize() { 425 synchronized (matchedListMutex) { 426 return matched.messageSize(); 427 } 428 } 429 430 @Override 431 public int getDispatchedQueueSize() { 432 return (int)(getSubscriptionStatistics().getDispatched().getCount() - 433 prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount()); 434 } 435 436 public int getMaximumPendingMessages() { 437 return maximumPendingMessages; 438 } 439 440 @Override 441 public long getDispatchedCounter() { 442 return getSubscriptionStatistics().getDispatched().getCount(); 443 } 444 445 @Override 446 public long getEnqueueCounter() { 447 return getSubscriptionStatistics().getEnqueues().getCount(); 448 } 449 450 @Override 451 public long getDequeueCounter() { 452 return getSubscriptionStatistics().getDequeues().getCount(); 453 } 454 455 /** 456 * @return the number of messages discarded due to being a slow consumer 457 */ 458 public int discarded() { 459 synchronized (matchedListMutex) { 460 return discarded; 461 } 462 } 463 464 /** 465 * @return the number of matched messages (messages targeted for the 466 * subscription but not yet able to be dispatched due to the 467 * prefetch buffer being full). 468 */ 469 public int matched() { 470 synchronized (matchedListMutex) { 471 return matched.size(); 472 } 473 } 474 475 /** 476 * Sets the maximum number of pending messages that can be matched against 477 * this consumer before old messages are discarded. 478 */ 479 public void setMaximumPendingMessages(int maximumPendingMessages) { 480 this.maximumPendingMessages = maximumPendingMessages; 481 } 482 483 public MessageEvictionStrategy getMessageEvictionStrategy() { 484 return messageEvictionStrategy; 485 } 486 487 /** 488 * Sets the eviction strategy used to decide which message to evict when the 489 * slow consumer needs to discard messages 490 */ 491 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 492 this.messageEvictionStrategy = messageEvictionStrategy; 493 } 494 495 public int getMaxProducersToAudit() { 496 return maxProducersToAudit; 497 } 498 499 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 500 this.maxProducersToAudit = maxProducersToAudit; 501 if (audit != null) { 502 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 503 } 504 } 505 506 public int getMaxAuditDepth() { 507 return maxAuditDepth; 508 } 509 510 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 511 this.maxAuditDepth = maxAuditDepth; 512 if (audit != null) { 513 audit.setAuditDepth(maxAuditDepth); 514 } 515 } 516 517 public boolean isEnableAudit() { 518 return enableAudit; 519 } 520 521 public synchronized void setEnableAudit(boolean enableAudit) { 522 this.enableAudit = enableAudit; 523 if (enableAudit && audit == null) { 524 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 525 } 526 } 527 528 // Implementation methods 529 // ------------------------------------------------------------------------- 530 @Override 531 public boolean isFull() { 532 return getDispatchedQueueSize() >= info.getPrefetchSize(); 533 } 534 535 @Override 536 public int getInFlightSize() { 537 return getDispatchedQueueSize(); 538 } 539 540 /** 541 * @return true when 60% or more room is left for dispatching messages 542 */ 543 @Override 544 public boolean isLowWaterMark() { 545 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 546 } 547 548 /** 549 * @return true when 10% or less room is left for dispatching messages 550 */ 551 @Override 552 public boolean isHighWaterMark() { 553 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 554 } 555 556 /** 557 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 558 */ 559 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 560 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 561 } 562 563 /** 564 * @return the memoryUsageHighWaterMark 565 */ 566 public int getMemoryUsageHighWaterMark() { 567 return this.memoryUsageHighWaterMark; 568 } 569 570 /** 571 * @return the usageManager 572 */ 573 public SystemUsage getUsageManager() { 574 return this.usageManager; 575 } 576 577 /** 578 * @return the matched 579 */ 580 public PendingMessageCursor getMatched() { 581 return this.matched; 582 } 583 584 /** 585 * @param matched the matched to set 586 */ 587 public void setMatched(PendingMessageCursor matched) { 588 this.matched = matched; 589 } 590 591 /** 592 * inform the MessageConsumer on the client to change it's prefetch 593 * 594 * @param newPrefetch 595 */ 596 @Override 597 public void updateConsumerPrefetch(int newPrefetch) { 598 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 599 ConsumerControl cc = new ConsumerControl(); 600 cc.setConsumerId(info.getConsumerId()); 601 cc.setPrefetch(newPrefetch); 602 context.getConnection().dispatchAsync(cc); 603 } 604 } 605 606 private void dispatchMatched() throws IOException { 607 synchronized (matchedListMutex) { 608 if (!matched.isEmpty() && !isFull()) { 609 try { 610 matched.reset(); 611 612 while (matched.hasNext() && !isFull()) { 613 MessageReference message = matched.next(); 614 message.decrementReferenceCount(); 615 matched.remove(); 616 // Message may have been sitting in the matched list a while 617 // waiting for the consumer to ak the message. 618 if (message.isExpired()) { 619 discard(message); 620 continue; // just drop it. 621 } 622 dispatch(message); 623 } 624 } finally { 625 matched.release(); 626 } 627 } 628 } 629 } 630 631 private void dispatch(final MessageReference node) throws IOException { 632 Message message = node != null ? node.getMessage() : null; 633 if (node != null) { 634 node.incrementReferenceCount(); 635 } 636 // Make sure we can dispatch a message. 637 MessageDispatch md = new MessageDispatch(); 638 md.setMessage(message); 639 md.setConsumerId(info.getConsumerId()); 640 if (node != null) { 641 md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); 642 synchronized(dispatchLock) { 643 getSubscriptionStatistics().getDispatched().increment(); 644 dispatched.add(node); 645 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 646 } 647 648 // Keep track if this subscription is receiving messages from a single destination. 649 if (singleDestination) { 650 if (destination == null) { 651 destination = (Destination)node.getRegionDestination(); 652 } else { 653 if (destination != node.getRegionDestination()) { 654 singleDestination = false; 655 } 656 } 657 } 658 } 659 if (info.isDispatchAsync()) { 660 if (node != null) { 661 md.setTransmitCallback(new TransmitCallback() { 662 663 @Override 664 public void onSuccess() { 665 Destination regionDestination = (Destination) node.getRegionDestination(); 666 regionDestination.getDestinationStatistics().getDispatched().increment(); 667 regionDestination.getDestinationStatistics().getInflight().increment(); 668 node.decrementReferenceCount(); 669 } 670 671 @Override 672 public void onFailure() { 673 Destination regionDestination = (Destination) node.getRegionDestination(); 674 regionDestination.getDestinationStatistics().getDispatched().increment(); 675 regionDestination.getDestinationStatistics().getInflight().increment(); 676 node.decrementReferenceCount(); 677 } 678 }); 679 } 680 context.getConnection().dispatchAsync(md); 681 } else { 682 context.getConnection().dispatchSync(md); 683 if (node != null) { 684 Destination regionDestination = (Destination) node.getRegionDestination(); 685 regionDestination.getDestinationStatistics().getDispatched().increment(); 686 regionDestination.getDestinationStatistics().getInflight().increment(); 687 node.decrementReferenceCount(); 688 } 689 } 690 } 691 692 private void discard(MessageReference message) { 693 discarding = true; 694 try { 695 message.decrementReferenceCount(); 696 matched.remove(message); 697 discarded++; 698 if (destination != null) { 699 destination.getDestinationStatistics().getDequeues().increment(); 700 } 701 LOG.debug("{}, discarding message {}", this, message); 702 Destination dest = (Destination) message.getRegionDestination(); 703 if (dest != null) { 704 dest.messageDiscarded(getContext(), this, message); 705 } 706 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); 707 } finally { 708 discarding = false; 709 } 710 } 711 712 @Override 713 public String toString() { 714 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 715 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); 716 } 717 718 @Override 719 public void destroy() { 720 this.active=false; 721 synchronized (matchedListMutex) { 722 try { 723 matched.destroy(); 724 } catch (Exception e) { 725 LOG.warn("Failed to destroy cursor", e); 726 } 727 } 728 setSlowConsumer(false); 729 synchronized(dispatchLock) { 730 dispatched.clear(); 731 } 732 } 733 734 @Override 735 public int getPrefetchSize() { 736 return info.getPrefetchSize(); 737 } 738 739 @Override 740 public void setPrefetchSize(int newSize) { 741 info.setPrefetchSize(newSize); 742 try { 743 dispatchMatched(); 744 } catch(Exception e) { 745 LOG.trace("Caught exception on dispatch after prefetch size change."); 746 } 747 } 748}