001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ConsumerInfo;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.thread.Task;
056import org.apache.activemq.thread.TaskRunner;
057import org.apache.activemq.thread.TaskRunnerFactory;
058import org.apache.activemq.transaction.Synchronization;
059import org.apache.activemq.util.SubscriptionKey;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * The Topic is a destination that sends a copy of a message to every active
065 * Subscription registered.
066 */
067public class Topic extends BaseDestination implements Task {
068    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
069    private final TopicMessageStore topicStore;
070    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
071    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
072    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
073    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
074    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
075    private final TaskRunner taskRunner;
076    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
077    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
078        @Override
079        public void run() {
080            try {
081                Topic.this.taskRunner.wakeup();
082            } catch (InterruptedException e) {
083            }
084        };
085    };
086
087    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
088            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
089        super(brokerService, store, destination, parentStats);
090        this.topicStore = store;
091        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
092        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
093    }
094
095    @Override
096    public void initialize() throws Exception {
097        super.initialize();
098        // set non default subscription recovery policy (override policyEntries)
099        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
100            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
101            setAlwaysRetroactive(true);
102        }
103        if (store != null) {
104            // AMQ-2586: Better to leave this stat at zero than to give the user
105            // misleading metrics.
106            // int messageCount = store.getMessageCount();
107            // destinationStatistics.getMessages().setCount(messageCount);
108            store.start();
109        }
110    }
111
112    @Override
113    public List<Subscription> getConsumers() {
114        synchronized (consumers) {
115            return new ArrayList<Subscription>(consumers);
116        }
117    }
118
119    public boolean lock(MessageReference node, LockOwner sub) {
120        return true;
121    }
122
123    @Override
124    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
125        if (!sub.getConsumerInfo().isDurable()) {
126
127            // Do a retroactive recovery if needed.
128            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
129
130                // synchronize with dispatch method so that no new messages are sent
131                // while we are recovering a subscription to avoid out of order messages.
132                dispatchLock.writeLock().lock();
133                try {
134                    boolean applyRecovery = false;
135                    synchronized (consumers) {
136                        if (!consumers.contains(sub)){
137                            sub.add(context, this);
138                            consumers.add(sub);
139                            applyRecovery=true;
140                            super.addSubscription(context, sub);
141                        }
142                    }
143                    if (applyRecovery){
144                        subscriptionRecoveryPolicy.recover(context, this, sub);
145                    }
146                } finally {
147                    dispatchLock.writeLock().unlock();
148                }
149
150            } else {
151                synchronized (consumers) {
152                    if (!consumers.contains(sub)){
153                        sub.add(context, this);
154                        consumers.add(sub);
155                        super.addSubscription(context, sub);
156                    }
157                }
158            }
159        } else {
160            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
161            super.addSubscription(context, sub);
162            sub.add(context, this);
163            if(dsub.isActive()) {
164                synchronized (consumers) {
165                    boolean hasSubscription = false;
166
167                    if (consumers.size() == 0) {
168                        hasSubscription = false;
169                    } else {
170                        for (Subscription currentSub : consumers) {
171                            if (currentSub.getConsumerInfo().isDurable()) {
172                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
173                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
174                                    hasSubscription = true;
175                                    break;
176                                }
177                            }
178                        }
179                    }
180
181                    if (!hasSubscription) {
182                        consumers.add(sub);
183                    }
184                }
185            }
186            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
187        }
188    }
189
190    @Override
191    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
192        if (!sub.getConsumerInfo().isDurable()) {
193            super.removeSubscription(context, sub, lastDeliveredSequenceId);
194            synchronized (consumers) {
195                consumers.remove(sub);
196            }
197        }
198        sub.remove(context, this);
199    }
200
201    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
202        if (topicStore != null) {
203            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
204            DurableTopicSubscription removed = durableSubscribers.remove(key);
205            if (removed != null) {
206                destinationStatistics.getConsumers().decrement();
207                // deactivate and remove
208                removed.deactivate(false, 0l);
209                consumers.remove(removed);
210            }
211        }
212    }
213
214    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) {
215        if (hasSelectorChanged(info1, info2)) {
216            return true;
217        }
218
219        return hasNoLocalChanged(info1, info2);
220    }
221
222    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) {
223        // Prior to V11 the broker did not store the noLocal value for durable subs.
224        if (brokerService.getStoreOpenWireVersion() >= 11) {
225            if (info1.isNoLocal() ^ info2.isNoLocal()) {
226                return true;
227            }
228        }
229
230        return false;
231    }
232
233    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
234        if (info1.getSelector() != null ^ info2.getSelector() != null) {
235            return true;
236        }
237
238        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
239            return true;
240        }
241
242        return false;
243    }
244
245    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
246        // synchronize with dispatch method so that no new messages are sent
247        // while we are recovering a subscription to avoid out of order messages.
248        dispatchLock.writeLock().lock();
249        try {
250
251            if (topicStore == null) {
252                return;
253            }
254
255            // Recover the durable subscription.
256            String clientId = subscription.getSubscriptionKey().getClientId();
257            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
258            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
259            if (info != null) {
260                // Check to see if selector changed.
261                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
262                    // Need to delete the subscription
263                    topicStore.deleteSubscription(clientId, subscriptionName);
264                    info = null;
265                    // Force a rebuild of the selector chain for the subscription otherwise
266                    // the stored subscription is updated but the selector expression is not
267                    // and the subscription will not behave according to the new configuration.
268                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
269                    synchronized (consumers) {
270                        consumers.remove(subscription);
271                    }
272                } else {
273                    synchronized (consumers) {
274                        if (!consumers.contains(subscription)) {
275                            consumers.add(subscription);
276                        }
277                    }
278                }
279            }
280
281            // Do we need to create the subscription?
282            if (info == null) {
283                info = new SubscriptionInfo();
284                info.setClientId(clientId);
285                info.setSelector(subscription.getConsumerInfo().getSelector());
286                info.setSubscriptionName(subscriptionName);
287                info.setDestination(getActiveMQDestination());
288                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
289                // This destination is an actual destination id.
290                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
291                // This destination might be a pattern
292                synchronized (consumers) {
293                    consumers.add(subscription);
294                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
295                }
296            }
297
298            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
299            msgContext.setDestination(destination);
300            if (subscription.isRecoveryRequired()) {
301                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
302                    @Override
303                    public boolean recoverMessage(Message message) throws Exception {
304                        message.setRegionDestination(Topic.this);
305                        try {
306                            msgContext.setMessageReference(message);
307                            if (subscription.matches(message, msgContext)) {
308                                subscription.add(message);
309                            }
310                        } catch (IOException e) {
311                            LOG.error("Failed to recover this message {}", message, e);
312                        }
313                        return true;
314                    }
315
316                    @Override
317                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
318                        throw new RuntimeException("Should not be called.");
319                    }
320
321                    @Override
322                    public boolean hasSpace() {
323                        return true;
324                    }
325
326                    @Override
327                    public boolean isDuplicate(MessageId id) {
328                        return false;
329                    }
330                });
331            }
332        } finally {
333            dispatchLock.writeLock().unlock();
334        }
335    }
336
337    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
338        synchronized (consumers) {
339            consumers.remove(sub);
340        }
341        sub.remove(context, this, dispatched);
342    }
343
344    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
345        if (subscription.getConsumerInfo().isRetroactive()) {
346            subscriptionRecoveryPolicy.recover(context, this, subscription);
347        }
348    }
349
350    @Override
351    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
352        final ConnectionContext context = producerExchange.getConnectionContext();
353
354        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
355        producerExchange.incrementSend();
356        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
357                && !context.isInRecoveryMode();
358
359        message.setRegionDestination(this);
360
361        // There is delay between the client sending it and it arriving at the
362        // destination.. it may have expired.
363        if (message.isExpired()) {
364            broker.messageExpired(context, message, null);
365            getDestinationStatistics().getExpired().increment();
366            if (sendProducerAck) {
367                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
368                context.getConnection().dispatchAsync(ack);
369            }
370            return;
371        }
372
373        if (memoryUsage.isFull()) {
374            isFull(context, memoryUsage);
375            fastProducer(context, producerInfo);
376
377            if (isProducerFlowControl() && context.isProducerFlowControl()) {
378
379                if (warnOnProducerFlowControl) {
380                    warnOnProducerFlowControl = false;
381                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
382                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
383                }
384
385                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
386                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
387                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
388                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
389                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
390                }
391
392                // We can avoid blocking due to low usage if the producer is sending a sync message or
393                // if it is using a producer window
394                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
395                    synchronized (messagesWaitingForSpace) {
396                        messagesWaitingForSpace.add(new Runnable() {
397                            @Override
398                            public void run() {
399                                try {
400
401                                    // While waiting for space to free up... the
402                                    // message may have expired.
403                                    if (message.isExpired()) {
404                                        broker.messageExpired(context, message, null);
405                                        getDestinationStatistics().getExpired().increment();
406                                    } else {
407                                        doMessageSend(producerExchange, message);
408                                    }
409
410                                    if (sendProducerAck) {
411                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
412                                                .getSize());
413                                        context.getConnection().dispatchAsync(ack);
414                                    } else {
415                                        Response response = new Response();
416                                        response.setCorrelationId(message.getCommandId());
417                                        context.getConnection().dispatchAsync(response);
418                                    }
419
420                                } catch (Exception e) {
421                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
422                                        ExceptionResponse response = new ExceptionResponse(e);
423                                        response.setCorrelationId(message.getCommandId());
424                                        context.getConnection().dispatchAsync(response);
425                                    }
426                                }
427                            }
428                        });
429
430                        registerCallbackForNotFullNotification();
431                        context.setDontSendReponse(true);
432                        return;
433                    }
434
435                } else {
436                    // Producer flow control cannot be used, so we have do the flow control
437                    // at the broker by blocking this thread until there is space available.
438
439                    if (memoryUsage.isFull()) {
440                        if (context.isInTransaction()) {
441
442                            int count = 0;
443                            while (!memoryUsage.waitForSpace(1000)) {
444                                if (context.getStopping().get()) {
445                                    throw new IOException("Connection closed, send aborted.");
446                                }
447                                if (count > 2 && context.isInTransaction()) {
448                                    count = 0;
449                                    int size = context.getTransaction().size();
450                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
451                                }
452                                count++;
453                            }
454                        } else {
455                            waitForSpace(
456                                    context,
457                                    producerExchange,
458                                    memoryUsage,
459                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
460                                            + message.getProducerId()
461                                            + ") to prevent flooding "
462                                            + getActiveMQDestination().getQualifiedName()
463                                            + "."
464                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
465                        }
466                    }
467
468                    // The usage manager could have delayed us by the time
469                    // we unblock the message could have expired..
470                    if (message.isExpired()) {
471                        getDestinationStatistics().getExpired().increment();
472                        LOG.debug("Expired message: {}", message);
473                        return;
474                    }
475                }
476            }
477        }
478
479        doMessageSend(producerExchange, message);
480        messageDelivered(context, message);
481        if (sendProducerAck) {
482            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
483            context.getConnection().dispatchAsync(ack);
484        }
485    }
486
487    /**
488     * do send the message - this needs to be synchronized to ensure messages
489     * are stored AND dispatched in the right order
490     *
491     * @param producerExchange
492     * @param message
493     * @throws IOException
494     * @throws Exception
495     */
496    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
497            throws IOException, Exception {
498        final ConnectionContext context = producerExchange.getConnectionContext();
499        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
500        Future<Object> result = null;
501
502        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
503            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
504                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
505                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
506                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
507                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
508                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
509                    throw new javax.jms.ResourceAllocationException(logMessage);
510                }
511
512                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
513            }
514            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
515
516            if (isReduceMemoryFootprint()) {
517                message.clearMarshalledState();
518            }
519        }
520
521        message.incrementReferenceCount();
522
523        if (context.isInTransaction()) {
524            context.getTransaction().addSynchronization(new Synchronization() {
525                @Override
526                public void afterCommit() throws Exception {
527                    // It could take while before we receive the commit
528                    // operation.. by that time the message could have
529                    // expired..
530                    if (message.isExpired()) {
531                        if (broker.isExpired(message)) {
532                            getDestinationStatistics().getExpired().increment();
533                            broker.messageExpired(context, message, null);
534                        }
535                        message.decrementReferenceCount();
536                        return;
537                    }
538                    try {
539                        dispatch(context, message);
540                    } finally {
541                        message.decrementReferenceCount();
542                    }
543                }
544
545                @Override
546                public void afterRollback() throws Exception {
547                    message.decrementReferenceCount();
548                }
549            });
550
551        } else {
552            try {
553                dispatch(context, message);
554            } finally {
555                message.decrementReferenceCount();
556            }
557        }
558
559        if (result != null && !result.isCancelled()) {
560            try {
561                result.get();
562            } catch (CancellationException e) {
563                // ignore - the task has been cancelled if the message
564                // has already been deleted
565            }
566        }
567    }
568
569    private boolean canOptimizeOutPersistence() {
570        return durableSubscribers.size() == 0;
571    }
572
573    @Override
574    public String toString() {
575        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
576    }
577
578    @Override
579    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
580            final MessageReference node) throws IOException {
581        if (topicStore != null && node.isPersistent()) {
582            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
583            SubscriptionKey key = dsub.getSubscriptionKey();
584            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
585                    convertToNonRangedAck(ack, node));
586        }
587        messageConsumed(context, node);
588    }
589
590    @Override
591    public void gc() {
592    }
593
594    public Message loadMessage(MessageId messageId) throws IOException {
595        return topicStore != null ? topicStore.getMessage(messageId) : null;
596    }
597
598    @Override
599    public void start() throws Exception {
600        this.subscriptionRecoveryPolicy.start();
601        if (memoryUsage != null) {
602            memoryUsage.start();
603        }
604
605        if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
606            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
607        }
608    }
609
610    @Override
611    public void stop() throws Exception {
612        if (taskRunner != null) {
613            taskRunner.shutdown();
614        }
615        this.subscriptionRecoveryPolicy.stop();
616        if (memoryUsage != null) {
617            memoryUsage.stop();
618        }
619        if (this.topicStore != null) {
620            this.topicStore.stop();
621        }
622
623         scheduler.cancel(expireMessagesTask);
624    }
625
626    @Override
627    public Message[] browse() {
628        final List<Message> result = new ArrayList<Message>();
629        doBrowse(result, getMaxBrowsePageSize());
630        return result.toArray(new Message[result.size()]);
631    }
632
633    private void doBrowse(final List<Message> browseList, final int max) {
634        try {
635            if (topicStore != null) {
636                final List<Message> toExpire = new ArrayList<Message>();
637                topicStore.recover(new MessageRecoveryListener() {
638                    @Override
639                    public boolean recoverMessage(Message message) throws Exception {
640                        if (message.isExpired()) {
641                            toExpire.add(message);
642                        }
643                        browseList.add(message);
644                        return true;
645                    }
646
647                    @Override
648                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
649                        return true;
650                    }
651
652                    @Override
653                    public boolean hasSpace() {
654                        return browseList.size() < max;
655                    }
656
657                    @Override
658                    public boolean isDuplicate(MessageId id) {
659                        return false;
660                    }
661                });
662                final ConnectionContext connectionContext = createConnectionContext();
663                for (Message message : toExpire) {
664                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
665                        if (!sub.isActive()) {
666                            message.setRegionDestination(this);
667                            messageExpired(connectionContext, sub, message);
668                        }
669                    }
670                }
671                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
672                if (msgs != null) {
673                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
674                        browseList.add(msgs[i]);
675                    }
676                }
677            }
678        } catch (Throwable e) {
679            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
680        }
681    }
682
683    @Override
684    public boolean iterate() {
685        synchronized (messagesWaitingForSpace) {
686            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
687                Runnable op = messagesWaitingForSpace.removeFirst();
688                op.run();
689            }
690
691            if (!messagesWaitingForSpace.isEmpty()) {
692                registerCallbackForNotFullNotification();
693            }
694        }
695        return false;
696    }
697
698    private void registerCallbackForNotFullNotification() {
699        // If the usage manager is not full, then the task will not
700        // get called..
701        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
702            // so call it directly here.
703            sendMessagesWaitingForSpaceTask.run();
704        }
705    }
706
707    // Properties
708    // -------------------------------------------------------------------------
709
710    public DispatchPolicy getDispatchPolicy() {
711        return dispatchPolicy;
712    }
713
    /**
     * Replaces the dispatch policy of this destination.
     *
     * @param dispatchPolicy the policy to store; no validation is performed here
     */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }
717
718    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
719        return subscriptionRecoveryPolicy;
720    }
721
722    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
723        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
724            // allow users to combine retained message policy with other ActiveMQ policies
725            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
726            policy.setWrapped(recoveryPolicy);
727        } else {
728            this.subscriptionRecoveryPolicy = recoveryPolicy;
729        }
730    }
731
732    // Implementation methods
733    // -------------------------------------------------------------------------
734
    /**
     * No-op: this implementation has an empty body and performs no work on wakeup.
     */
    @Override
    public final void wakeup() {
    }
738
    /**
     * Updates the enqueue statistics for {@code message} and passes it on for dispatch.
     * <p>
     * While holding the read side of {@code dispatchLock}, the message is first offered to the
     * subscription recovery policy; if {@code add} returns {@code false} nothing further happens.
     * If there are no consumers, or the dispatch policy returns {@code false},
     * {@code onMessageWithNoConsumers} is invoked instead.
     *
     * @param context the connection context; also the source of the message evaluation context
     * @param message the message to dispatch
     * @throws Exception if any of the invoked policies or callbacks fails
     */
    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().increment();
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessageSize().addSize(message.getSize());
        // Stays null on the early-return paths so the finally block knows there is nothing to clear.
        MessageEvaluationContext msgContext = null;

        dispatchLock.readLock().lock();
        try {
            // The recovery policy sees the message first; a false result ends processing here.
            if (!subscriptionRecoveryPolicy.add(context, message)) {
                return;
            }
            // The emptiness check and the no-consumer callback both run under the consumers monitor.
            synchronized (consumers) {
                if (consumers.isEmpty()) {
                    onMessageWithNoConsumers(context, message);
                    return;
                }
            }
            msgContext = context.getMessageEvaluationContext();
            msgContext.setDestination(destination);
            msgContext.setMessageReference(message);
            // A false result from the dispatch policy is handled like having no consumers.
            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                onMessageWithNoConsumers(context, message);
            }

        } finally {
            dispatchLock.readLock().unlock();
            // Clear the evaluation context (if it was obtained) after releasing the lock.
            if (msgContext != null) {
                msgContext.clear();
            }
        }
    }
772
773    private final Runnable expireMessagesTask = new Runnable() {
774        @Override
775        public void run() {
776            List<Message> browsedMessages = new InsertionCountList<Message>();
777            doBrowse(browsedMessages, getMaxExpirePageSize());
778        }
779    };
780
781    @Override
782    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
783        broker.messageExpired(context, reference, subs);
784        // AMQ-2586: Better to leave this stat at zero than to give the user
785        // misleading metrics.
786        // destinationStatistics.getMessages().decrement();
787        destinationStatistics.getExpired().increment();
788        MessageAck ack = new MessageAck();
789        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
790        ack.setDestination(destination);
791        ack.setMessageID(reference.getMessageId());
792        try {
793            if (subs instanceof DurableTopicSubscription) {
794                ((DurableTopicSubscription)subs).removePending(reference);
795            }
796            acknowledge(context, subs, ack, reference);
797        } catch (Exception e) {
798            LOG.error("Failed to remove expired Message from the store ", e);
799        }
800    }
801
    /**
     * @return the {@code LOG} logger of this class
     */
    @Override
    protected Logger getLog() {
        return LOG;
    }
806
807    protected boolean isOptimizeStorage(){
808        boolean result = false;
809
810        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
811                result = true;
812                for (DurableTopicSubscription s : durableSubscribers.values()) {
813                    if (s.isActive()== false){
814                        result = false;
815                        break;
816                    }
817                    if (s.getPrefetchSize()==0){
818                        result = false;
819                        break;
820                    }
821                    if (s.isSlowConsumer()){
822                        result = false;
823                        break;
824                    }
825                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
826                        result = false;
827                        break;
828                    }
829                }
830        }
831        return result;
832    }
833
834    /**
835     * force a reread of the store - after transaction recovery completion
836     */
837    @Override
838    public void clearPendingMessages() {
839        dispatchLock.readLock().lock();
840        try {
841            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
842                clearPendingAndDispatch(durableTopicSubscription);
843            }
844        } finally {
845            dispatchLock.readLock().unlock();
846        }
847    }
848
849    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
850        synchronized (durableTopicSubscription.pendingLock) {
851            durableTopicSubscription.pending.clear();
852            try {
853                durableTopicSubscription.dispatchPending();
854            } catch (IOException exception) {
855                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
856                        durableTopicSubscription,
857                        destination,
858                        durableTopicSubscription.pending }, exception);
859            }
860        }
861    }
862
863    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
864        return durableSubscribers;
865    }
866}