001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021import java.util.Set; 022import org.apache.activemq.broker.Broker; 023import org.apache.activemq.broker.ConnectionContext; 024import org.apache.activemq.broker.ProducerBrokerExchange; 025import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 026import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.Message; 029import org.apache.activemq.command.MessageAck; 030import org.apache.activemq.command.MessageDispatchNotification; 031import org.apache.activemq.command.ProducerInfo; 032import org.apache.activemq.store.MessageStore; 033import org.apache.activemq.usage.MemoryUsage; 034import org.apache.activemq.usage.Usage; 035 036/** 037 * 038 * 039 */ 040public class DestinationFilter implements Destination { 041 042 private final Destination next; 043 044 public DestinationFilter(Destination next) { 045 this.next = next; 046 } 047 048 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { 049 next.acknowledge(context, sub, ack, node); 050 } 051 052 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 053 next.addSubscription(context, sub); 054 } 055 056 public Message[] browse() { 057 return next.browse(); 058 } 059 060 public void dispose(ConnectionContext context) throws IOException { 061 next.dispose(context); 062 } 063 064 public boolean isDisposed() { 065 return next.isDisposed(); 066 } 067 068 public void gc() { 069 next.gc(); 070 } 071 072 public void markForGC(long timeStamp) { 073 next.markForGC(timeStamp); 074 } 075 076 public boolean canGC() { 077 return next.canGC(); 078 } 079 080 public long getInactiveTimoutBeforeGC() { 081 return next.getInactiveTimoutBeforeGC(); 082 } 083 084 public ActiveMQDestination getActiveMQDestination() { 085 return next.getActiveMQDestination(); 086 } 087 088 public DeadLetterStrategy getDeadLetterStrategy() { 089 return next.getDeadLetterStrategy(); 090 } 091 092 public DestinationStatistics getDestinationStatistics() { 093 return next.getDestinationStatistics(); 094 } 095 096 public String getName() { 097 return next.getName(); 098 } 099 100 public MemoryUsage getMemoryUsage() { 101 return next.getMemoryUsage(); 102 } 103 104 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 105 next.removeSubscription(context, sub, lastDeliveredSequenceId); 106 } 107 108 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { 109 next.send(context, messageSend); 110 } 111 112 public void start() throws Exception { 113 next.start(); 114 } 115 116 public void stop() throws Exception { 117 next.stop(); 118 } 119 120 public List<Subscription> getConsumers() { 121 return next.getConsumers(); 122 } 123 124 /** 125 * Sends a message to the given destination which may be a wildcard 126 * 127 * @param context broker context 128 * @param message message to send 129 * @param destination possibly wildcard destination to send the message to 130 * @throws Exception on error 131 */ 132 protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception { 133 Broker broker = context.getConnectionContext().getBroker(); 134 Set<Destination> destinations = broker.getDestinations(destination); 135 136 for (Destination dest : destinations) { 137 dest.send(context, message.copy()); 138 } 139 } 140 141 public MessageStore getMessageStore() { 142 return next.getMessageStore(); 143 } 144 145 public boolean isProducerFlowControl() { 146 return next.isProducerFlowControl(); 147 } 148 149 public void setProducerFlowControl(boolean value) { 150 next.setProducerFlowControl(value); 151 } 152 153 public boolean isAlwaysRetroactive() { 154 return next.isAlwaysRetroactive(); 155 } 156 157 public void setAlwaysRetroactive(boolean value) { 158 next.setAlwaysRetroactive(value); 159 } 160 161 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 162 next.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 163 } 164 165 public long getBlockedProducerWarningInterval() { 166 return next.getBlockedProducerWarningInterval(); 167 } 168 169 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 170 next.addProducer(context, info); 171 } 172 173 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 174 next.removeProducer(context, info); 175 } 176 177 public int getMaxAuditDepth() { 178 return next.getMaxAuditDepth(); 179 } 180 181 public int getMaxProducersToAudit() { 182 return next.getMaxProducersToAudit(); 183 } 184 185 public boolean isEnableAudit() { 186 return next.isEnableAudit(); 187 } 188 189 public void setEnableAudit(boolean enableAudit) { 190 next.setEnableAudit(enableAudit); 191 } 192 193 public void setMaxAuditDepth(int maxAuditDepth) { 194 next.setMaxAuditDepth(maxAuditDepth); 195 } 196 197 public void setMaxProducersToAudit(int maxProducersToAudit) { 198 next.setMaxProducersToAudit(maxProducersToAudit); 199 } 200 201 public boolean isActive() { 202 return next.isActive(); 203 } 204 205 public int getMaxPageSize() { 206 return next.getMaxPageSize(); 207 } 208 209 public void setMaxPageSize(int maxPageSize) { 210 next.setMaxPageSize(maxPageSize); 211 } 212 213 public boolean isUseCache() { 214 return next.isUseCache(); 215 } 216 217 public void setUseCache(boolean useCache) { 218 next.setUseCache(useCache); 219 } 220 221 public int getMinimumMessageSize() { 222 return next.getMinimumMessageSize(); 223 } 224 225 public void setMinimumMessageSize(int minimumMessageSize) { 226 next.setMinimumMessageSize(minimumMessageSize); 227 } 228 229 public void wakeup() { 230 next.wakeup(); 231 } 232 233 public boolean isLazyDispatch() { 234 return next.isLazyDispatch(); 235 } 236 237 public void setLazyDispatch(boolean value) { 238 next.setLazyDispatch(value); 239 } 240 241 public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { 242 next.messageExpired(context, prefetchSubscription, node); 243 } 244 245 public boolean iterate() { 246 return next.iterate(); 247 } 248 249 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 250 next.fastProducer(context, producerInfo); 251 } 252 253 public void isFull(ConnectionContext context, Usage<?> usage) { 254 next.isFull(context, usage); 255 } 256 257 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 258 next.messageConsumed(context, messageReference); 259 } 260 261 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 262 next.messageDelivered(context, messageReference); 263 } 264 265 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 266 next.messageDiscarded(context, sub, messageReference); 267 } 268 269 public void slowConsumer(ConnectionContext context, Subscription subs) { 270 next.slowConsumer(context, subs); 271 } 272 273 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) { 274 next.messageExpired(context, subs, node); 275 } 276 277 public int getMaxBrowsePageSize() { 278 return next.getMaxBrowsePageSize(); 279 } 280 281 public void setMaxBrowsePageSize(int maxPageSize) { 282 next.setMaxBrowsePageSize(maxPageSize); 283 } 284 285 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 286 next.processDispatchNotification(messageDispatchNotification); 287 } 288 289 public int getCursorMemoryHighWaterMark() { 290 return next.getCursorMemoryHighWaterMark(); 291 } 292 293 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 294 next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); 295 } 296 297 public boolean isPrioritizedMessages() { 298 return next.isPrioritizedMessages(); 299 } 300 301 public SlowConsumerStrategy getSlowConsumerStrategy() { 302 return next.getSlowConsumerStrategy(); 303 } 304 305 public boolean isDoOptimzeMessageStorage() { 306 return next.isDoOptimzeMessageStorage(); 307 } 308 309 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 310 next.setDoOptimzeMessageStorage(doOptimzeMessageStorage); 311 } 312 313}