001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.Set; 021import org.apache.activemq.advisory.AdvisorySupport; 022import org.apache.activemq.broker.BrokerService; 023import org.apache.activemq.broker.ConnectionContext; 024import org.apache.activemq.broker.region.policy.PolicyEntry; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ActiveMQQueue; 027import org.apache.activemq.command.ActiveMQTempDestination; 028import org.apache.activemq.command.ActiveMQTopic; 029import org.apache.activemq.command.SubscriptionInfo; 030import org.apache.activemq.store.MessageStore; 031import org.apache.activemq.store.PersistenceAdapter; 032import org.apache.activemq.store.TopicMessageStore; 033import org.apache.activemq.thread.TaskRunnerFactory; 034 035/** 036 * Creates standard ActiveMQ implementations of 037 * {@link org.apache.activemq.broker.region.Destination}. 038 * 039 * @author fateev@amazon.com 040 * 041 */ 042public class DestinationFactoryImpl extends DestinationFactory { 043 044 protected final TaskRunnerFactory taskRunnerFactory; 045 protected final PersistenceAdapter persistenceAdapter; 046 protected RegionBroker broker; 047 private final BrokerService brokerService; 048 049 public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { 050 this.brokerService = brokerService; 051 this.taskRunnerFactory = taskRunnerFactory; 052 if (persistenceAdapter == null) { 053 throw new IllegalArgumentException("null persistenceAdapter"); 054 } 055 this.persistenceAdapter = persistenceAdapter; 056 } 057 058 @Override 059 public void setRegionBroker(RegionBroker broker) { 060 if (broker == null) { 061 throw new IllegalArgumentException("null broker"); 062 } 063 this.broker = broker; 064 } 065 066 @Override 067 public Set<ActiveMQDestination> getDestinations() { 068 return persistenceAdapter.getDestinations(); 069 } 070 071 /** 072 * @return instance of {@link Queue} or {@link Topic} 073 */ 074 @Override 075 public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { 076 if (destination.isQueue()) { 077 if (destination.isTemporary()) { 078 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; 079 Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); 080 configureQueue(queue, destination); 081 queue.initialize(); 082 return queue; 083 } else { 084 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); 085 Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory); 086 configureQueue(queue, destination); 087 queue.initialize(); 088 return queue; 089 } 090 } else if (destination.isTemporary()) { 091 092 Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); 093 configureTopic(topic, destination); 094 topic.initialize(); 095 return topic; 096 } else { 097 TopicMessageStore store = null; 098 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 099 store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination); 100 } 101 Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory); 102 configureTopic(topic, destination); 103 topic.initialize(); 104 return topic; 105 } 106 } 107 108 @Override 109 public void removeDestination(Destination dest) { 110 ActiveMQDestination destination = dest.getActiveMQDestination(); 111 if (!destination.isTemporary()) { 112 if (destination.isQueue()) { 113 persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination); 114 } 115 else if (!AdvisorySupport.isAdvisoryTopic(destination)) { 116 persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination); 117 } 118 } 119 } 120 121 protected void configureQueue(Queue queue, ActiveMQDestination destination) { 122 if (broker == null) { 123 throw new IllegalStateException("broker property is not set"); 124 } 125 if (broker.getDestinationPolicy() != null) { 126 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 127 if (entry != null) { 128 entry.configure(broker,queue); 129 } 130 } 131 } 132 133 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 134 if (broker == null) { 135 throw new IllegalStateException("broker property is not set"); 136 } 137 if (broker.getDestinationPolicy() != null) { 138 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 139 if (entry != null) { 140 entry.configure(broker,topic); 141 } 142 } 143 } 144 145 @Override 146 public long getLastMessageBrokerSequenceId() throws IOException { 147 return persistenceAdapter.getLastMessageBrokerSequenceId(); 148 } 149 150 public PersistenceAdapter getPersistenceAdapter() { 151 return persistenceAdapter; 152 } 153 154 @Override 155 public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { 156 return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); 157 } 158}