001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.util.List;
020import org.apache.activemq.broker.region.Subscription;
021import org.apache.activemq.command.BrokerId;
022import org.apache.activemq.command.ConsumerInfo;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.command.NetworkBridgeFilter;
025import org.apache.activemq.filter.MessageEvaluationContext;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * implement conditional behaviour for queue consumers,
031 * allows replaying back to origin if no consumers are present on the local broker
032 * after a configurable delay, irrespective of the networkTTL
033 * Also allows rate limiting of messages through the network, useful for static includes
034 *
035 *  @org.apache.xbean.XBean
036 */
037
038public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
039    boolean replayWhenNoConsumers = false;
040    int replayDelay = 0;
041    int rateLimit = 0;
042    int rateDuration = 1000;
043
044    @Override
045    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
046        ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
047        filter.setNetworkBrokerId(remoteBrokerPath[0]);
048        filter.setNetworkTTL(networkTimeToLive);
049        filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
050        filter.setRateLimit(getRateLimit());
051        filter.setRateDuration(getRateDuration());
052        filter.setReplayDelay(getReplayDelay());
053        return filter;
054    }
055
056    public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
057        this.replayWhenNoConsumers = replayWhenNoConsumers;
058    }
059
060    public boolean isReplayWhenNoConsumers() {
061        return replayWhenNoConsumers;
062    }
063
064    public void setRateLimit(int rateLimit) {
065        this.rateLimit = rateLimit;
066    }
067
068    public int getRateLimit() {
069        return rateLimit;
070    }
071
072    public int getRateDuration() {
073        return rateDuration;
074    }
075
076    public void setRateDuration(int rateDuration) {
077        this.rateDuration = rateDuration;
078    }
079
080    public int getReplayDelay() {
081        return replayDelay;
082    }
083
084    public void setReplayDelay(int replayDelay) {
085        this.replayDelay = replayDelay;
086    }
087
088    private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
089        final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
090        private int rateLimit;
091        private int rateDuration = 1000;
092        private boolean allowReplayWhenNoConsumers = true;
093        private int replayDelay = 1000;
094
095        private int matchCount;
096        private long rateDurationEnd;
097
098        @Override
099        protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
100            boolean match = true;
101            if (mec.getDestination().isQueue()) {
102                if (contains(message.getBrokerPath(), networkBrokerId)) {
103                    // potential replay back to origin
104                    match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
105
106                    if (match && LOG.isTraceEnabled()) {
107                        LOG.trace("Replaying  [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer");
108                    }
109                }
110
111                if (match && rateLimitExceeded()) {
112                    if (LOG.isTraceEnabled()) {
113                        LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit  + "/" + rateDuration);
114                    }
115                    match = false;
116                }
117
118            } else {
119                // use existing logic for topics
120                match = super.matchesForwardingFilter(message, mec);
121            }
122
123            return match;
124        }
125
126        private boolean hasNotJustArrived(Message message) {
127            return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
128        }
129
130        private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
131            List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers();
132            for (Subscription sub : consumers) {
133                if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
134                    if (LOG.isTraceEnabled()) {
135                        LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo());
136                    }
137                    return false;
138                }
139            }
140            return true;
141        }
142
143        private boolean rateLimitExceeded() {
144            if (rateLimit == 0) {
145                return false;
146            }
147
148            if (rateDurationEnd < System.currentTimeMillis()) {
149                rateDurationEnd = System.currentTimeMillis() + rateDuration;
150                matchCount = 0;
151            }
152            return ++matchCount > rateLimit;
153        }
154
155        public void setReplayDelay(int replayDelay) {
156            this.replayDelay = replayDelay;
157        }
158
159        public void setRateLimit(int rateLimit) {
160            this.rateLimit = rateLimit;
161        }
162
163        public void setRateDuration(int rateDuration) {
164            this.rateDuration = rateDuration;
165        }
166
167        public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
168            this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
169        }
170    }
171}