001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023
024import javax.jms.InvalidSelectorException;
025import javax.jms.JMSException;
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ConsumerId;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.filter.BooleanExpression;
035import org.apache.activemq.filter.DestinationFilter;
036import org.apache.activemq.filter.LogicExpression;
037import org.apache.activemq.filter.MessageEvaluationContext;
038import org.apache.activemq.filter.NoLocalExpression;
039import org.apache.activemq.selector.SelectorParser;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043public abstract class AbstractSubscription implements Subscription {
044
045    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
046
047    protected Broker broker;
048    protected ConnectionContext context;
049    protected ConsumerInfo info;
050    protected final DestinationFilter destinationFilter;
051    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
052
053    private BooleanExpression selectorExpression;
054    private ObjectName objectName;
055    private int cursorMemoryHighWaterMark = 70;
056    private boolean slowConsumer;
057    private long lastAckTime;
058    private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
059
060    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
061        this.broker = broker;
062        this.context = context;
063        this.info = info;
064        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
065        this.selectorExpression = parseSelector(info);
066        this.lastAckTime = System.currentTimeMillis();
067    }
068
069    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
070        BooleanExpression rc = null;
071        if (info.getSelector() != null) {
072            rc = SelectorParser.parse(info.getSelector());
073        }
074        if (info.isNoLocal()) {
075            if (rc == null) {
076                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
077            } else {
078                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
079            }
080        }
081        if (info.getAdditionalPredicate() != null) {
082            if (rc == null) {
083                rc = info.getAdditionalPredicate();
084            } else {
085                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
086            }
087        }
088        return rc;
089    }
090
091    @Override
092    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
093        this.lastAckTime = System.currentTimeMillis();
094        subscriptionStatistics.getConsumedCount().increment();
095    }
096
097    @Override
098    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
099        ConsumerId targetConsumerId = node.getTargetConsumerId();
100        if (targetConsumerId != null) {
101            if (!targetConsumerId.equals(info.getConsumerId())) {
102                return false;
103            }
104        }
105        try {
106            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
107        } catch (JMSException e) {
108            LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
109            return false;
110        }
111    }
112
113    @Override
114    public boolean isWildcard() {
115        return destinationFilter.isWildcard();
116    }
117
118    @Override
119    public boolean matches(ActiveMQDestination destination) {
120        return destinationFilter.matches(destination);
121    }
122
123    @Override
124    public void add(ConnectionContext context, Destination destination) throws Exception {
125        destinations.add(destination);
126    }
127
128    @Override
129    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
130        destinations.remove(destination);
131        return Collections.EMPTY_LIST;
132    }
133
134    @Override
135    public ConsumerInfo getConsumerInfo() {
136        return info;
137    }
138
139    @Override
140    public void gc() {
141    }
142
143    @Override
144    public ConnectionContext getContext() {
145        return context;
146    }
147
148    public ConsumerInfo getInfo() {
149        return info;
150    }
151
152    public BooleanExpression getSelectorExpression() {
153        return selectorExpression;
154    }
155
156    @Override
157    public String getSelector() {
158        return info.getSelector();
159    }
160
161    @Override
162    public void setSelector(String selector) throws InvalidSelectorException {
163        ConsumerInfo copy = info.copy();
164        copy.setSelector(selector);
165        BooleanExpression newSelector = parseSelector(copy);
166        // its valid so lets actually update it now
167        info.setSelector(selector);
168        this.selectorExpression = newSelector;
169    }
170
171    @Override
172    public ObjectName getObjectName() {
173        return objectName;
174    }
175
176    @Override
177    public void setObjectName(ObjectName objectName) {
178        this.objectName = objectName;
179    }
180
181    @Override
182    public int getPrefetchSize() {
183        return info.getPrefetchSize();
184    }
185
186    public void setPrefetchSize(int newSize) {
187        info.setPrefetchSize(newSize);
188    }
189
190    @Override
191    public boolean isRecoveryRequired() {
192        return true;
193    }
194
195    @Override
196    public boolean isSlowConsumer() {
197        return slowConsumer;
198    }
199
200    public void setSlowConsumer(boolean val) {
201        slowConsumer = val;
202    }
203
204    @Override
205    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
206        boolean result = false;
207        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
208        try {
209            Destination regionDestination = (Destination) message.getRegionDestination();
210            msgContext.setDestination(regionDestination.getActiveMQDestination());
211            msgContext.setMessageReference(message);
212            result = matches(message, msgContext);
213            if (result) {
214                doAddRecoveredMessage(message);
215            }
216        } finally {
217            msgContext.clear();
218        }
219        return result;
220    }
221
222    @Override
223    public ActiveMQDestination getActiveMQDestination() {
224        return info != null ? info.getDestination() : null;
225    }
226
227    @Override
228    public boolean isBrowser() {
229        return info != null && info.isBrowser();
230    }
231
232    @Override
233    public long getInFlightMessageSize() {
234        return subscriptionStatistics.getInflightMessageSize().getTotalSize();
235    }
236
237    @Override
238    public int getInFlightUsage() {
239        int prefetchSize = info.getPrefetchSize();
240        if (prefetchSize > 0) {
241            return (getInFlightSize() * 100) / prefetchSize;
242        }
243        return Integer.MAX_VALUE;
244    }
245
246    /**
247     * Add a destination
248     * @param destination
249     */
250    public void addDestination(Destination destination) {
251    }
252
253    /**
254     * Remove a destination
255     * @param destination
256     */
257    public void removeDestination(Destination destination) {
258    }
259
260    @Override
261    public int getCursorMemoryHighWaterMark(){
262        return this.cursorMemoryHighWaterMark;
263    }
264
265    @Override
266    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
267        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
268    }
269
270    @Override
271    public int countBeforeFull() {
272        return info.getPrefetchSize() - getDispatchedQueueSize();
273    }
274
275    @Override
276    public void unmatched(MessageReference node) throws IOException {
277        // only durable topic subs have something to do here
278    }
279
280    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
281        add(message);
282    }
283
284    @Override
285    public long getTimeOfLastMessageAck() {
286        return lastAckTime;
287    }
288
289    public void setTimeOfLastMessageAck(long value) {
290        this.lastAckTime = value;
291    }
292
293    @Override
294    public long getConsumedCount(){
295        return subscriptionStatistics.getConsumedCount().getCount();
296    }
297
298    @Override
299    public void incrementConsumedCount(){
300        subscriptionStatistics.getConsumedCount().increment();
301    }
302
303    @Override
304    public void resetConsumedCount(){
305        subscriptionStatistics.getConsumedCount().reset();
306    }
307
308    @Override
309    public SubscriptionStatistics getSubscriptionStatistics() {
310        return subscriptionStatistics;
311    }
312}