/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import javax.jms.JMSException;
029
030import org.apache.activemq.broker.Broker;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034import org.apache.activemq.command.ConsumerControl;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.Message;
037import org.apache.activemq.command.MessageAck;
038import org.apache.activemq.command.MessageDispatch;
039import org.apache.activemq.command.MessageDispatchNotification;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.MessagePull;
042import org.apache.activemq.command.Response;
043import org.apache.activemq.thread.Scheduler;
044import org.apache.activemq.transaction.Synchronization;
045import org.apache.activemq.transport.TransmitCallback;
046import org.apache.activemq.usage.SystemUsage;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
/**
 * A subscription that honors the pre-fetch option of the ConsumerInfo.
 */
053public abstract class PrefetchSubscription extends AbstractSubscription {
054
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
    /** Scheduler obtained from the broker; used to schedule pull timeouts in pullMessage(). */
    protected final Scheduler scheduler;

    /** Cursor holding the messages that have been added but not yet dispatched. */
    protected PendingMessageCursor pending;
    /** Messages that have been dispatched and not yet removed by an ack, a commit or a destination removal. */
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
    /** Number of additional messages (beyond the configured prefetch) that may currently be dispatched. */
    protected final AtomicInteger prefetchExtension = new AtomicInteger();
    /** When false, the ack handling in this class leaves prefetchExtension untouched for prefetch sizes other than zero. */
    protected boolean usePrefetchExtension = true;
    // NOTE(review): the two audit settings below are not read in the visible part of this
    // class - presumably consumed by accessors or subclasses further down; confirm.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    /** System usage handed to the pending cursor in setPending(). */
    protected final SystemUsage usageManager;
    /** Monitor guarding access to the pending cursor. */
    protected final Object pendingLock = new Object();
    /** Monitor guarding modifications of the dispatched list. */
    protected final Object dispatchLock = new Object();
    /**
     * Latch consulted by acknowledge(): while its count is non-zero incoming acks are ignored.
     * The count-down itself is not in this part of the file - presumably done once dispatch
     * has happened; confirm.
     */
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
068
069    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
070        super(broker,context, info);
071        this.usageManager=usageManager;
072        pending = cursor;
073        try {
074            pending.start();
075        } catch (Exception e) {
076            throw new JMSException(e.getMessage());
077        }
078        this.scheduler = broker.getScheduler();
079    }
080
081    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
082        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
083    }
084
085    /**
086     * Allows a message to be pulled on demand by a client
087     */
088    @Override
089    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
090        // The slave should not deliver pull messages.
091        // TODO: when the slave becomes a master, He should send a NULL message to all the
092        // consumers to 'wake them up' in case they were waiting for a message.
093        if (getPrefetchSize() == 0) {
094            prefetchExtension.set(pull.getQuantity());
095            final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
096
097            // Have the destination push us some messages.
098            for (Destination dest : destinations) {
099                dest.iterate();
100            }
101            dispatchPending();
102
103            synchronized(this) {
104                // If there was nothing dispatched.. we may need to setup a timeout.
105                if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
106                    // immediate timeout used by receiveNoWait()
107                    if (pull.getTimeout() == -1) {
108                        // Null message indicates the pull is done or did not have pending.
109                        prefetchExtension.set(1);
110                        add(QueueMessageReference.NULL_MESSAGE);
111                        dispatchPending();
112                    }
113                    if (pull.getTimeout() > 0) {
114                        scheduler.executeAfterDelay(new Runnable() {
115                            @Override
116                            public void run() {
117                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
118                            }
119                        }, pull.getTimeout());
120                    }
121                }
122            }
123        }
124        return null;
125    }
126
127    /**
128     * Occurs when a pull times out. If nothing has been dispatched since the
129     * timeout was setup, then send the NULL message.
130     */
131    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
132        synchronized (pendingLock) {
133            if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
134                try {
135                    prefetchExtension.set(1);
136                    add(QueueMessageReference.NULL_MESSAGE);
137                    dispatchPending();
138                } catch (Exception e) {
139                    context.getConnection().serviceException(e);
140                } finally {
141                    prefetchExtension.set(0);
142                }
143            }
144        }
145    }
146
147    @Override
148    public void add(MessageReference node) throws Exception {
149        synchronized (pendingLock) {
150            // The destination may have just been removed...
151            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
152                // perhaps we should inform the caller that we are no longer valid to dispatch to?
153                return;
154            }
155
156            // Don't increment for the pullTimeout control message.
157            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
158                getSubscriptionStatistics().getEnqueues().increment();
159            }
160            pending.addMessageLast(node);
161        }
162        dispatchPending();
163    }
164
165    @Override
166    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
167        synchronized(pendingLock) {
168            try {
169                pending.reset();
170                while (pending.hasNext()) {
171                    MessageReference node = pending.next();
172                    node.decrementReferenceCount();
173                    if (node.getMessageId().equals(mdn.getMessageId())) {
174                        // Synchronize between dispatched list and removal of messages from pending list
175                        // related to remove subscription action
176                        synchronized(dispatchLock) {
177                            pending.remove();
178                            createMessageDispatch(node, node.getMessage());
179                            dispatched.add(node);
180                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
181                            onDispatch(node, node.getMessage());
182                        }
183                        return;
184                    }
185                }
186            } finally {
187                pending.release();
188            }
189        }
190        throw new JMSException(
191                "Slave broker out of sync with master: Dispatched message ("
192                        + mdn.getMessageId() + ") was not in the pending list for "
193                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
194    }
195
    /**
     * Processes an acknowledgment from the consumer against the dispatched list.
     * <p>
     * Acks arriving while the okForAckAsDispatchDone latch is still closed are logged and
     * ignored. Otherwise, under dispatchLock, the ack is handled according to its type
     * (standard, individual, delivered, expired, redelivered or poison). When the ack could
     * be matched to a dispatched message, the owning destination is woken up and pending
     * messages are dispatched.
     *
     * @param context connection context the ack arrived on; decides whether removal is
     *        immediate or deferred to transaction completion
     * @param ack the acknowledgment to process
     * @throws Exception a JMSException when a standard ack is rejected by
     *         assertAckMatchesDispatched, when a delivered, expired, redelivered or poison ack
     *         cannot be correlated with a dispatched message, or when a poison ack is
     *         transacted; plus anything thrown by the per-message hooks
     */
    @Override
    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        // Zero-timeout await: a non-blocking check of whether the latch has been released.
        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
            // suppress unexpected ack exception in this expected case
            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
            return;
        }

        LOG.trace("ack: {}", ack);

        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                boolean inAckRange = false;
                // Nodes are collected here and removed after the loop so the dispatched
                // list is not modified while it is being iterated.
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    // A null first id means the range starts at the head of the list.
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            removeList.add(node);
                        } else {
                            registerRemoveSync(context, node);
                        }
                        // Per-message acknowledge hook (three-argument overload, defined outside this view).
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // Don't remove the nodes until we are committed - immediateAck option
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            // Removing inside the for-each is safe only because the loop
                            // is left via the break below before the iterator advances.
                            dispatched.remove(node);
                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                        } else {
                            registerRemoveSync(context, node);
                        }

                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
                            // allow transaction batch to exceed prefetch
                            // CAS retry loop: grows the extension by one; Math.max keeps the
                            // current value if the increment would overflow.
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                int newExtension = Math.max(currentExtension, currentExtension + 1);
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                        }

                        acknowledge(context, ack, node);
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            }else if (ack.isDeliveredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                // index is the position of the current node in the dispatched list.
                int index = 0;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        if (usePrefetchExtension && getPrefetchSize() != 0) {
                            // allow  batch to exceed prefetch
                            // Raise the extension to at least the number of delivered
                            // messages (index + 1); never lowers it.
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                int newExtension = Math.max(currentExtension, index + 1);
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                        }
                        destination = nodeDest;
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isExpiredAck()) {
                // Message was expired
                int index = 0;
                boolean inAckRange = false;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        Destination regionDestination = nodeDest;
                        // Only notify the destination when the broker agrees the message has expired.
                        if (broker.isExpired(node)) {
                            regionDestination.messageExpired(context, this, node);
                        }
                        // The node leaves the dispatched list whether or not the broker
                        // considered it expired.
                        // NOTE(review): unlike the standard, individual and poison branches,
                        // this branch does not adjust getInflightMessageSize() - confirm intended.
                        iter.remove();
                        nodeDest.getDestinationStatistics().getInflight().decrement();

                        if (ack.getLastMessageId().equals(messageId)) {
                            if (usePrefetchExtension && getPrefetchSize() != 0) {
                                // allow  batch to exceed prefetch
                                while (true) {
                                    int currentExtension = prefetchExtension.get();
                                    int newExtension = Math.max(currentExtension, index + 1);
                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                        break;
                                    }
                                }
                            }

                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate expiration acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                // Nothing is removed or counted here; the loop only verifies that the
                // acked range ends at a dispatched message.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                // index counts the messages sent to the DLQ so far.
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        sendToDLQ(context, node, ack.getPoisonCause());
                        Destination nodeDest = (Destination) node.getRegionDestination();
                        nodeDest.getDestinationStatistics()
                        .getInflight().decrement();
                        removeList.add(node);
                        getSubscriptionStatistics().getDequeues().increment();
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            // Shrink the extension, never below zero.
                            // NOTE(review): index was already incremented for this node, so
                            // (index + 1) is one more than the number of messages handled -
                            // confirm this is intended.
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                int newExtension = Math.max(0, currentExtension - (index + 1));
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                            destination = nodeDest;
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        // Outside dispatchLock: wake the destination and try to dispatch more.
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();

            // Nothing left locally: nudge every destination of this subscription.
            if (pending.isEmpty()) {
                for (Destination dest : destinations) {
                    dest.wakeup();
                }
            }
        } else {
            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
        }
    }
442
443    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
444        // setup a Synchronization to remove nodes from the
445        // dispatched list.
446        context.getTransaction().addSynchronization(
447                new Synchronization() {
448
449                    @Override
450                    public void beforeEnd() {
451                        if (usePrefetchExtension && getPrefetchSize() != 0) {
452                            while (true) {
453                                int currentExtension = prefetchExtension.get();
454                                int newExtension = Math.max(0, currentExtension - 1);
455                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
456                                    break;
457                                }
458                            }
459                        }
460                    }
461
462                    @Override
463                    public void afterCommit()
464                            throws Exception {
465                        Destination nodeDest = (Destination) node.getRegionDestination();
466                        synchronized(dispatchLock) {
467                            getSubscriptionStatistics().getDequeues().increment();
468                            dispatched.remove(node);
469                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
470                            nodeDest.getDestinationStatistics().getInflight().decrement();
471                        }
472                        nodeDest.wakeup();
473                        dispatchPending();
474                    }
475
476                    @Override
477                    public void afterRollback() throws Exception {
478                        synchronized(dispatchLock) {
479                            // poisionAck will decrement - otherwise still inflight on client
480                        }
481                    }
482                });
483    }
484
485    /**
486     * Checks an ack versus the contents of the dispatched list.
487     *  called with dispatchLock held
488     * @param ack
489     * @throws JMSException if it does not match
490     */
491    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
492        MessageId firstAckedMsg = ack.getFirstMessageId();
493        MessageId lastAckedMsg = ack.getLastMessageId();
494        int checkCount = 0;
495        boolean checkFoundStart = false;
496        boolean checkFoundEnd = false;
497        for (MessageReference node : dispatched) {
498
499            if (firstAckedMsg == null) {
500                checkFoundStart = true;
501            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
502                checkFoundStart = true;
503            }
504
505            if (checkFoundStart) {
506                checkCount++;
507            }
508
509            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
510                checkFoundEnd = true;
511                break;
512            }
513        }
514        if (!checkFoundStart && firstAckedMsg != null)
515            throw new JMSException("Unmatched acknowledge: " + ack
516                    + "; Could not find Message-ID " + firstAckedMsg
517                    + " in dispatched-list (start of ack)");
518        if (!checkFoundEnd && lastAckedMsg != null)
519            throw new JMSException("Unmatched acknowledge: " + ack
520                    + "; Could not find Message-ID " + lastAckedMsg
521                    + " in dispatched-list (end of ack)");
522        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
523            throw new JMSException("Unmatched acknowledge: " + ack
524                    + "; Expected message count (" + ack.getMessageCount()
525                    + ") differs from count in dispatched-list (" + checkCount
526                    + ")");
527        }
528    }
529
530    /**
531     *
532     * @param context
533     * @param node
534     * @param poisonCause
535     * @throws IOException
536     * @throws Exception
537     */
538    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
539        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
540    }
541
542    @Override
543    public int getInFlightSize() {
544        return dispatched.size();
545    }
546
547    /**
548     * Used to determine if the broker can dispatch to the consumer.
549     *
550     * @return true if the subscription is full
551     */
552    @Override
553    public boolean isFull() {
554        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
555    }
556
557    /**
558     * @return true when 60% or more room is left for dispatching messages
559     */
560    @Override
561    public boolean isLowWaterMark() {
562        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
563    }
564
565    /**
566     * @return true when 10% or less room is left for dispatching messages
567     */
568    @Override
569    public boolean isHighWaterMark() {
570        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
571    }
572
573    @Override
574    public int countBeforeFull() {
575        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
576    }
577
578    @Override
579    public int getPendingQueueSize() {
580        return pending.size();
581    }
582
583    @Override
584    public long getPendingMessageSize() {
585        synchronized (pendingLock) {
586            return pending.messageSize();
587        }
588    }
589
590    @Override
591    public int getDispatchedQueueSize() {
592        return dispatched.size();
593    }
594
595    @Override
596    public long getDequeueCounter() {
597        return getSubscriptionStatistics().getDequeues().getCount();
598    }
599
600    @Override
601    public long getDispatchedCounter() {
602        return getSubscriptionStatistics().getDispatched().getCount();
603    }
604
605    @Override
606    public long getEnqueueCounter() {
607        return getSubscriptionStatistics().getEnqueues().getCount();
608    }
609
610    @Override
611    public boolean isRecoveryRequired() {
612        return pending.isRecoveryRequired();
613    }
614
615    public PendingMessageCursor getPending() {
616        return this.pending;
617    }
618
619    public void setPending(PendingMessageCursor pending) {
620        this.pending = pending;
621        if (this.pending!=null) {
622            this.pending.setSystemUsage(usageManager);
623            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
624        }
625    }
626
627    @Override
628    public void add(ConnectionContext context, Destination destination) throws Exception {
629        synchronized(pendingLock) {
630            super.add(context, destination);
631            pending.add(context, destination);
632        }
633    }
634
635    @Override
636    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
637        return remove(context, destination, dispatched);
638    }
639
640    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
641        LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
642        synchronized(pendingLock) {
643            super.remove(context, destination);
644            // Here is a potential problem concerning Inflight stat:
645            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
646            // Except if each commit or rollback callback action comes before remove of subscriber.
647            redispatch.addAll(pending.remove(context, destination));
648
649            if (dispatched == null) {
650                return redispatch;
651            }
652
653            // Synchronized to DispatchLock if necessary
654            if (dispatched == this.dispatched) {
655                synchronized(dispatchLock) {
656                    addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
657                }
658            } else {
659                addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
660            }
661        }
662
663        return redispatch;
664    }
665
666    private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
667        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
668        for (MessageReference r : dispatched) {
669            if (r.getRegionDestination() == destination) {
670                references.add(r);
671                getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
672            }
673        }
674        redispatch.addAll(0, references);
675        destination.getDestinationStatistics().getInflight().subtract(references.size());
676        dispatched.removeAll(references);
677    }
678
679    // made public so it can be used in MQTTProtocolConverter
680    public void dispatchPending() throws IOException {
681        List<Destination> slowConsumerTargets = null;
682
683        synchronized(pendingLock) {
684            try {
685                int numberToDispatch = countBeforeFull();
686                if (numberToDispatch > 0) {
687                    setSlowConsumer(false);
688                    setPendingBatchSize(pending, numberToDispatch);
689                    int count = 0;
690                    pending.reset();
691                    while (pending.hasNext() && !isFull() && count < numberToDispatch) {
692                        MessageReference node = pending.next();
693                        if (node == null) {
694                            break;
695                        }
696
697                        // Synchronize between dispatched list and remove of message from pending list
698                        // related to remove subscription action
699                        synchronized(dispatchLock) {
700                            pending.remove();
701                            if (!isDropped(node) && canDispatch(node)) {
702
703                                // Message may have been sitting in the pending
704                                // list a while waiting for the consumer to ak the message.
705                                if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
706                                    //increment number to dispatch
707                                    numberToDispatch++;
708                                    if (broker.isExpired(node)) {
709                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
710                                    }
711
712                                    if (!isBrowser()) {
713                                        node.decrementReferenceCount();
714                                        continue;
715                                    }
716                                }
717                                dispatch(node);
718                                count++;
719                            }
720                        }
721                        // decrement after dispatch has taken ownership to avoid usage jitter
722                        node.decrementReferenceCount();
723                    }
724                } else if (!isSlowConsumer()) {
725                    setSlowConsumer(true);
726                    slowConsumerTargets = destinations;
727                }
728            } finally {
729                pending.release();
730            }
731        }
732
733        if (slowConsumerTargets != null) {
734            for (Destination dest : slowConsumerTargets) {
735                dest.slowConsumer(context, this);
736            }
737        }
738    }
739
740    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
741        pending.setMaxBatchSize(numberToDispatch);
742    }
743
744    // called with dispatchLock held
745    protected boolean dispatch(final MessageReference node) throws IOException {
746        final Message message = node.getMessage();
747        if (message == null) {
748            return false;
749        }
750
751        okForAckAsDispatchDone.countDown();
752
753        MessageDispatch md = createMessageDispatch(node, message);
754        if (node != QueueMessageReference.NULL_MESSAGE) {
755            getSubscriptionStatistics().getDispatched().increment();
756            dispatched.add(node);
757            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
758        }
759        if (getPrefetchSize() == 0) {
760            while (true) {
761                int currentExtension = prefetchExtension.get();
762                int newExtension = Math.max(0, currentExtension - 1);
763                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
764                    break;
765                }
766            }
767        }
768        if (info.isDispatchAsync()) {
769            md.setTransmitCallback(new TransmitCallback() {
770
771                @Override
772                public void onSuccess() {
773                    // Since the message gets queued up in async dispatch, we don't want to
774                    // decrease the reference count until it gets put on the wire.
775                    onDispatch(node, message);
776                }
777
778                @Override
779                public void onFailure() {
780                    Destination nodeDest = (Destination) node.getRegionDestination();
781                    if (nodeDest != null) {
782                        if (node != QueueMessageReference.NULL_MESSAGE) {
783                            nodeDest.getDestinationStatistics().getDispatched().increment();
784                            nodeDest.getDestinationStatistics().getInflight().increment();
785                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
786                        }
787                    }
788                    if (node instanceof QueueMessageReference) {
789                        ((QueueMessageReference) node).unlock();
790                    }
791                }
792            });
793            context.getConnection().dispatchAsync(md);
794        } else {
795            context.getConnection().dispatchSync(md);
796            onDispatch(node, message);
797        }
798        return true;
799    }
800
801    protected void onDispatch(final MessageReference node, final Message message) {
802        Destination nodeDest = (Destination) node.getRegionDestination();
803        if (nodeDest != null) {
804            if (node != QueueMessageReference.NULL_MESSAGE) {
805                nodeDest.getDestinationStatistics().getDispatched().increment();
806                nodeDest.getDestinationStatistics().getInflight().increment();
807                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
808            }
809        }
810
811        if (info.isDispatchAsync()) {
812            try {
813                dispatchPending();
814            } catch (IOException e) {
815                context.getConnection().serviceExceptionAsync(e);
816            }
817        }
818    }
819
820    /**
821     * inform the MessageConsumer on the client to change it's prefetch
822     *
823     * @param newPrefetch
824     */
825    @Override
826    public void updateConsumerPrefetch(int newPrefetch) {
827        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
828            ConsumerControl cc = new ConsumerControl();
829            cc.setConsumerId(info.getConsumerId());
830            cc.setPrefetch(newPrefetch);
831            context.getConnection().dispatchAsync(cc);
832        }
833    }
834
835    /**
836     * @param node
837     * @param message
838     * @return MessageDispatch
839     */
840    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
841        MessageDispatch md = new MessageDispatch();
842        md.setConsumerId(info.getConsumerId());
843
844        if (node == QueueMessageReference.NULL_MESSAGE) {
845            md.setMessage(null);
846            md.setDestination(null);
847        } else {
848            Destination regionDestination = (Destination) node.getRegionDestination();
849            md.setDestination(regionDestination.getActiveMQDestination());
850            md.setMessage(message);
851            md.setRedeliveryCounter(node.getRedeliveryCounter());
852        }
853
854        return md;
855    }
856
    /**
     * Use when a matched message is about to be dispatched to the client.
     * {@code dispatchPending()} consults this, after {@code isDropped}, for each
     * reference it takes off the pending cursor.
     *
     * @param node the message reference that is about to be dispatched
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException if thrown by the implementing subscription
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;
866
    /**
     * Reports whether the given reference is to be treated as dropped.
     * {@code dispatchPending()} does not dispatch a reference for which this
     * returns true.
     *
     * @param node the message reference taken from the pending cursor
     * @return true if the reference must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);
868
    /**
     * Used during acknowledgment to remove the message.
     *
     * @param context the connection context of the acknowledgment
     * @param ack the acknowledgment command being processed
     * @param node the message reference being acknowledged
     * @throws IOException if thrown by the implementing subscription
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
875
876
877    public int getMaxProducersToAudit() {
878        return maxProducersToAudit;
879    }
880
881    public void setMaxProducersToAudit(int maxProducersToAudit) {
882        this.maxProducersToAudit = maxProducersToAudit;
883        if (this.pending != null) {
884            this.pending.setMaxProducersToAudit(maxProducersToAudit);
885        }
886    }
887
888    public int getMaxAuditDepth() {
889        return maxAuditDepth;
890    }
891
892    public void setMaxAuditDepth(int maxAuditDepth) {
893        this.maxAuditDepth = maxAuditDepth;
894        if (this.pending != null) {
895            this.pending.setMaxAuditDepth(maxAuditDepth);
896        }
897    }
898
899    public boolean isUsePrefetchExtension() {
900        return usePrefetchExtension;
901    }
902
903    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
904        this.usePrefetchExtension = usePrefetchExtension;
905    }
906
907    protected int getPrefetchExtension() {
908        return this.prefetchExtension.get();
909    }
910
911    @Override
912    public void setPrefetchSize(int prefetchSize) {
913        this.info.setPrefetchSize(prefetchSize);
914        try {
915            this.dispatchPending();
916        } catch (Exception e) {
917            LOG.trace("Caught exception during dispatch after prefetch change.", e);
918        }
919    }
920}