001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Set;
021import org.apache.activemq.advisory.AdvisorySupport;
022import org.apache.activemq.broker.BrokerService;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.policy.PolicyEntry;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQQueue;
027import org.apache.activemq.command.ActiveMQTempDestination;
028import org.apache.activemq.command.ActiveMQTopic;
029import org.apache.activemq.command.SubscriptionInfo;
030import org.apache.activemq.store.MessageStore;
031import org.apache.activemq.store.PersistenceAdapter;
032import org.apache.activemq.store.TopicMessageStore;
033import org.apache.activemq.thread.TaskRunnerFactory;
034
035/**
036 * Creates standard ActiveMQ implementations of
037 * {@link org.apache.activemq.broker.region.Destination}.
038 * 
039 * @author fateev@amazon.com
040 * 
041 */
042public class DestinationFactoryImpl extends DestinationFactory {
043
044    protected final TaskRunnerFactory taskRunnerFactory;
045    protected final PersistenceAdapter persistenceAdapter;
046    protected RegionBroker broker;
047    private final BrokerService brokerService;
048
049    public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
050        this.brokerService = brokerService;
051        this.taskRunnerFactory = taskRunnerFactory;
052        if (persistenceAdapter == null) {
053            throw new IllegalArgumentException("null persistenceAdapter");
054        }
055        this.persistenceAdapter = persistenceAdapter;
056    }
057
058    @Override
059    public void setRegionBroker(RegionBroker broker) {
060        if (broker == null) {
061            throw new IllegalArgumentException("null broker");
062        }
063        this.broker = broker;
064    }
065
066    @Override
067    public Set<ActiveMQDestination> getDestinations() {
068        return persistenceAdapter.getDestinations();
069    }
070
071    /**
072     * @return instance of {@link Queue} or {@link Topic}
073     */
074    @Override
075    public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
076        if (destination.isQueue()) {
077            if (destination.isTemporary()) {
078                final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
079                Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
080                queue.initialize();
081                return queue;
082            } else {
083                MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
084                Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
085                configureQueue(queue, destination);
086                queue.initialize();
087                return queue;
088            }
089        } else if (destination.isTemporary()) {
090            
091            Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
092            topic.initialize();
093            return topic;
094        } else {
095            TopicMessageStore store = null;
096            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
097                store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
098            }
099            Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
100            configureTopic(topic, destination);
101            topic.initialize();
102            return topic;
103        }
104    }
105
106    @Override
107    public void removeDestination(Destination dest) {
108        ActiveMQDestination destination = dest.getActiveMQDestination();
109        if (!destination.isTemporary()) {
110            if (destination.isQueue()) {
111                persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination);
112            }
113            else if (!AdvisorySupport.isAdvisoryTopic(destination)) {
114                persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination);
115            }
116        }
117    }
118
119    protected void configureQueue(Queue queue, ActiveMQDestination destination) {
120        if (broker == null) {
121            throw new IllegalStateException("broker property is not set");
122        }
123        if (broker.getDestinationPolicy() != null) {
124            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
125            if (entry != null) {
126                entry.configure(broker,queue);
127            }
128        }
129    }
130
131    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
132        if (broker == null) {
133            throw new IllegalStateException("broker property is not set");
134        }
135        if (broker.getDestinationPolicy() != null) {
136            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
137            if (entry != null) {
138                entry.configure(broker,topic);
139            }
140        }
141    }
142
143    @Override
144    public long getLastMessageBrokerSequenceId() throws IOException {
145        return persistenceAdapter.getLastMessageBrokerSequenceId();
146    }
147
148    public PersistenceAdapter getPersistenceAdapter() {
149        return persistenceAdapter;
150    }
151
152    @Override
153    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
154        return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
155    }
156}