/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.util.Map;
021
022import org.apache.activemq.advisory.AdvisorySupport;
023import org.apache.activemq.broker.region.RegionBroker;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.broker.region.TopicRegion;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConsumerId;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.filter.DestinationFilter;
030import org.apache.activemq.transport.Transport;
031import org.apache.activemq.util.TypeConversionSupport;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Consolidates subscriptions
037 */
038public class DurableConduitBridge extends ConduitBridge {
039    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
040
041    @Override
042    public String toString() {
043        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
044    }
045    /**
046     * Constructor
047     *
048     * @param configuration
049     *
050     * @param localBroker
051     * @param remoteBroker
052     */
053    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
054                                Transport remoteBroker) {
055        super(configuration, localBroker, remoteBroker);
056    }
057
058    /**
059     * Subscriptions for these destinations are always created
060     *
061     */
062    @Override
063    protected void setupStaticDestinations() {
064        super.setupStaticDestinations();
065        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
066        if (dests != null) {
067            for (ActiveMQDestination dest : dests) {
068                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
069                    try {
070                        //Filtering by non-empty subscriptions, see AMQ-5875
071                        if (dest.isTopic()) {
072                            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
073                            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
074
075                            String candidateSubName = getSubscriberName(dest);
076                            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
077                                String subName = subscription.getConsumerInfo().getSubscriptionName();
078                                if (subName != null && subName.equals(candidateSubName)) {
079                                    DemandSubscription sub = createDemandSubscription(dest, subName);
080                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
081                                    sub.setStaticallyIncluded(true);
082                                    addSubscription(sub);
083                                    break;
084                                }
085                            }
086                        }
087                    } catch (IOException e) {
088                        LOG.error("Failed to add static destination {}", dest, e);
089                    }
090                    LOG.trace("Forwarding messages for durable destination: {}", dest);
091                }
092            }
093        }
094    }
095
096    @Override
097    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
098        boolean isForcedDurable = isForcedDurable(info);
099
100        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
101            return null; // don't want this subscription added
102        }
103        //add our original id to ourselves
104        info.addNetworkConsumerId(info.getConsumerId());
105        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
106
107        if(info.isDurable() || isForcedDurable) {
108            // set the subscriber name to something reproducible
109            info.setSubscriptionName(getSubscriberName(info.getDestination()));
110            // and override the consumerId with something unique so that it won't
111            // be removed if the durable subscriber (at the other end) goes away
112            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
113                               consumerIdGenerator.getNextSequenceId()));
114        }
115        info.setSelector(null);
116        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
117        if (forcedDurableId != null) {
118            demandSubscription.addForcedDurableConsumer(forcedDurableId);
119            forcedDurableRemoteId.add(forcedDurableId);
120        }
121        return demandSubscription;
122    }
123
124
125    private boolean isForcedDurable(ConsumerInfo info) {
126        if (info.isDurable()) {
127            return false;
128        }
129
130        ActiveMQDestination destination = info.getDestination();
131        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
132                destination.isQueue()) {
133            return false;
134        }
135
136        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
137        if (matching != null) {
138            return isDestForcedDurable(matching);
139        }
140        matching = findMatchingDestination(staticallyIncludedDestinations, destination);
141        if (matching != null) {
142            return isDestForcedDurable(matching);
143        }
144        return false;
145    }
146
147    private boolean isDestForcedDurable(ActiveMQDestination destination) {
148        final Map<String, String> options = destination.getOptions();
149
150        boolean isForceDurable = false;
151        if (options != null) {
152            isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
153        }
154
155        return isForceDurable;
156    }
157
158    protected String getSubscriberName(ActiveMQDestination dest) {
159        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
160        return subscriberName;
161    }
162
163    protected boolean doesConsumerExist(ActiveMQDestination dest) {
164        DestinationFilter filter = DestinationFilter.parseFilter(dest);
165        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
166            if (filter.matches(ds.getLocalInfo().getDestination())) {
167                return true;
168            }
169        }
170        return false;
171    }
172}