001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.Iterator;
020import java.util.List;
021import org.apache.activemq.broker.Broker;
022import org.apache.activemq.broker.ConnectionContext;
023import org.apache.activemq.broker.region.MessageReference;
024import org.apache.activemq.broker.region.SubscriptionRecovery;
025import org.apache.activemq.broker.region.Topic;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.memory.list.DestinationBasedMessageList;
029import org.apache.activemq.memory.list.MessageList;
030import org.apache.activemq.memory.list.SimpleMessageList;
031
032/**
033 * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
034 * amount of memory available in RAM for message history which is evicted in
035 * time order.
036 * 
037 * @org.apache.xbean.XBean
038 * 
039 */
040public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
041
042    private MessageList buffer;
043    private int maximumSize =  64 * 1024;
044    private boolean useSharedBuffer = true;
045
046    public FixedSizedSubscriptionRecoveryPolicy() {
047        
048    }
049    
050    public SubscriptionRecoveryPolicy copy() {
051        FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
052        rc.setMaximumSize(maximumSize);
053        rc.setUseSharedBuffer(useSharedBuffer);
054        return rc;
055    }
056
057    public boolean add(ConnectionContext context, MessageReference message) throws Exception {
058        buffer.add(message);
059        return true;
060    }
061
062    public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
063        // Re-dispatch the messages from the buffer.
064        List copy = buffer.getMessages(sub.getActiveMQDestination());
065        if (!copy.isEmpty()) {
066            for (Iterator iter = copy.iterator(); iter.hasNext();) {
067                MessageReference node = (MessageReference)iter.next();
068                sub.addRecoveredMessage(context, node);
069            }
070        }
071    }
072
073    public void start() throws Exception {
074        buffer = createMessageList();
075    }
076
077    public void stop() throws Exception {
078        buffer.clear();
079    }
080
081    // Properties
082    // -------------------------------------------------------------------------
083    public MessageList getBuffer() {
084        return buffer;
085    }
086
087    public void setBuffer(MessageList buffer) {
088        this.buffer = buffer;
089    }
090
091    public int getMaximumSize() {
092        return maximumSize;
093    }
094
095    /**
096     * Sets the maximum amount of RAM in bytes that this buffer can hold in RAM
097     */
098    public void setMaximumSize(int maximumSize) {
099        this.maximumSize = maximumSize;
100    }
101
102    public boolean isUseSharedBuffer() {
103        return useSharedBuffer;
104    }
105
106    public void setUseSharedBuffer(boolean useSharedBuffer) {
107        this.useSharedBuffer = useSharedBuffer;
108    }
109
110    public Message[] browse(ActiveMQDestination destination) throws Exception {
111        return buffer.browse(destination);
112    }
113    
114    public void setBroker(Broker broker) {        
115    }
116
117    // Implementation methods
118
119    // -------------------------------------------------------------------------
120    protected MessageList createMessageList() {
121        if (useSharedBuffer) {
122            return new SimpleMessageList(maximumSize);
123        } else {
124            return new DestinationBasedMessageList(maximumSize);
125        }
126    }
127
128}