/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.io.IOException;
import java.util.Map;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consolidates subscriptions.
 * <p>
 * On top of {@link ConduitBridge}, this bridge gives durable (and
 * "forced durable") demand subscriptions a reproducible subscription name,
 * see {@link #getSubscriberName(ActiveMQDestination)}, and a consumer id
 * generated by the bridge itself.
 */
public class DurableConduitBridge extends ConduitBridge {
    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);

    /**
     * @return a label of the form
     *         {@code DurableConduitBridge:<configured broker name>-><remote broker name>}
     */
    @Override
    public String toString() {
        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
    }

    /**
     * Constructor; all arguments are handed unchanged to the superclass.
     *
     * @param configuration the network bridge configuration
     * @param localBroker the transport for the local side of the bridge
     * @param remoteBroker the transport for the remote side of the bridge
     */
    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
                                Transport remoteBroker) {
        super(configuration, localBroker, remoteBroker);
    }

    /**
     * Subscriptions for these destinations are always created.
     * <p>
     * After the superclass set-up, and unless the configuration is
     * dynamic-only, each entry of {@code durableDestinations} that is
     * permissible and not yet matched by an existing demand subscription is
     * considered. For topics, a statically included demand subscription is
     * added only when the local topic region already holds a durable
     * subscription whose name equals {@link #getSubscriberName(ActiveMQDestination)}.
     */
    @Override
    protected void setupStaticDestinations() {
        super.setupStaticDestinations();

        final ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
        if (dests == null) {
            return;
        }

        for (ActiveMQDestination dest : dests) {
            if (!isPermissableDestination(dest) || doesConsumerExist(dest)) {
                continue;
            }
            try {
                //Filtering by non-empty subscriptions, see AMQ-5875
                if (dest.isTopic()) {
                    RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
                    TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();

                    final String candidateSubName = getSubscriberName(dest);
                    for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
                        String subName = subscription.getConsumerInfo().getSubscriptionName();
                        if (!candidateSubName.equals(subName)) {
                            continue;
                        }
                        DemandSubscription sub = createDemandSubscription(dest, subName);
                        // createDemandSubscription(ConsumerInfo) in this class returns null
                        // when the consumer is already covered; NOTE(review): assuming the
                        // two-argument overload can surface that null, so guard instead of
                        // failing with a NullPointerException.
                        if (sub != null) {
                            sub.getLocalInfo().setSubscriptionName(candidateSubName);
                            sub.setStaticallyIncluded(true);
                            addSubscription(sub);
                        }
                        // only the first matching durable subscription is considered
                        break;
                    }
                }
            } catch (IOException e) {
                LOG.error("Failed to add static destination {}", dest, e);
            }
            LOG.trace("Forwarding messages for durable destination: {}", dest);
        }
    }

    /**
     * Creates the demand subscription for a consumer, unless an existing
     * subscription already covers it.
     * <p>
     * The consumer's original id is recorded as a network consumer id on
     * {@code info}. For durable and forced-durable consumers the subscription
     * name is replaced by {@link #getSubscriberName(ActiveMQDestination)} and
     * the consumer id by a freshly generated one. The selector is always
     * cleared before the subscription is created.
     *
     * @param info the consumer to create a subscription for; modified in place
     * @return the new demand subscription, or {@code null} when
     *         {@code addToAlreadyInterestedConsumers} reports that the consumer
     *         was attached to an existing subscription
     * @throws IOException propagated from the calls this method delegates to
     */
    @Override
    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
        final boolean isForcedDurable = isForcedDurable(info);

        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
            return null; // don't want this subscription added
        }
        //add our original id to ourselves
        info.addNetworkConsumerId(info.getConsumerId());
        // capture the original id before it is possibly overridden below
        final ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;

        if (info.isDurable() || isForcedDurable) {
            // set the subscriber name to something reproducible
            info.setSubscriptionName(getSubscriberName(info.getDestination()));
            // and override the consumerId with something unique so that it won't
            // be removed if the durable subscriber (at the other end) goes away
            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                    consumerIdGenerator.getNextSequenceId()));
        }
        info.setSelector(null);

        final DemandSubscription demandSubscription = doCreateDemandSubscription(info);
        if (forcedDurableId != null) {
            demandSubscription.addForcedDurableConsumer(forcedDurableId);
            forcedDurableRemoteId.add(forcedDurableId);
        }
        return demandSubscription;
    }

    /**
     * Decides whether a non-durable consumer must be treated as durable.
     * <p>
     * Durable consumers, advisory topics, temporary destinations and queues
     * are never forced. Otherwise the first matching entry of the dynamically
     * included destinations, then of the statically included destinations,
     * decides via its {@code forceDurable} option.
     *
     * @param info the consumer to inspect
     * @return {@code true} if the matching included destination requests forced durability
     */
    private boolean isForcedDurable(ConsumerInfo info) {
        if (info.isDurable()) {
            return false;
        }

        final ActiveMQDestination destination = info.getDestination();
        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary()
                || destination.isQueue()) {
            return false;
        }

        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
        if (matching == null) {
            matching = findMatchingDestination(staticallyIncludedDestinations, destination);
        }
        return matching != null && isDestForcedDurable(matching);
    }

    /**
     * Reads the {@code forceDurable} option of a destination.
     *
     * @param destination the destination whose options are inspected
     * @return {@code true} only if the option converts to boolean {@code true};
     *         {@code false} when there are no options or the converted value
     *         is anything else
     */
    private boolean isDestForcedDurable(ActiveMQDestination destination) {
        final Map<String, String> options = destination.getOptions();
        if (options == null) {
            return false;
        }
        // Boolean.TRUE.equals(..) rather than an unboxing cast, so a null or
        // non-Boolean conversion result yields false instead of throwing.
        return Boolean.TRUE.equals(
                TypeConversionSupport.convert(options.get("forceDurable"), boolean.class));
    }

    /**
     * Builds the reproducible subscription name used for a destination.
     *
     * @param dest the destination the subscription is for
     * @return {@code DURABLE_SUB_PREFIX + <configured broker name> + "_" + <physical name of dest>}
     */
    protected String getSubscriberName(ActiveMQDestination dest) {
        return DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
    }

    /**
     * Checks whether a demand subscription already exists for a destination.
     *
     * @param dest the destination, parsed as a {@link DestinationFilter}
     * @return {@code true} if the filter matches the local destination of any
     *         subscription in {@code subscriptionMapByLocalId}
     */
    protected boolean doesConsumerExist(ActiveMQDestination dest) {
        final DestinationFilter filter = DestinationFilter.parseFilter(dest);
        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
            if (filter.matches(ds.getLocalInfo().getDestination())) {
                return true;
            }
        }
        return false;
    }
}