001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicInteger;
024import java.util.concurrent.atomic.AtomicLong;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.ActiveMQMessageAudit;
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
035import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
036import org.apache.activemq.command.ConsumerControl;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.MessageAck;
040import org.apache.activemq.command.MessageDispatch;
041import org.apache.activemq.command.MessageDispatchNotification;
042import org.apache.activemq.command.MessageId;
043import org.apache.activemq.command.MessagePull;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.thread.Scheduler;
046import org.apache.activemq.transaction.Synchronization;
047import org.apache.activemq.transport.TransmitCallback;
048import org.apache.activemq.usage.SystemUsage;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052public class TopicSubscription extends AbstractSubscription {
053
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
    /** Supplies the sequence number embedded in each subscription's cursor name (see the constructor). */
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);

    /** Cursor holding messages matched to this subscription that have not been dispatched yet. */
    protected PendingMessageCursor matched;
    /** Usage manager handed to the matched cursor in {@link #init()}. */
    protected final SystemUsage usageManager;
    /** Stays true until dispatch() sees a message from a destination other than {@link #destination}. */
    boolean singleDestination = true;
    /** Region destination of the first message dispatched through this subscription; null until then. */
    Destination destination;
    /** Broker scheduler, used to schedule pull timeouts. */
    private final Scheduler scheduler;

    // add() treats 0 as "never buffer in the matched cursor" and only evicts when the value is > 0;
    // the default of -1 therefore buffers without an eviction limit.
    private int maximumPendingMessages = -1;
    /** Strategy asked to pick eviction victims once more than maximumPendingMessages are buffered. */
    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
    /** Number of messages dropped by discard(); read under matchedListMutex in discarded(). */
    private int discarded;
    /** Monitor guarding the matched cursor; add() also waits on it while the cursor is full. */
    private final Object matchedListMutex = new Object();
    /** Extra dispatch allowance on top of the consumer's prefetch (raised by delivered acks and pulls). */
    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
    // Only read/written through its getter and setter in this class.
    // NOTE(review): init() uses getCursorMemoryHighWaterMark() instead - confirm where this value is consumed.
    private int memoryUsageHighWaterMark = 95;
    // allow duplicate suppression in a ring network of brokers
    protected int maxProducersToAudit = 1024;
    protected int maxAuditDepth = 1000;
    protected boolean enableAudit = false;
    /** Duplicate detector; created in init() or setEnableAudit(true), null otherwise. */
    protected ActiveMQMessageAudit audit;
    /** Set by init() and cleared by destroy(); add() stops retrying the cursor insert once false. */
    protected boolean active = false;
    /** True while discard() is running, so add() can reject re-entrant adds. */
    protected boolean discarding = false;

    //Used for inflight message size calculations
    protected final Object dispatchLock = new Object();
    /** Messages dispatched but not yet acknowledged; guarded by dispatchLock. */
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
080
081    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
082        super(broker, context, info);
083        this.usageManager = usageManager;
084        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
085        if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
086            this.matched = new VMPendingMessageCursor(false);
087        } else {
088            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
089        }
090
091        this.scheduler = broker.getScheduler();
092    }
093
094    public void init() throws Exception {
095        this.matched.setSystemUsage(usageManager);
096        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
097        this.matched.start();
098        if (enableAudit) {
099            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
100        }
101        this.active=true;
102    }
103
    /**
     * Offers a message to this subscription.
     * <p>
     * Duplicates (per the audit, when enabled) are ignored. Otherwise the
     * message is wrapped in a fresh {@link IndirectMessageReference} and, under
     * the matched-list mutex, either dispatched immediately (subscription not
     * full and nothing buffered) or appended to the matched cursor. When
     * {@code maximumPendingMessages} is 0 a message that cannot be dispatched
     * immediately is neither buffered nor dispatched. When it is positive,
     * expired messages are removed and the eviction strategy is asked to
     * discard messages until the cursor is back within the limit.
     * <p>
     * While the cursor reports full this method waits on the mutex in 20ms
     * slices until space is available or the connection context is stopping.
     *
     * @param node the message reference to deliver to this subscription
     * @throws Exception if dispatching, cursor access or the wait fails
     */
    @Override
    public void add(MessageReference node) throws Exception {
        if (isDuplicate(node)) {
            return;
        }
        // Lets use an indirect reference so that we can associate a unique
        // locator /w the message.
        node = new IndirectMessageReference(node.getMessage());
        getSubscriptionStatistics().getEnqueues().increment();
        synchronized (matchedListMutex) {
            // if this subscriber is already discarding a message, we don't want to add
            // any more messages to it as those messages can only be advisories generated in the process,
            // which can trigger the recursive call loop
            if (discarding) return;

            if (!isFull() && matched.isEmpty()) {
                // if maximumPendingMessages is set we will only discard messages which
                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
                dispatch(node);
                setSlowConsumer(false);
            } else {
                // NOTE(review): the warning below says "twice its prefetch limit", but the check
                // fires as soon as more than one prefetch worth of messages is buffered in the
                // cursor (on top of whatever is already in flight) - confirm the intended threshold.
                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
                    // Slow consumers should log and set their state as such.
                    if (!isSlowConsumer()) {
                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
                        setSlowConsumer(true);
                        for (Destination dest: destinations) {
                            dest.slowConsumer(getContext(), this);
                        }
                    }
                }
                // A limit of 0 means "never buffer": the message is dropped without being queued.
                if (maximumPendingMessages != 0) {
                    boolean warnedAboutWait = false;
                    // Retry the insert until it succeeds or destroy() clears 'active'.
                    while (active) {
                        while (matched.isFull()) {
                            if (getContext().getStopping().get()) {
                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
                                // The message is abandoned, so undo the enqueue count taken above.
                                getSubscriptionStatistics().getEnqueues().decrement();
                                return;
                            }
                            // Log the blocking condition only once per add() call.
                            if (!warnedAboutWait) {
                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
                                        new Object[]{
                                                toString(),
                                                matched,
                                                matched.getSystemUsage().getTempUsage().getPercentUsage(),
                                                matched.getSystemUsage().getMemoryUsage().getPercentUsage()
                                        });
                                warnedAboutWait = true;
                            }
                            // wait() releases matchedListMutex, letting other threads drain the cursor.
                            matchedListMutex.wait(20);
                        }
                        // Temporary storage could be full - so just try to add the message
                        // see https://issues.apache.org/activemq/browse/AMQ-2475
                        if (matched.tryAddMessageLast(node, 10)) {
                            break;
                        }
                    }
                    if (maximumPendingMessages > 0) {
                        // calculate the high water mark from which point we
                        // will eagerly evict expired messages
                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
                            max = maximumPendingMessages;
                        }
                        if (!matched.isEmpty() && matched.size() > max) {
                            removeExpiredMessages();
                        }
                        // lets discard old messages as we are a slow consumer
                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
                            int pageInSize = matched.size() - maximumPendingMessages;
                            // NOTE(review): Math.max pages in AT LEAST 1000 candidates (more when the
                            // overage is larger). An earlier comment here claimed "only page in a 1000
                            // at a time - else we could blow the memory", which would require Math.min.
                            // Confirm which behavior is intended before changing the code.
                            pageInSize = Math.max(1000, pageInSize);
                            LinkedList<MessageReference> list = null;
                            MessageReference[] oldMessages=null;
                            synchronized(matched){
                                list = matched.pageInList(pageInSize);
                                oldMessages = messageEvictionStrategy.evictMessages(list);
                                // Drop one reference per paged-in entry
                                // (presumably taken by pageInList - verify against the cursor implementation).
                                for (MessageReference ref : list) {
                                    ref.decrementReferenceCount();
                                }
                            }
                            int messagesToEvict = 0;
                            if (oldMessages != null){
                                messagesToEvict = oldMessages.length;
                                for (int i = 0; i < messagesToEvict; i++) {
                                    MessageReference oldMessage = oldMessages[i];
                                    discard(oldMessage);
                                }
                            }
                            // lets avoid an infinite loop if we are given a bad eviction strategy
                            // for a bad strategy lets just not evict
                            if (messagesToEvict == 0) {
                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
                                        destination, messageEvictionStrategy, list.size()
                                });
                                break;
                            }
                        }
                    }
                    // Push out whatever the prefetch window allows now that the cursor was updated.
                    dispatchMatched();
                }
            }
        }
    }
209
210    private boolean isDuplicate(MessageReference node) {
211        boolean duplicate = false;
212        if (enableAudit && audit != null) {
213            duplicate = audit.isDuplicate(node);
214            if (LOG.isDebugEnabled()) {
215                if (duplicate) {
216                    LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
217                }
218            }
219        }
220        return duplicate;
221    }
222
223    /**
224     * Discard any expired messages from the matched list. Called from a
225     * synchronized block.
226     *
227     * @throws IOException
228     */
229    protected void removeExpiredMessages() throws IOException {
230        try {
231            matched.reset();
232            while (matched.hasNext()) {
233                MessageReference node = matched.next();
234                node.decrementReferenceCount();
235                if (node.isExpired()) {
236                    matched.remove();
237                    getSubscriptionStatistics().getDispatched().increment();
238                    node.decrementReferenceCount();
239                    if (broker.isExpired(node)) {
240                        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
241                        broker.messageExpired(getContext(), node, this);
242                    }
243                    break;
244                }
245            }
246        } finally {
247            matched.release();
248        }
249    }
250
251    @Override
252    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
253        synchronized (matchedListMutex) {
254            try {
255                matched.reset();
256                while (matched.hasNext()) {
257                    MessageReference node = matched.next();
258                    node.decrementReferenceCount();
259                    if (node.getMessageId().equals(mdn.getMessageId())) {
260                        synchronized(dispatchLock) {
261                            matched.remove();
262                            getSubscriptionStatistics().getDispatched().increment();
263                            dispatched.add(node);
264                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
265                            node.decrementReferenceCount();
266                        }
267                        break;
268                    }
269                }
270            } finally {
271                matched.release();
272            }
273        }
274    }
275
276    @Override
277    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
278        super.acknowledge(context, ack);
279
280        // Handle the standard acknowledgment case.
281        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
282            if (context.isInTransaction()) {
283                context.getTransaction().addSynchronization(new Synchronization() {
284                    @Override
285                    public void afterCommit() throws Exception {
286                        updateStatsOnAck(ack);
287                        dispatchMatched();
288                    }
289                });
290            } else {
291                updateStatsOnAck(ack);
292            }
293            updatePrefetch(ack);
294            dispatchMatched();
295            return;
296        } else if (ack.isDeliveredAck()) {
297            // Message was delivered but not acknowledged: update pre-fetch counters.
298            prefetchExtension.addAndGet(ack.getMessageCount());
299            dispatchMatched();
300            return;
301        } else if (ack.isExpiredAck()) {
302            updateStatsOnAck(ack);
303            updatePrefetch(ack);
304            dispatchMatched();
305            return;
306        } else if (ack.isRedeliveredAck()) {
307            // nothing to do atm
308            return;
309        }
310        throw new JMSException("Invalid acknowledgment: " + ack);
311    }
312
313    @Override
314    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
315
316        // The slave should not deliver pull messages.
317        if (getPrefetchSize() == 0) {
318
319            final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
320            prefetchExtension.set(pull.getQuantity());
321            dispatchMatched();
322
323            // If there was nothing dispatched.. we may need to setup a timeout.
324            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
325
326                // immediate timeout used by receiveNoWait()
327                if (pull.getTimeout() == -1) {
328                    // Send a NULL message to signal nothing pending.
329                    dispatch(null);
330                    prefetchExtension.set(0);
331                }
332
333                if (pull.getTimeout() > 0) {
334                    scheduler.executeAfterDelay(new Runnable() {
335
336                        @Override
337                        public void run() {
338                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
339                        }
340                    }, pull.getTimeout());
341                }
342            }
343        }
344        return null;
345    }
346
347    /**
348     * Occurs when a pull times out. If nothing has been dispatched since the
349     * timeout was setup, then send the NULL message.
350     */
351    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
352        synchronized (matchedListMutex) {
353            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
354                try {
355                    dispatch(null);
356                } catch (Exception e) {
357                    context.getConnection().serviceException(e);
358                } finally {
359                    prefetchExtension.set(0);
360                }
361            }
362        }
363    }
364
365    /**
366     * Update the statistics on message ack.
367     * @param ack
368     */
369    private void updateStatsOnAck(final MessageAck ack) {
370        synchronized(dispatchLock) {
371            boolean inAckRange = false;
372            List<MessageReference> removeList = new ArrayList<MessageReference>();
373            for (final MessageReference node : dispatched) {
374                MessageId messageId = node.getMessageId();
375                if (ack.getFirstMessageId() == null
376                        || ack.getFirstMessageId().equals(messageId)) {
377                    inAckRange = true;
378                }
379                if (inAckRange) {
380                    removeList.add(node);
381                    if (ack.getLastMessageId().equals(messageId)) {
382                        break;
383                    }
384                }
385            }
386
387            for (final MessageReference node : removeList) {
388                dispatched.remove(node);
389                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
390                getSubscriptionStatistics().getDequeues().increment();
391                ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
392                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
393                if (info.isNetworkSubscription()) {
394                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
395                }
396                if (ack.isExpiredAck()) {
397                    destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
398                }
399            }
400        }
401    }
402
403    private void updatePrefetch(MessageAck ack) {
404        while (true) {
405            int currentExtension = prefetchExtension.get();
406            int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
407            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
408                break;
409            }
410        }
411    }
412
413    @Override
414    public int countBeforeFull() {
415        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
416    }
417
418    @Override
419    public int getPendingQueueSize() {
420        return matched();
421    }
422
423    @Override
424    public long getPendingMessageSize() {
425        synchronized (matchedListMutex) {
426            return matched.messageSize();
427        }
428    }
429
430    @Override
431    public int getDispatchedQueueSize() {
432        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
433                prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
434    }
435
436    public int getMaximumPendingMessages() {
437        return maximumPendingMessages;
438    }
439
440    @Override
441    public long getDispatchedCounter() {
442        return getSubscriptionStatistics().getDispatched().getCount();
443    }
444
445    @Override
446    public long getEnqueueCounter() {
447        return getSubscriptionStatistics().getEnqueues().getCount();
448    }
449
450    @Override
451    public long getDequeueCounter() {
452        return getSubscriptionStatistics().getDequeues().getCount();
453    }
454
455    /**
456     * @return the number of messages discarded due to being a slow consumer
457     */
458    public int discarded() {
459        synchronized (matchedListMutex) {
460            return discarded;
461        }
462    }
463
464    /**
465     * @return the number of matched messages (messages targeted for the
466     *         subscription but not yet able to be dispatched due to the
467     *         prefetch buffer being full).
468     */
469    public int matched() {
470        synchronized (matchedListMutex) {
471            return matched.size();
472        }
473    }
474
475    /**
476     * Sets the maximum number of pending messages that can be matched against
477     * this consumer before old messages are discarded.
478     */
479    public void setMaximumPendingMessages(int maximumPendingMessages) {
480        this.maximumPendingMessages = maximumPendingMessages;
481    }
482
483    public MessageEvictionStrategy getMessageEvictionStrategy() {
484        return messageEvictionStrategy;
485    }
486
487    /**
488     * Sets the eviction strategy used to decide which message to evict when the
489     * slow consumer needs to discard messages
490     */
491    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
492        this.messageEvictionStrategy = messageEvictionStrategy;
493    }
494
495    public int getMaxProducersToAudit() {
496        return maxProducersToAudit;
497    }
498
499    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
500        this.maxProducersToAudit = maxProducersToAudit;
501        if (audit != null) {
502            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
503        }
504    }
505
506    public int getMaxAuditDepth() {
507        return maxAuditDepth;
508    }
509
510    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
511        this.maxAuditDepth = maxAuditDepth;
512        if (audit != null) {
513            audit.setAuditDepth(maxAuditDepth);
514        }
515    }
516
517    public boolean isEnableAudit() {
518        return enableAudit;
519    }
520
521    public synchronized void setEnableAudit(boolean enableAudit) {
522        this.enableAudit = enableAudit;
523        if (enableAudit && audit == null) {
524            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
525        }
526    }
527
528    // Implementation methods
529    // -------------------------------------------------------------------------
530    @Override
531    public boolean isFull() {
532        return getDispatchedQueueSize() >= info.getPrefetchSize();
533    }
534
535    @Override
536    public int getInFlightSize() {
537        return getDispatchedQueueSize();
538    }
539
540    /**
541     * @return true when 60% or more room is left for dispatching messages
542     */
543    @Override
544    public boolean isLowWaterMark() {
545        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
546    }
547
548    /**
549     * @return true when 10% or less room is left for dispatching messages
550     */
551    @Override
552    public boolean isHighWaterMark() {
553        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
554    }
555
556    /**
557     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
558     */
559    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
560        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
561    }
562
563    /**
564     * @return the memoryUsageHighWaterMark
565     */
566    public int getMemoryUsageHighWaterMark() {
567        return this.memoryUsageHighWaterMark;
568    }
569
570    /**
571     * @return the usageManager
572     */
573    public SystemUsage getUsageManager() {
574        return this.usageManager;
575    }
576
577    /**
578     * @return the matched
579     */
580    public PendingMessageCursor getMatched() {
581        return this.matched;
582    }
583
584    /**
585     * @param matched the matched to set
586     */
587    public void setMatched(PendingMessageCursor matched) {
588        this.matched = matched;
589    }
590
591    /**
592     * inform the MessageConsumer on the client to change it's prefetch
593     *
594     * @param newPrefetch
595     */
596    @Override
597    public void updateConsumerPrefetch(int newPrefetch) {
598        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
599            ConsumerControl cc = new ConsumerControl();
600            cc.setConsumerId(info.getConsumerId());
601            cc.setPrefetch(newPrefetch);
602            context.getConnection().dispatchAsync(cc);
603        }
604    }
605
606    private void dispatchMatched() throws IOException {
607        synchronized (matchedListMutex) {
608            if (!matched.isEmpty() && !isFull()) {
609                try {
610                    matched.reset();
611
612                    while (matched.hasNext() && !isFull()) {
613                        MessageReference message = matched.next();
614                        message.decrementReferenceCount();
615                        matched.remove();
616                        // Message may have been sitting in the matched list a while
617                        // waiting for the consumer to ak the message.
618                        if (message.isExpired()) {
619                            discard(message);
620                            continue; // just drop it.
621                        }
622                        dispatch(message);
623                    }
624                } finally {
625                    matched.release();
626                }
627            }
628        }
629    }
630
631    private void dispatch(final MessageReference node) throws IOException {
632        Message message = node != null ? node.getMessage() : null;
633        if (node != null) {
634            node.incrementReferenceCount();
635        }
636        // Make sure we can dispatch a message.
637        MessageDispatch md = new MessageDispatch();
638        md.setMessage(message);
639        md.setConsumerId(info.getConsumerId());
640        if (node != null) {
641            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
642            synchronized(dispatchLock) {
643                getSubscriptionStatistics().getDispatched().increment();
644                dispatched.add(node);
645                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
646            }
647
648            // Keep track if this subscription is receiving messages from a single destination.
649            if (singleDestination) {
650                if (destination == null) {
651                    destination = (Destination)node.getRegionDestination();
652                } else {
653                    if (destination != node.getRegionDestination()) {
654                        singleDestination = false;
655                    }
656                }
657            }
658        }
659        if (info.isDispatchAsync()) {
660            if (node != null) {
661                md.setTransmitCallback(new TransmitCallback() {
662
663                    @Override
664                    public void onSuccess() {
665                        Destination regionDestination = (Destination) node.getRegionDestination();
666                        regionDestination.getDestinationStatistics().getDispatched().increment();
667                        regionDestination.getDestinationStatistics().getInflight().increment();
668                        node.decrementReferenceCount();
669                    }
670
671                    @Override
672                    public void onFailure() {
673                        Destination regionDestination = (Destination) node.getRegionDestination();
674                        regionDestination.getDestinationStatistics().getDispatched().increment();
675                        regionDestination.getDestinationStatistics().getInflight().increment();
676                        node.decrementReferenceCount();
677                    }
678                });
679            }
680            context.getConnection().dispatchAsync(md);
681        } else {
682            context.getConnection().dispatchSync(md);
683            if (node != null) {
684                Destination regionDestination = (Destination) node.getRegionDestination();
685                regionDestination.getDestinationStatistics().getDispatched().increment();
686                regionDestination.getDestinationStatistics().getInflight().increment();
687                node.decrementReferenceCount();
688            }
689        }
690    }
691
692    private void discard(MessageReference message) {
693        discarding = true;
694        try {
695            message.decrementReferenceCount();
696            matched.remove(message);
697            discarded++;
698            if (destination != null) {
699                destination.getDestinationStatistics().getDequeues().increment();
700            }
701            LOG.debug("{}, discarding message {}", this, message);
702            Destination dest = (Destination) message.getRegionDestination();
703            if (dest != null) {
704                dest.messageDiscarded(getContext(), this, message);
705            }
706            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
707        } finally {
708            discarding = false;
709        }
710    }
711
712    @Override
713    public String toString() {
714        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
715                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
716    }
717
718    @Override
719    public void destroy() {
720        this.active=false;
721        synchronized (matchedListMutex) {
722            try {
723                matched.destroy();
724            } catch (Exception e) {
725                LOG.warn("Failed to destroy cursor", e);
726            }
727        }
728        setSlowConsumer(false);
729        synchronized(dispatchLock) {
730            dispatched.clear();
731        }
732    }
733
734    @Override
735    public int getPrefetchSize() {
736        return info.getPrefetchSize();
737    }
738
739    @Override
740    public void setPrefetchSize(int newSize) {
741        info.setPrefetchSize(newSize);
742        try {
743            dispatchMatched();
744        } catch(Exception e) {
745            LOG.trace("Caught exception on dispatch after prefetch size change.");
746        }
747    }
748}