001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.File; 020import java.io.IOException; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.net.UnknownHostException; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.LinkedBlockingQueue; 034import java.util.concurrent.RejectedExecutionException; 035import java.util.concurrent.RejectedExecutionHandler; 036import java.util.concurrent.SynchronousQueue; 037import java.util.concurrent.ThreadFactory; 038import java.util.concurrent.ThreadPoolExecutor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041 042import javax.annotation.PostConstruct; 043import javax.annotation.PreDestroy; 044import javax.management.MalformedObjectNameException; 045import javax.management.ObjectName; 046 047import org.apache.activemq.ActiveMQConnectionMetaData; 048import org.apache.activemq.ConfigurationException; 049import org.apache.activemq.Service; 050import org.apache.activemq.advisory.AdvisoryBroker; 051import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 052import org.apache.activemq.broker.ft.MasterConnector; 053import org.apache.activemq.broker.jmx.AnnotatedMBean; 054import org.apache.activemq.broker.jmx.BrokerView; 055import org.apache.activemq.broker.jmx.ConnectorView; 056import org.apache.activemq.broker.jmx.ConnectorViewMBean; 057import org.apache.activemq.broker.jmx.FTConnectorView; 058import org.apache.activemq.broker.jmx.JmsConnectorView; 059import org.apache.activemq.broker.jmx.JobSchedulerView; 060import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 061import org.apache.activemq.broker.jmx.ManagedRegionBroker; 062import org.apache.activemq.broker.jmx.ManagementContext; 063import org.apache.activemq.broker.jmx.NetworkConnectorView; 064import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 065import org.apache.activemq.broker.jmx.ProxyConnectorView; 066import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 067import org.apache.activemq.broker.region.Destination; 068import org.apache.activemq.broker.region.DestinationFactory; 069import org.apache.activemq.broker.region.DestinationFactoryImpl; 070import org.apache.activemq.broker.region.DestinationInterceptor; 071import org.apache.activemq.broker.region.RegionBroker; 072import org.apache.activemq.broker.region.policy.PolicyMap; 073import org.apache.activemq.broker.region.virtual.MirroredQueue; 074import org.apache.activemq.broker.region.virtual.VirtualDestination; 075import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 076import org.apache.activemq.broker.region.virtual.VirtualTopic; 077import org.apache.activemq.broker.scheduler.SchedulerBroker; 078import org.apache.activemq.command.ActiveMQDestination; 079import org.apache.activemq.command.ActiveMQQueue; 080import org.apache.activemq.command.BrokerId; 081import org.apache.activemq.filter.DestinationFilter; 082import org.apache.activemq.network.ConnectionFilter; 083import org.apache.activemq.network.DiscoveryNetworkConnector; 084import org.apache.activemq.network.NetworkConnector; 085import org.apache.activemq.network.jms.JmsConnector; 086import org.apache.activemq.proxy.ProxyConnector; 087import org.apache.activemq.security.MessageAuthorizationPolicy; 088import org.apache.activemq.selector.SelectorParser; 089import org.apache.activemq.store.PersistenceAdapter; 090import org.apache.activemq.store.PersistenceAdapterFactory; 091import org.apache.activemq.store.amq.AMQPersistenceAdapter; 092import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; 093import org.apache.activemq.store.kahadb.plist.PListStore; 094import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 095import org.apache.activemq.thread.Scheduler; 096import org.apache.activemq.thread.TaskRunnerFactory; 097import org.apache.activemq.transport.TransportFactory; 098import org.apache.activemq.transport.TransportServer; 099import org.apache.activemq.transport.vm.VMTransportFactory; 100import org.apache.activemq.usage.SystemUsage; 101import org.apache.activemq.util.BrokerSupport; 102import org.apache.activemq.util.DefaultIOExceptionHandler; 103import org.apache.activemq.util.IOExceptionHandler; 104import org.apache.activemq.util.IOExceptionSupport; 105import org.apache.activemq.util.IOHelper; 106import org.apache.activemq.util.InetAddressUtil; 107import org.apache.activemq.util.JMXSupport; 108import org.apache.activemq.util.ServiceStopper; 109import org.apache.activemq.util.URISupport; 110import org.slf4j.Logger; 111import org.slf4j.LoggerFactory; 112import org.slf4j.MDC; 113 114/** 115 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a 116 * number of transport connectors, network connectors and a bunch of properties 117 * which can be used to configure the broker as its lazily created. 118 * 119 * 120 * @org.apache.xbean.XBean 121 */ 122public class BrokerService implements Service { 123 protected CountDownLatch slaveStartSignal = new CountDownLatch(1); 124 public static final String DEFAULT_PORT = "61616"; 125 public static final String LOCAL_HOST_NAME; 126 public static final String DEFAULT_BROKER_NAME = "localhost"; 127 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 128 private static final long serialVersionUID = 7353129142305630237L; 129 private boolean useJmx = true; 130 private boolean enableStatistics = true; 131 private boolean persistent = true; 132 private boolean populateJMSXUserID; 133 private boolean useAuthenticatedPrincipalForJMSXUserID; 134 private boolean populateUserNameInMBeans; 135 136 private boolean useShutdownHook = true; 137 private boolean useLoggingForShutdownErrors; 138 private boolean shutdownOnMasterFailure; 139 private boolean shutdownOnSlaveFailure; 140 private boolean waitForSlave; 141 private long waitForSlaveTimeout = 600000L; 142 private boolean passiveSlave; 143 private String brokerName = DEFAULT_BROKER_NAME; 144 private File dataDirectoryFile; 145 private File tmpDataDirectory; 146 private Broker broker; 147 private BrokerView adminView; 148 private ManagementContext managementContext; 149 private ObjectName brokerObjectName; 150 private TaskRunnerFactory taskRunnerFactory; 151 private TaskRunnerFactory persistenceTaskRunnerFactory; 152 private SystemUsage systemUsage; 153 private SystemUsage producerSystemUsage; 154 private SystemUsage consumerSystemUsaage; 155 private PersistenceAdapter persistenceAdapter; 156 private PersistenceAdapterFactory persistenceFactory; 157 protected DestinationFactory destinationFactory; 158 private MessageAuthorizationPolicy messageAuthorizationPolicy; 159 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 160 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 161 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 162 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 163 private final List<Service> services = new ArrayList<Service>(); 164 private MasterConnector masterConnector; 165 private String masterConnectorURI; 166 private transient Thread shutdownHook; 167 private String[] transportConnectorURIs; 168 private String[] networkConnectorURIs; 169 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 170 // to other jms messaging 171 // systems 172 private boolean deleteAllMessagesOnStartup; 173 private boolean advisorySupport = true; 174 private URI vmConnectorURI; 175 private String defaultSocketURIString; 176 private PolicyMap destinationPolicy; 177 private final AtomicBoolean started = new AtomicBoolean(false); 178 private final AtomicBoolean stopped = new AtomicBoolean(false); 179 private BrokerPlugin[] plugins; 180 private boolean keepDurableSubsActive = true; 181 private boolean useVirtualTopics = true; 182 private boolean useMirroredQueues = false; 183 private boolean useTempMirroredQueues = true; 184 private BrokerId brokerId; 185 private DestinationInterceptor[] destinationInterceptors; 186 private ActiveMQDestination[] destinations; 187 private PListStore tempDataStore; 188 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 189 private boolean useLocalHostBrokerName; 190 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 191 private final CountDownLatch startedLatch = new CountDownLatch(1); 192 private boolean supportFailOver; 193 private Broker regionBroker; 194 private int producerSystemUsagePortion = 60; 195 private int consumerSystemUsagePortion = 40; 196 private boolean splitSystemUsageForProducersConsumers; 197 private boolean monitorConnectionSplits = false; 198 private int taskRunnerPriority = Thread.NORM_PRIORITY; 199 private boolean dedicatedTaskRunner; 200 private boolean cacheTempDestinations = false;// useful for failover 201 private int timeBeforePurgeTempDestinations = 5000; 202 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 203 private boolean systemExitOnShutdown; 204 private int systemExitOnShutdownExitCode; 205 private SslContext sslContext; 206 private boolean forceStart = false; 207 private IOExceptionHandler ioExceptionHandler; 208 private boolean schedulerSupport = false; 209 private File schedulerDirectoryFile; 210 private Scheduler scheduler; 211 private ThreadPoolExecutor executor; 212 private boolean slave = true; 213 private int schedulePeriodForDestinationPurge= 0; 214 private int maxPurgedDestinationsPerSweep = 0; 215 private BrokerContext brokerContext; 216 private boolean networkConnectorStartAsync = false; 217 private boolean allowTempAutoCreationOnSend; 218 219 private int offlineDurableSubscriberTimeout = -1; 220 private int offlineDurableSubscriberTaskSchedule = 300000; 221 private DestinationFilter virtualConsumerDestinationFilter; 222 223 static { 224 String localHostName = "localhost"; 225 try { 226 localHostName = InetAddressUtil.getLocalHostName(); 227 } catch (UnknownHostException e) { 228 LOG.error("Failed to resolve localhost"); 229 } 230 LOCAL_HOST_NAME = localHostName; 231 } 232 233 @Override 234 public String toString() { 235 return "BrokerService[" + getBrokerName() + "]"; 236 } 237 238 /** 239 * Adds a new transport connector for the given bind address 240 * 241 * @return the newly created and added transport connector 242 * @throws Exception 243 */ 244 public TransportConnector addConnector(String bindAddress) throws Exception { 245 return addConnector(new URI(bindAddress)); 246 } 247 248 /** 249 * Adds a new transport connector for the given bind address 250 * 251 * @return the newly created and added transport connector 252 * @throws Exception 253 */ 254 public TransportConnector addConnector(URI bindAddress) throws Exception { 255 return addConnector(createTransportConnector(bindAddress)); 256 } 257 258 /** 259 * Adds a new transport connector for the given TransportServer transport 260 * 261 * @return the newly created and added transport connector 262 * @throws Exception 263 */ 264 public TransportConnector addConnector(TransportServer transport) throws Exception { 265 return addConnector(new TransportConnector(transport)); 266 } 267 268 /** 269 * Adds a new transport connector 270 * 271 * @return the transport connector 272 * @throws Exception 273 */ 274 public TransportConnector addConnector(TransportConnector connector) throws Exception { 275 transportConnectors.add(connector); 276 return connector; 277 } 278 279 /** 280 * Stops and removes a transport connector from the broker. 281 * 282 * @param connector 283 * @return true if the connector has been previously added to the broker 284 * @throws Exception 285 */ 286 public boolean removeConnector(TransportConnector connector) throws Exception { 287 boolean rc = transportConnectors.remove(connector); 288 if (rc) { 289 unregisterConnectorMBean(connector); 290 } 291 return rc; 292 } 293 294 /** 295 * Adds a new network connector using the given discovery address 296 * 297 * @return the newly created and added network connector 298 * @throws Exception 299 */ 300 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 301 return addNetworkConnector(new URI(discoveryAddress)); 302 } 303 304 /** 305 * Adds a new proxy connector using the given bind address 306 * 307 * @return the newly created and added network connector 308 * @throws Exception 309 */ 310 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 311 return addProxyConnector(new URI(bindAddress)); 312 } 313 314 /** 315 * Adds a new network connector using the given discovery address 316 * 317 * @return the newly created and added network connector 318 * @throws Exception 319 */ 320 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 321 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 322 return addNetworkConnector(connector); 323 } 324 325 /** 326 * Adds a new proxy connector using the given bind address 327 * 328 * @return the newly created and added network connector 329 * @throws Exception 330 */ 331 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 332 ProxyConnector connector = new ProxyConnector(); 333 connector.setBind(bindAddress); 334 connector.setRemote(new URI("fanout:multicast://default")); 335 return addProxyConnector(connector); 336 } 337 338 /** 339 * Adds a new network connector to connect this broker to a federated 340 * network 341 */ 342 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 343 connector.setBrokerService(this); 344 URI uri = getVmConnectorURI(); 345 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 346 map.put("network", "true"); 347 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 348 connector.setLocalUri(uri); 349 // Set a connection filter so that the connector does not establish loop 350 // back connections. 351 connector.setConnectionFilter(new ConnectionFilter() { 352 public boolean connectTo(URI location) { 353 List<TransportConnector> transportConnectors = getTransportConnectors(); 354 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 355 try { 356 TransportConnector tc = iter.next(); 357 if (location.equals(tc.getConnectUri())) { 358 return false; 359 } 360 } catch (Throwable e) { 361 } 362 } 363 return true; 364 } 365 }); 366 networkConnectors.add(connector); 367 if (isUseJmx()) { 368 registerNetworkConnectorMBean(connector); 369 } 370 return connector; 371 } 372 373 /** 374 * Removes the given network connector without stopping it. The caller 375 * should call {@link NetworkConnector#stop()} to close the connector 376 */ 377 public boolean removeNetworkConnector(NetworkConnector connector) { 378 boolean answer = networkConnectors.remove(connector); 379 if (answer) { 380 unregisterNetworkConnectorMBean(connector); 381 } 382 return answer; 383 } 384 385 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 386 URI uri = getVmConnectorURI(); 387 connector.setLocalUri(uri); 388 proxyConnectors.add(connector); 389 if (isUseJmx()) { 390 registerProxyConnectorMBean(connector); 391 } 392 return connector; 393 } 394 395 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 396 connector.setBrokerService(this); 397 jmsConnectors.add(connector); 398 if (isUseJmx()) { 399 registerJmsConnectorMBean(connector); 400 } 401 return connector; 402 } 403 404 public JmsConnector removeJmsConnector(JmsConnector connector) { 405 if (jmsConnectors.remove(connector)) { 406 return connector; 407 } 408 return null; 409 } 410 411 /** 412 * @return Returns the masterConnectorURI. 413 */ 414 public String getMasterConnectorURI() { 415 return masterConnectorURI; 416 } 417 418 /** 419 * @param masterConnectorURI 420 * The masterConnectorURI to set. 421 */ 422 public void setMasterConnectorURI(String masterConnectorURI) { 423 this.masterConnectorURI = masterConnectorURI; 424 } 425 426 /** 427 * @return true if this Broker is a slave to a Master 428 */ 429 public boolean isSlave() { 430 return (masterConnector != null && masterConnector.isSlave()) || 431 (masterConnector != null && masterConnector.isStoppedBeforeStart()) || 432 (masterConnector == null && slave); 433 } 434 435 public void masterFailed() { 436 if (shutdownOnMasterFailure) { 437 LOG.error("The Master has failed ... shutting down"); 438 try { 439 stop(); 440 } catch (Exception e) { 441 LOG.error("Failed to stop for master failure", e); 442 } 443 } else { 444 LOG.warn("Master Failed - starting all connectors"); 445 try { 446 startAllConnectors(); 447 broker.nowMasterBroker(); 448 } catch (Exception e) { 449 LOG.error("Failed to startAllConnectors", e); 450 } 451 } 452 } 453 454 public boolean isStarted() { 455 return started.get(); 456 } 457 458 /** 459 * Forces a start of the broker. 460 * By default a BrokerService instance that was 461 * previously stopped using BrokerService.stop() cannot be restarted 462 * using BrokerService.start(). 463 * This method enforces a restart. 464 * It is not recommended to force a restart of the broker and will not work 465 * for most but some very trivial broker configurations. 466 * For restarting a broker instance we recommend to first call stop() on 467 * the old instance and then recreate a new BrokerService instance. 468 * 469 * @param force - if true enforces a restart. 470 * @throws Exception 471 */ 472 public void start(boolean force) throws Exception { 473 forceStart = force; 474 stopped.set(false); 475 started.set(false); 476 start(); 477 } 478 479 // Service interface 480 // ------------------------------------------------------------------------- 481 482 protected boolean shouldAutostart() { 483 return true; 484 } 485 486 /** 487 * 488 * @throws Exception 489 * @org. apache.xbean.InitMethod 490 */ 491 @PostConstruct 492 public void autoStart() throws Exception { 493 if(shouldAutostart()) { 494 start(); 495 } 496 } 497 498 public void start() throws Exception { 499 if (stopped.get() || !started.compareAndSet(false, true)) { 500 // lets just ignore redundant start() calls 501 // as its way too easy to not be completely sure if start() has been 502 // called or not with the gazillion of different configuration 503 // mechanisms 504 // throw new IllegalStateException("Allready started."); 505 return; 506 } 507 508 MDC.put("activemq.broker", brokerName); 509 510 try { 511 if (systemExitOnShutdown && useShutdownHook) { 512 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 513 } 514 processHelperProperties(); 515 if (isUseJmx()) { 516 startManagementContext(); 517 } 518 519 getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); 520 getPersistenceAdapter().setBrokerName(getBrokerName()); 521 LOG.info("Using Persistence Adapter: " + getPersistenceAdapter()); 522 if (deleteAllMessagesOnStartup) { 523 deleteAllMessages(); 524 } 525 getPersistenceAdapter().start(); 526 slave = false; 527 startDestinations(); 528 addShutdownHook(); 529 getBroker().start(); 530 if (isUseJmx()) { 531 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 532 // try to restart management context 533 // typical for slaves that use the same ports as master 534 managementContext.stop(); 535 startManagementContext(); 536 } 537 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 538 managedBroker.setContextBroker(broker); 539 adminView.setBroker(managedBroker); 540 } 541 BrokerRegistry.getInstance().bind(getBrokerName(), this); 542 // see if there is a MasterBroker service and if so, configure 543 // it and start it. 544 for (Service service : services) { 545 if (service instanceof MasterConnector) { 546 configureService(service); 547 service.start(); 548 } 549 } 550 if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) { 551 startAllConnectors(); 552 } 553 if (!stopped.get()) { 554 if (isUseJmx() && masterConnector != null) { 555 registerFTConnectorMBean(masterConnector); 556 } 557 } 558 if (brokerId == null) { 559 brokerId = broker.getBrokerId(); 560 } 561 if (ioExceptionHandler == null) { 562 setIoExceptionHandler(new DefaultIOExceptionHandler()); 563 } 564 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started"); 565 getBroker().brokerServiceStarted(); 566 checkSystemUsageLimits(); 567 startedLatch.countDown(); 568 } catch (Exception e) { 569 LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e); 570 try { 571 if (!stopped.get()) { 572 stop(); 573 } 574 } catch (Exception ex) { 575 LOG.warn("Failed to stop broker after failure in start ", ex); 576 } 577 throw e; 578 } finally { 579 MDC.remove("activemq.broker"); 580 } 581 } 582 583 /** 584 * 585 * @throws Exception 586 * @org.apache .xbean.DestroyMethod 587 */ 588 @PreDestroy 589 public void stop() throws Exception { 590 if (!started.get()) { 591 return; 592 } 593 594 MDC.put("activemq.broker", brokerName); 595 596 if (systemExitOnShutdown) { 597 new Thread() { 598 @Override 599 public void run() { 600 System.exit(systemExitOnShutdownExitCode); 601 } 602 }.start(); 603 } 604 605 LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down"); 606 removeShutdownHook(); 607 if (this.scheduler != null) { 608 this.scheduler.stop(); 609 this.scheduler = null; 610 } 611 ServiceStopper stopper = new ServiceStopper(); 612 if (services != null) { 613 for (Service service : services) { 614 stopper.stop(service); 615 } 616 } 617 stopAllConnectors(stopper); 618 // remove any VMTransports connected 619 // this has to be done after services are stopped, 620 // to avoid timimg issue with discovery (spinning up a new instance) 621 BrokerRegistry.getInstance().unbind(getBrokerName()); 622 VMTransportFactory.stopped(getBrokerName()); 623 if (broker != null) { 624 stopper.stop(broker); 625 broker = null; 626 } 627 628 if (tempDataStore != null) { 629 tempDataStore.stop(); 630 tempDataStore = null; 631 } 632 try { 633 stopper.stop(persistenceAdapter); 634 persistenceAdapter = null; 635 slave = true; 636 if (isUseJmx()) { 637 stopper.stop(getManagementContext()); 638 managementContext = null; 639 } 640 // Clear SelectorParser cache to free memory 641 SelectorParser.clearCache(); 642 } finally { 643 stopped.set(true); 644 stoppedLatch.countDown(); 645 } 646 if (masterConnectorURI == null) { 647 // master start has not finished yet 648 if (slaveStartSignal.getCount() == 1) { 649 started.set(false); 650 slaveStartSignal.countDown(); 651 } 652 } else { 653 for (Service service : services) { 654 if (service instanceof MasterConnector) { 655 MasterConnector mConnector = (MasterConnector) service; 656 if (!mConnector.isSlave()) { 657 // means should be slave but not connected to master yet 658 started.set(false); 659 mConnector.stopBeforeConnected(); 660 } 661 } 662 } 663 } 664 if (this.taskRunnerFactory != null) { 665 this.taskRunnerFactory.shutdown(); 666 this.taskRunnerFactory = null; 667 } 668 if (this.executor != null) { 669 this.executor.shutdownNow(); 670 this.executor = null; 671 } 672 673 this.destinationInterceptors = null; 674 this.destinationFactory = null; 675 676 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); 677 synchronized (shutdownHooks) { 678 for (Runnable hook : shutdownHooks) { 679 try { 680 hook.run(); 681 } catch (Throwable e) { 682 stopper.onException(hook, e); 683 } 684 } 685 } 686 687 MDC.remove("activemq.broker"); 688 689 stopper.throwFirstException(); 690 } 691 692 public boolean checkQueueSize(String queueName) { 693 long count = 0; 694 long queueSize = 0; 695 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 696 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 697 if (entry.getKey().isQueue()) { 698 if (entry.getValue().getName().matches(queueName)) { 699 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 700 count += queueSize; 701 if (queueSize > 0) { 702 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:" 703 + queueSize); 704 } 705 } 706 } 707 } 708 return count == 0; 709 } 710 711 /** 712 * This method (both connectorName and queueName are using regex to match) 713 * 1. stop the connector (supposed the user input the connector which the 714 * clients connect to) 2. to check whether there is any pending message on 715 * the queues defined by queueName 3. supposedly, after stop the connector, 716 * client should failover to other broker and pending messages should be 717 * forwarded. if no pending messages, the method finally call stop to stop 718 * the broker. 719 * 720 * @param connectorName 721 * @param queueName 722 * @param timeout 723 * @param pollInterval 724 * @throws Exception 725 */ 726 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) 727 throws Exception { 728 if (isUseJmx()) { 729 if (connectorName == null || queueName == null || timeout <= 0) { 730 throw new Exception( 731 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 732 } 733 if (pollInterval <= 0) { 734 pollInterval = 30; 735 } 736 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:" 737 + timeout + " pollInterval:" + pollInterval); 738 TransportConnector connector; 739 for (int i = 0; i < transportConnectors.size(); i++) { 740 connector = transportConnectors.get(i); 741 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 742 connector.stop(); 743 } 744 } 745 long start = System.currentTimeMillis(); 746 while (System.currentTimeMillis() - start < timeout * 1000) { 747 // check quesize until it gets zero 748 if (checkQueueSize(queueName)) { 749 stop(); 750 break; 751 } else { 752 Thread.sleep(pollInterval * 1000); 753 } 754 } 755 if (stopped.get()) { 756 LOG.info("Successfully stop the broker."); 757 } else { 758 LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); 759 } 760 } 761 } 762 763 /** 764 * A helper method to block the caller thread until the broker has been 765 * stopped 766 */ 767 public void waitUntilStopped() { 768 while (isStarted() && !stopped.get()) { 769 try { 770 stoppedLatch.await(); 771 } catch (InterruptedException e) { 772 // ignore 773 } 774 } 775 } 776 777 /** 778 * A helper method to block the caller thread until the broker has fully started 779 * @return boolean true if wait succeeded false if broker was not started or was stopped 780 */ 781 public boolean waitUntilStarted() { 782 boolean waitSucceeded = false; 783 while (isStarted() && !stopped.get() && !waitSucceeded) { 784 try { 785 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 786 } catch (InterruptedException ignore) { 787 } 788 } 789 return waitSucceeded; 790 } 791 792 // Properties 793 // ------------------------------------------------------------------------- 794 /** 795 * Returns the message broker 796 */ 797 public Broker getBroker() throws Exception { 798 if (broker == null) { 799 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" 800 + getBrokerName() + ") is starting"); 801 LOG.info("For help or more information please see: http://activemq.apache.org/"); 802 broker = createBroker(); 803 } 804 return broker; 805 } 806 807 /** 808 * Returns the administration view of the broker; used to create and destroy 809 * resources such as queues and topics. Note this method returns null if JMX 810 * is disabled. 811 */ 812 public BrokerView getAdminView() throws Exception { 813 if (adminView == null) { 814 // force lazy creation 815 getBroker(); 816 } 817 return adminView; 818 } 819 820 public void setAdminView(BrokerView adminView) { 821 this.adminView = adminView; 822 } 823 824 public String getBrokerName() { 825 return brokerName; 826 } 827 828 /** 829 * Sets the name of this broker; which must be unique in the network 830 * 831 * @param brokerName 832 */ 833 public void setBrokerName(String brokerName) { 834 if (brokerName == null) { 835 throw new NullPointerException("The broker name cannot be null"); 836 } 837 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); 838 if (!str.equals(brokerName)) { 839 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str); 840 } 841 this.brokerName = str.trim(); 842 } 843 844 public PersistenceAdapterFactory getPersistenceFactory() { 845 return persistenceFactory; 846 } 847 848 public File getDataDirectoryFile() { 849 if (dataDirectoryFile == null) { 850 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 851 } 852 return dataDirectoryFile; 853 } 854 855 public File getBrokerDataDirectory() { 856 String brokerDir = getBrokerName(); 857 return new File(getDataDirectoryFile(), brokerDir); 858 } 859 860 /** 861 * Sets the directory in which the data files will be stored by default for 862 * the JDBC and Journal persistence adaptors. 863 * 864 * @param dataDirectory 865 * the directory to store data files 866 */ 867 public void setDataDirectory(String dataDirectory) { 868 setDataDirectoryFile(new File(dataDirectory)); 869 } 870 871 /** 872 * Sets the directory in which the data files will be stored by default for 873 * the JDBC and Journal persistence adaptors. 874 * 875 * @param dataDirectoryFile 876 * the directory to store data files 877 */ 878 public void setDataDirectoryFile(File dataDirectoryFile) { 879 this.dataDirectoryFile = dataDirectoryFile; 880 } 881 882 /** 883 * @return the tmpDataDirectory 884 */ 885 public File getTmpDataDirectory() { 886 if (tmpDataDirectory == null) { 887 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 888 } 889 return tmpDataDirectory; 890 } 891 892 /** 893 * @param tmpDataDirectory 894 * the tmpDataDirectory to set 895 */ 896 public void setTmpDataDirectory(File tmpDataDirectory) { 897 this.tmpDataDirectory = tmpDataDirectory; 898 } 899 900 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 901 this.persistenceFactory = persistenceFactory; 902 } 903 904 public void setDestinationFactory(DestinationFactory destinationFactory) { 905 this.destinationFactory = destinationFactory; 906 } 907 908 public boolean isPersistent() { 909 return persistent; 910 } 911 912 /** 913 * Sets whether or not persistence is enabled or disabled. 914 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 915 */ 916 public void setPersistent(boolean persistent) { 917 this.persistent = persistent; 918 } 919 920 public boolean isPopulateJMSXUserID() { 921 return populateJMSXUserID; 922 } 923 924 /** 925 * Sets whether or not the broker should populate the JMSXUserID header. 926 */ 927 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 928 this.populateJMSXUserID = populateJMSXUserID; 929 } 930 931 public SystemUsage getSystemUsage() { 932 try { 933 if (systemUsage == null) { 934 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); 935 systemUsage.setExecutor(getExecutor()); 936 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 937 // 64 938 // Meg 939 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1000 * 50); // 50 940 // Gb 941 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1000 * 100); // 100 942 // GB 943 addService(this.systemUsage); 944 } 945 return systemUsage; 946 } catch (IOException e) { 947 LOG.error("Cannot create SystemUsage", e); 948 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage()); 949 } 950 } 951 952 public void setSystemUsage(SystemUsage memoryManager) { 953 if (this.systemUsage != null) { 954 removeService(this.systemUsage); 955 } 956 this.systemUsage = memoryManager; 957 if (this.systemUsage.getExecutor()==null) { 958 this.systemUsage.setExecutor(getExecutor()); 959 } 960 addService(this.systemUsage); 961 } 962 963 /** 964 * @return the consumerUsageManager 965 * @throws IOException 966 */ 967 public SystemUsage getConsumerSystemUsage() throws IOException { 968 if (this.consumerSystemUsaage == null) { 969 if (splitSystemUsageForProducersConsumers) { 970 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 971 float portion = consumerSystemUsagePortion / 100f; 972 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 973 addService(this.consumerSystemUsaage); 974 } else { 975 consumerSystemUsaage = getSystemUsage(); 976 } 977 } 978 return this.consumerSystemUsaage; 979 } 980 981 /** 982 * @param consumerSystemUsaage 983 * the storeSystemUsage to set 984 */ 985 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 986 if (this.consumerSystemUsaage != null) { 987 removeService(this.consumerSystemUsaage); 988 } 989 this.consumerSystemUsaage = consumerSystemUsaage; 990 addService(this.consumerSystemUsaage); 991 } 992 993 /** 994 * @return the producerUsageManager 995 * @throws IOException 996 */ 997 public SystemUsage getProducerSystemUsage() throws IOException { 998 if (producerSystemUsage == null) { 999 if (splitSystemUsageForProducersConsumers) { 1000 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1001 float portion = producerSystemUsagePortion / 100f; 1002 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1003 addService(producerSystemUsage); 1004 } else { 1005 producerSystemUsage = getSystemUsage(); 1006 } 1007 } 1008 return producerSystemUsage; 1009 } 1010 1011 /** 1012 * @param producerUsageManager 1013 * the producerUsageManager to set 1014 */ 1015 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1016 if (this.producerSystemUsage != null) { 1017 removeService(this.producerSystemUsage); 1018 } 1019 this.producerSystemUsage = producerUsageManager; 1020 addService(this.producerSystemUsage); 1021 } 1022 1023 public PersistenceAdapter getPersistenceAdapter() throws IOException { 1024 if (persistenceAdapter == null) { 1025 persistenceAdapter = createPersistenceAdapter(); 1026 configureService(persistenceAdapter); 1027 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1028 } 1029 return persistenceAdapter; 1030 } 1031 1032 /** 1033 * Sets the persistence adaptor implementation to use for this broker 1034 * 1035 * @throws IOException 1036 */ 1037 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1038 this.persistenceAdapter = persistenceAdapter; 1039 configureService(this.persistenceAdapter); 1040 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1041 } 1042 1043 public TaskRunnerFactory getTaskRunnerFactory() { 1044 if (this.taskRunnerFactory == null) { 1045 this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1046 isDedicatedTaskRunner()); 1047 } 1048 return this.taskRunnerFactory; 1049 } 1050 1051 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1052 this.taskRunnerFactory = taskRunnerFactory; 1053 } 1054 1055 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1056 if (taskRunnerFactory == null) { 1057 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1058 true, 1000, isDedicatedTaskRunner()); 1059 } 1060 return persistenceTaskRunnerFactory; 1061 } 1062 1063 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1064 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1065 } 1066 1067 public boolean isUseJmx() { 1068 return useJmx; 1069 } 1070 1071 public boolean isEnableStatistics() { 1072 return enableStatistics; 1073 } 1074 1075 /** 1076 * Sets whether or not the Broker's services enable statistics or not. 1077 */ 1078 public void setEnableStatistics(boolean enableStatistics) { 1079 this.enableStatistics = enableStatistics; 1080 } 1081 1082 /** 1083 * Sets whether or not the Broker's services should be exposed into JMX or 1084 * not. 1085 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1086 */ 1087 public void setUseJmx(boolean useJmx) { 1088 this.useJmx = useJmx; 1089 } 1090 1091 public ObjectName getBrokerObjectName() throws IOException { 1092 if (brokerObjectName == null) { 1093 brokerObjectName = createBrokerObjectName(); 1094 } 1095 return brokerObjectName; 1096 } 1097 1098 /** 1099 * Sets the JMX ObjectName for this broker 1100 */ 1101 public void setBrokerObjectName(ObjectName brokerObjectName) { 1102 this.brokerObjectName = brokerObjectName; 1103 } 1104 1105 public ManagementContext getManagementContext() { 1106 if (managementContext == null) { 1107 managementContext = new ManagementContext(); 1108 } 1109 return managementContext; 1110 } 1111 1112 public void setManagementContext(ManagementContext managementContext) { 1113 this.managementContext = managementContext; 1114 } 1115 1116 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1117 for (NetworkConnector connector : networkConnectors) { 1118 if (connector.getName().equals(connectorName)) { 1119 return connector; 1120 } 1121 } 1122 return null; 1123 } 1124 1125 public String[] getNetworkConnectorURIs() { 1126 return networkConnectorURIs; 1127 } 1128 1129 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1130 this.networkConnectorURIs = networkConnectorURIs; 1131 } 1132 1133 public TransportConnector getConnectorByName(String connectorName) { 1134 for (TransportConnector connector : transportConnectors) { 1135 if (connector.getName().equals(connectorName)) { 1136 return connector; 1137 } 1138 } 1139 return null; 1140 } 1141 1142 public Map<String, String> getTransportConnectorURIsAsMap() { 1143 Map<String, String> answer = new HashMap<String, String>(); 1144 for (TransportConnector connector : transportConnectors) { 1145 try { 1146 URI uri = connector.getConnectUri(); 1147 if (uri != null) { 1148 String scheme = uri.getScheme(); 1149 if (scheme != null) { 1150 answer.put(scheme.toLowerCase(), uri.toString()); 1151 } 1152 } 1153 } catch (Exception e) { 1154 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1155 } 1156 } 1157 return answer; 1158 } 1159 1160 public String[] getTransportConnectorURIs() { 1161 return transportConnectorURIs; 1162 } 1163 1164 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1165 this.transportConnectorURIs = transportConnectorURIs; 1166 } 1167 1168 /** 1169 * @return Returns the jmsBridgeConnectors. 1170 */ 1171 public JmsConnector[] getJmsBridgeConnectors() { 1172 return jmsBridgeConnectors; 1173 } 1174 1175 /** 1176 * @param jmsConnectors 1177 * The jmsBridgeConnectors to set. 1178 */ 1179 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1180 this.jmsBridgeConnectors = jmsConnectors; 1181 } 1182 1183 public Service[] getServices() { 1184 return services.toArray(new Service[0]); 1185 } 1186 1187 /** 1188 * Sets the services associated with this broker such as a 1189 * {@link MasterConnector} 1190 */ 1191 public void setServices(Service[] services) { 1192 this.services.clear(); 1193 if (services != null) { 1194 for (int i = 0; i < services.length; i++) { 1195 this.services.add(services[i]); 1196 } 1197 } 1198 } 1199 1200 /** 1201 * Adds a new service so that it will be started as part of the broker 1202 * lifecycle 1203 */ 1204 public void addService(Service service) { 1205 services.add(service); 1206 } 1207 1208 public void removeService(Service service) { 1209 services.remove(service); 1210 } 1211 1212 public boolean isUseLoggingForShutdownErrors() { 1213 return useLoggingForShutdownErrors; 1214 } 1215 1216 /** 1217 * Sets whether or not we should use commons-logging when reporting errors 1218 * when shutting down the broker 1219 */ 1220 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1221 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1222 } 1223 1224 public boolean isUseShutdownHook() { 1225 return useShutdownHook; 1226 } 1227 1228 /** 1229 * Sets whether or not we should use a shutdown handler to close down the 1230 * broker cleanly if the JVM is terminated. It is recommended you leave this 1231 * enabled. 1232 */ 1233 public void setUseShutdownHook(boolean useShutdownHook) { 1234 this.useShutdownHook = useShutdownHook; 1235 } 1236 1237 public boolean isAdvisorySupport() { 1238 return advisorySupport; 1239 } 1240 1241 /** 1242 * Allows the support of advisory messages to be disabled for performance 1243 * reasons. 1244 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1245 */ 1246 public void setAdvisorySupport(boolean advisorySupport) { 1247 this.advisorySupport = advisorySupport; 1248 } 1249 1250 public List<TransportConnector> getTransportConnectors() { 1251 return new ArrayList<TransportConnector>(transportConnectors); 1252 } 1253 1254 /** 1255 * Sets the transport connectors which this broker will listen on for new 1256 * clients 1257 * 1258 * @org.apache.xbean.Property 1259 * nestedType="org.apache.activemq.broker.TransportConnector" 1260 */ 1261 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1262 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 1263 TransportConnector connector = iter.next(); 1264 addConnector(connector); 1265 } 1266 } 1267 1268 public TransportConnector getTransportConnectorByName(String name){ 1269 for (TransportConnector transportConnector:transportConnectors){ 1270 if (name.equals(transportConnector.getName())){ 1271 return transportConnector; 1272 } 1273 } 1274 return null; 1275 } 1276 1277 public TransportConnector getTransportConnectorByScheme(String scheme){ 1278 for (TransportConnector transportConnector:transportConnectors){ 1279 if (scheme.equals(transportConnector.getUri().getScheme())){ 1280 return transportConnector; 1281 } 1282 } 1283 return null; 1284 } 1285 1286 public List<NetworkConnector> getNetworkConnectors() { 1287 return new ArrayList<NetworkConnector>(networkConnectors); 1288 } 1289 1290 public List<ProxyConnector> getProxyConnectors() { 1291 return new ArrayList<ProxyConnector>(proxyConnectors); 1292 } 1293 1294 /** 1295 * Sets the network connectors which this broker will use to connect to 1296 * other brokers in a federated network 1297 * 1298 * @org.apache.xbean.Property 1299 * nestedType="org.apache.activemq.network.NetworkConnector" 1300 */ 1301 public void setNetworkConnectors(List networkConnectors) throws Exception { 1302 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) { 1303 NetworkConnector connector = (NetworkConnector) iter.next(); 1304 addNetworkConnector(connector); 1305 } 1306 } 1307 1308 /** 1309 * Sets the network connectors which this broker will use to connect to 1310 * other brokers in a federated network 1311 */ 1312 public void setProxyConnectors(List proxyConnectors) throws Exception { 1313 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { 1314 ProxyConnector connector = (ProxyConnector) iter.next(); 1315 addProxyConnector(connector); 1316 } 1317 } 1318 1319 public PolicyMap getDestinationPolicy() { 1320 return destinationPolicy; 1321 } 1322 1323 /** 1324 * Sets the destination specific policies available either for exact 1325 * destinations or for wildcard areas of destinations. 1326 */ 1327 public void setDestinationPolicy(PolicyMap policyMap) { 1328 this.destinationPolicy = policyMap; 1329 } 1330 1331 public BrokerPlugin[] getPlugins() { 1332 return plugins; 1333 } 1334 1335 /** 1336 * Sets a number of broker plugins to install such as for security 1337 * authentication or authorization 1338 */ 1339 public void setPlugins(BrokerPlugin[] plugins) { 1340 this.plugins = plugins; 1341 } 1342 1343 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1344 return messageAuthorizationPolicy; 1345 } 1346 1347 /** 1348 * Sets the policy used to decide if the current connection is authorized to 1349 * consume a given message 1350 */ 1351 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1352 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1353 } 1354 1355 /** 1356 * Delete all messages from the persistent store 1357 * 1358 * @throws IOException 1359 */ 1360 public void deleteAllMessages() throws IOException { 1361 getPersistenceAdapter().deleteAllMessages(); 1362 } 1363 1364 public boolean isDeleteAllMessagesOnStartup() { 1365 return deleteAllMessagesOnStartup; 1366 } 1367 1368 /** 1369 * Sets whether or not all messages are deleted on startup - mostly only 1370 * useful for testing. 1371 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1372 */ 1373 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1374 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1375 } 1376 1377 public URI getVmConnectorURI() { 1378 if (vmConnectorURI == null) { 1379 try { 1380 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); 1381 } catch (URISyntaxException e) { 1382 LOG.error("Badly formed URI from " + getBrokerName(), e); 1383 } 1384 } 1385 return vmConnectorURI; 1386 } 1387 1388 public void setVmConnectorURI(URI vmConnectorURI) { 1389 this.vmConnectorURI = vmConnectorURI; 1390 } 1391 1392 public String getDefaultSocketURIString() { 1393 1394 if (started.get()) { 1395 if (this.defaultSocketURIString == null) { 1396 for (TransportConnector tc:this.transportConnectors) { 1397 String result = null; 1398 try { 1399 result = tc.getPublishableConnectString(); 1400 } catch (Exception e) { 1401 LOG.warn("Failed to get the ConnectURI for "+tc,e); 1402 } 1403 if (result != null) { 1404 // find first publishable uri 1405 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1406 this.defaultSocketURIString = result; 1407 break; 1408 } else { 1409 // or use the first defined 1410 if (this.defaultSocketURIString == null) { 1411 this.defaultSocketURIString = result; 1412 } 1413 } 1414 } 1415 } 1416 1417 } 1418 return this.defaultSocketURIString; 1419 } 1420 return null; 1421 } 1422 1423 /** 1424 * @return Returns the shutdownOnMasterFailure. 1425 */ 1426 public boolean isShutdownOnMasterFailure() { 1427 return shutdownOnMasterFailure; 1428 } 1429 1430 /** 1431 * @param shutdownOnMasterFailure 1432 * The shutdownOnMasterFailure to set. 1433 */ 1434 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1435 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1436 } 1437 1438 public boolean isKeepDurableSubsActive() { 1439 return keepDurableSubsActive; 1440 } 1441 1442 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1443 this.keepDurableSubsActive = keepDurableSubsActive; 1444 } 1445 1446 public boolean isUseVirtualTopics() { 1447 return useVirtualTopics; 1448 } 1449 1450 /** 1451 * Sets whether or not <a 1452 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1453 * Topics</a> should be supported by default if they have not been 1454 * explicitly configured. 1455 */ 1456 public void setUseVirtualTopics(boolean useVirtualTopics) { 1457 this.useVirtualTopics = useVirtualTopics; 1458 } 1459 1460 public DestinationInterceptor[] getDestinationInterceptors() { 1461 return destinationInterceptors; 1462 } 1463 1464 public boolean isUseMirroredQueues() { 1465 return useMirroredQueues; 1466 } 1467 1468 /** 1469 * Sets whether or not <a 1470 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1471 * Queues</a> should be supported by default if they have not been 1472 * explicitly configured. 1473 */ 1474 public void setUseMirroredQueues(boolean useMirroredQueues) { 1475 this.useMirroredQueues = useMirroredQueues; 1476 } 1477 1478 /** 1479 * Sets the destination interceptors to use 1480 */ 1481 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1482 this.destinationInterceptors = destinationInterceptors; 1483 } 1484 1485 public ActiveMQDestination[] getDestinations() { 1486 return destinations; 1487 } 1488 1489 /** 1490 * Sets the destinations which should be loaded/created on startup 1491 */ 1492 public void setDestinations(ActiveMQDestination[] destinations) { 1493 this.destinations = destinations; 1494 } 1495 1496 /** 1497 * @return the tempDataStore 1498 */ 1499 public synchronized PListStore getTempDataStore() { 1500 if (tempDataStore == null) { 1501 if (!isPersistent()) { 1502 return null; 1503 } 1504 boolean result = true; 1505 boolean empty = true; 1506 try { 1507 File directory = getTmpDataDirectory(); 1508 if (directory.exists() && directory.isDirectory()) { 1509 File[] files = directory.listFiles(); 1510 if (files != null && files.length > 0) { 1511 empty = false; 1512 for (int i = 0; i < files.length; i++) { 1513 File file = files[i]; 1514 if (!file.isDirectory()) { 1515 result &= file.delete(); 1516 } 1517 } 1518 } 1519 } 1520 if (!empty) { 1521 String str = result ? "Successfully deleted" : "Failed to delete"; 1522 LOG.info(str + " temporary storage"); 1523 } 1524 this.tempDataStore = new PListStore(); 1525 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1526 configureService(tempDataStore); 1527 this.tempDataStore.start(); 1528 } catch (Exception e) { 1529 throw new RuntimeException(e); 1530 } 1531 } 1532 return tempDataStore; 1533 } 1534 1535 /** 1536 * @param tempDataStore 1537 * the tempDataStore to set 1538 */ 1539 public void setTempDataStore(PListStore tempDataStore) { 1540 this.tempDataStore = tempDataStore; 1541 configureService(tempDataStore); 1542 try { 1543 tempDataStore.start(); 1544 } catch (Exception e) { 1545 RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e); 1546 LOG.error(exception.getLocalizedMessage(), e); 1547 throw exception; 1548 } 1549 } 1550 1551 public int getPersistenceThreadPriority() { 1552 return persistenceThreadPriority; 1553 } 1554 1555 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1556 this.persistenceThreadPriority = persistenceThreadPriority; 1557 } 1558 1559 /** 1560 * @return the useLocalHostBrokerName 1561 */ 1562 public boolean isUseLocalHostBrokerName() { 1563 return this.useLocalHostBrokerName; 1564 } 1565 1566 /** 1567 * @param useLocalHostBrokerName 1568 * the useLocalHostBrokerName to set 1569 */ 1570 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1571 this.useLocalHostBrokerName = useLocalHostBrokerName; 1572 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1573 brokerName = LOCAL_HOST_NAME; 1574 } 1575 } 1576 1577 /** 1578 * @return the supportFailOver 1579 */ 1580 public boolean isSupportFailOver() { 1581 return this.supportFailOver; 1582 } 1583 1584 /** 1585 * @param supportFailOver 1586 * the supportFailOver to set 1587 */ 1588 public void setSupportFailOver(boolean supportFailOver) { 1589 this.supportFailOver = supportFailOver; 1590 } 1591 1592 /** 1593 * Looks up and lazily creates if necessary the destination for the given 1594 * JMS name 1595 */ 1596 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1597 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1598 } 1599 1600 public void removeDestination(ActiveMQDestination destination) throws Exception { 1601 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1602 } 1603 1604 public int getProducerSystemUsagePortion() { 1605 return producerSystemUsagePortion; 1606 } 1607 1608 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1609 this.producerSystemUsagePortion = producerSystemUsagePortion; 1610 } 1611 1612 public int getConsumerSystemUsagePortion() { 1613 return consumerSystemUsagePortion; 1614 } 1615 1616 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1617 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1618 } 1619 1620 public boolean isSplitSystemUsageForProducersConsumers() { 1621 return splitSystemUsageForProducersConsumers; 1622 } 1623 1624 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1625 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1626 } 1627 1628 public boolean isMonitorConnectionSplits() { 1629 return monitorConnectionSplits; 1630 } 1631 1632 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1633 this.monitorConnectionSplits = monitorConnectionSplits; 1634 } 1635 1636 public int getTaskRunnerPriority() { 1637 return taskRunnerPriority; 1638 } 1639 1640 public void setTaskRunnerPriority(int taskRunnerPriority) { 1641 this.taskRunnerPriority = taskRunnerPriority; 1642 } 1643 1644 public boolean isDedicatedTaskRunner() { 1645 return dedicatedTaskRunner; 1646 } 1647 1648 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1649 this.dedicatedTaskRunner = dedicatedTaskRunner; 1650 } 1651 1652 public boolean isCacheTempDestinations() { 1653 return cacheTempDestinations; 1654 } 1655 1656 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1657 this.cacheTempDestinations = cacheTempDestinations; 1658 } 1659 1660 public int getTimeBeforePurgeTempDestinations() { 1661 return timeBeforePurgeTempDestinations; 1662 } 1663 1664 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1665 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1666 } 1667 1668 public boolean isUseTempMirroredQueues() { 1669 return useTempMirroredQueues; 1670 } 1671 1672 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1673 this.useTempMirroredQueues = useTempMirroredQueues; 1674 } 1675 1676 // 1677 // Implementation methods 1678 // ------------------------------------------------------------------------- 1679 /** 1680 * Handles any lazy-creation helper properties which are added to make 1681 * things easier to configure inside environments such as Spring 1682 * 1683 * @throws Exception 1684 */ 1685 protected void processHelperProperties() throws Exception { 1686 boolean masterServiceExists = false; 1687 if (transportConnectorURIs != null) { 1688 for (int i = 0; i < transportConnectorURIs.length; i++) { 1689 String uri = transportConnectorURIs[i]; 1690 addConnector(uri); 1691 } 1692 } 1693 if (networkConnectorURIs != null) { 1694 for (int i = 0; i < networkConnectorURIs.length; i++) { 1695 String uri = networkConnectorURIs[i]; 1696 addNetworkConnector(uri); 1697 } 1698 } 1699 if (jmsBridgeConnectors != null) { 1700 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1701 addJmsConnector(jmsBridgeConnectors[i]); 1702 } 1703 } 1704 for (Service service : services) { 1705 if (service instanceof MasterConnector) { 1706 masterServiceExists = true; 1707 break; 1708 } 1709 } 1710 if (masterConnectorURI != null) { 1711 if (masterServiceExists) { 1712 throw new IllegalStateException( 1713 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); 1714 } else { 1715 addService(new MasterConnector(masterConnectorURI)); 1716 } 1717 } 1718 } 1719 1720 protected void checkSystemUsageLimits() throws IOException { 1721 SystemUsage usage = getSystemUsage(); 1722 long memLimit = usage.getMemoryUsage().getLimit(); 1723 long jvmLimit = Runtime.getRuntime().maxMemory(); 1724 1725 if (memLimit > jvmLimit) { 1726 LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + 1727 " mb) is more than the maximum available for the JVM: " + 1728 jvmLimit / (1024 * 1024) + " mb"); 1729 } 1730 1731 if (getPersistenceAdapter() != null) { 1732 PersistenceAdapter adapter = getPersistenceAdapter(); 1733 File dir = adapter.getDirectory(); 1734 1735 if (dir != null) { 1736 String dirPath = dir.getAbsolutePath(); 1737 if (!dir.isAbsolute()) { 1738 dir = new File(dirPath); 1739 } 1740 1741 while (dir != null && dir.isDirectory() == false) { 1742 dir = dir.getParentFile(); 1743 } 1744 long storeLimit = usage.getStoreUsage().getLimit(); 1745 long dirFreeSpace = dir.getUsableSpace(); 1746 if (storeLimit > dirFreeSpace) { 1747 LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + 1748 " mb, whilst the data directory: " + dir.getAbsolutePath() + 1749 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space"); 1750 } 1751 } 1752 1753 long maxJournalFileSize = 0; 1754 long storeLimit = usage.getStoreUsage().getLimit(); 1755 1756 if (adapter instanceof KahaDBPersistenceAdapter) { 1757 KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter; 1758 maxJournalFileSize = kahaDB.getJournalMaxFileLength(); 1759 } else if (adapter instanceof AMQPersistenceAdapter) { 1760 AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter; 1761 maxJournalFileSize = amqAdapter.getMaxFileLength(); 1762 } 1763 1764 if (storeLimit < maxJournalFileSize) { 1765 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 1766 " mb, whilst the max journal file size for the store is: " + 1767 maxJournalFileSize / (1024 * 1024) + " mb, " + 1768 "the store will not accept any data when used."); 1769 } 1770 } 1771 1772 File tmpDir = getTmpDataDirectory(); 1773 if (tmpDir != null) { 1774 1775 String tmpDirPath = tmpDir.getAbsolutePath(); 1776 if (!tmpDir.isAbsolute()) { 1777 tmpDir = new File(tmpDirPath); 1778 } 1779 1780 long storeLimit = usage.getTempUsage().getLimit(); 1781 while (tmpDir != null && tmpDir.isDirectory() == false) { 1782 tmpDir = tmpDir.getParentFile(); 1783 } 1784 long dirFreeSpace = tmpDir.getUsableSpace(); 1785 if (storeLimit > dirFreeSpace) { 1786 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 1787 " mb, whilst the temporary data directory: " + tmpDirPath + 1788 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space"); 1789 } 1790 1791 long maxJournalFileSize; 1792 1793 if (usage.getTempUsage().getStore() != null) { 1794 maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength(); 1795 } else { 1796 maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH; 1797 } 1798 1799 if (storeLimit < maxJournalFileSize) { 1800 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 1801 " mb, whilst the max journal file size for the temporary store is: " + 1802 maxJournalFileSize / (1024 * 1024) + " mb, " + 1803 "the temp store will not accept any data when used."); 1804 } 1805 } 1806 } 1807 1808 public void stopAllConnectors(ServiceStopper stopper) { 1809 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 1810 NetworkConnector connector = iter.next(); 1811 unregisterNetworkConnectorMBean(connector); 1812 stopper.stop(connector); 1813 } 1814 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 1815 ProxyConnector connector = iter.next(); 1816 stopper.stop(connector); 1817 } 1818 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 1819 JmsConnector connector = iter.next(); 1820 stopper.stop(connector); 1821 } 1822 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 1823 TransportConnector connector = iter.next(); 1824 stopper.stop(connector); 1825 } 1826 } 1827 1828 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 1829 try { 1830 ObjectName objectName = createConnectorObjectName(connector); 1831 connector = connector.asManagedConnector(getManagementContext(), objectName); 1832 ConnectorViewMBean view = new ConnectorView(connector); 1833 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1834 return connector; 1835 } catch (Throwable e) { 1836 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); 1837 } 1838 } 1839 1840 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 1841 if (isUseJmx()) { 1842 try { 1843 ObjectName objectName = createConnectorObjectName(connector); 1844 getManagementContext().unregisterMBean(objectName); 1845 } catch (Throwable e) { 1846 throw IOExceptionSupport.create( 1847 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 1848 } 1849 } 1850 } 1851 1852 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1853 return adaptor; 1854 } 1855 1856 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1857 if (isUseJmx()) { 1858 } 1859 } 1860 1861 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 1862 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1863 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName=" 1864 + JMXSupport.encodeObjectNamePart(connector.getName())); 1865 } 1866 1867 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 1868 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 1869 try { 1870 ObjectName objectName = createNetworkConnectorObjectName(connector); 1871 connector.setObjectName(objectName); 1872 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1873 } catch (Throwable e) { 1874 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 1875 } 1876 } 1877 1878 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) 1879 throws MalformedObjectNameException { 1880 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1881 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," 1882 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1883 } 1884 1885 1886 public ObjectName createDuplexNetworkConnectorObjectName(String transport) 1887 throws MalformedObjectNameException { 1888 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1889 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," 1890 + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport)); 1891 } 1892 1893 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 1894 if (isUseJmx()) { 1895 try { 1896 ObjectName objectName = createNetworkConnectorObjectName(connector); 1897 getManagementContext().unregisterMBean(objectName); 1898 } catch (Exception e) { 1899 LOG.error("Network Connector could not be unregistered from JMX: " + e, e); 1900 } 1901 } 1902 } 1903 1904 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 1905 ProxyConnectorView view = new ProxyConnectorView(connector); 1906 try { 1907 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1908 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," 1909 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1910 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1911 } catch (Throwable e) { 1912 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1913 } 1914 } 1915 1916 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException { 1917 FTConnectorView view = new FTConnectorView(connector); 1918 try { 1919 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1920 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector"); 1921 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1922 } catch (Throwable e) { 1923 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1924 } 1925 } 1926 1927 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 1928 JmsConnectorView view = new JmsConnectorView(connector); 1929 try { 1930 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1931 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," 1932 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1933 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1934 } catch (Throwable e) { 1935 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1936 } 1937 } 1938 1939 /** 1940 * Factory method to create a new broker 1941 * 1942 * @throws Exception 1943 * @throws 1944 * @throws 1945 */ 1946 protected Broker createBroker() throws Exception { 1947 regionBroker = createRegionBroker(); 1948 Broker broker = addInterceptors(regionBroker); 1949 // Add a filter that will stop access to the broker once stopped 1950 broker = new MutableBrokerFilter(broker) { 1951 Broker old; 1952 1953 @Override 1954 public void stop() throws Exception { 1955 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 1956 // Just ignore additional stop actions. 1957 @Override 1958 public void stop() throws Exception { 1959 } 1960 }); 1961 old.stop(); 1962 } 1963 1964 @Override 1965 public void start() throws Exception { 1966 if (forceStart && old != null) { 1967 this.next.set(old); 1968 } 1969 getNext().start(); 1970 } 1971 }; 1972 return broker; 1973 } 1974 1975 /** 1976 * Factory method to create the core region broker onto which interceptors 1977 * are added 1978 * 1979 * @throws Exception 1980 */ 1981 protected Broker createRegionBroker() throws Exception { 1982 if (destinationInterceptors == null) { 1983 destinationInterceptors = createDefaultDestinationInterceptor(); 1984 } 1985 configureServices(destinationInterceptors); 1986 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 1987 if (destinationFactory == null) { 1988 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 1989 } 1990 return createRegionBroker(destinationInterceptor); 1991 } 1992 1993 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 1994 RegionBroker regionBroker; 1995 if (isUseJmx()) { 1996 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 1997 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 1998 } else { 1999 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2000 destinationInterceptor,getScheduler(),getExecutor()); 2001 } 2002 destinationFactory.setRegionBroker(regionBroker); 2003 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2004 regionBroker.setBrokerName(getBrokerName()); 2005 regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2006 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2007 if (brokerId != null) { 2008 regionBroker.setBrokerId(brokerId); 2009 } 2010 return regionBroker; 2011 } 2012 2013 /** 2014 * Create the default destination interceptor 2015 */ 2016 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2017 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2018 if (isUseVirtualTopics()) { 2019 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2020 VirtualTopic virtualTopic = new VirtualTopic(); 2021 virtualTopic.setName("VirtualTopic.>"); 2022 VirtualDestination[] virtualDestinations = { virtualTopic }; 2023 interceptor.setVirtualDestinations(virtualDestinations); 2024 answer.add(interceptor); 2025 } 2026 if (isUseMirroredQueues()) { 2027 MirroredQueue interceptor = new MirroredQueue(); 2028 answer.add(interceptor); 2029 } 2030 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2031 answer.toArray(array); 2032 return array; 2033 } 2034 2035 /** 2036 * Strategy method to add interceptors to the broker 2037 * 2038 * @throws IOException 2039 */ 2040 protected Broker addInterceptors(Broker broker) throws Exception { 2041 if (isSchedulerSupport()) { 2042 SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile()); 2043 if (isUseJmx()) { 2044 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2045 try { 2046 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" 2047 + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," 2048 + "Type=jobScheduler," + "jobSchedulerName=JMS"); 2049 2050 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2051 this.adminView.setJMSJobScheduler(objectName); 2052 } catch (Throwable e) { 2053 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2054 + e.getMessage(), e); 2055 } 2056 2057 } 2058 broker = sb; 2059 } 2060 if (isAdvisorySupport()) { 2061 broker = new AdvisoryBroker(broker); 2062 } 2063 broker = new CompositeDestinationBroker(broker); 2064 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2065 if (isPopulateJMSXUserID()) { 2066 UserIDBroker userIDBroker = new UserIDBroker(broker); 2067 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2068 broker = userIDBroker; 2069 } 2070 if (isMonitorConnectionSplits()) { 2071 broker = new ConnectionSplitBroker(broker); 2072 } 2073 if (plugins != null) { 2074 for (int i = 0; i < plugins.length; i++) { 2075 BrokerPlugin plugin = plugins[i]; 2076 broker = plugin.installPlugin(broker); 2077 } 2078 } 2079 return broker; 2080 } 2081 2082 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2083 if (isPersistent()) { 2084 PersistenceAdapterFactory fac = getPersistenceFactory(); 2085 if (fac != null) { 2086 return fac.createPersistenceAdapter(); 2087 }else { 2088 KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); 2089 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2090 adaptor.setDirectory(dir); 2091 return adaptor; 2092 } 2093 } else { 2094 return new MemoryPersistenceAdapter(); 2095 } 2096 } 2097 2098 protected ObjectName createBrokerObjectName() throws IOException { 2099 try { 2100 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 2101 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker"); 2102 } catch (Throwable e) { 2103 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e); 2104 } 2105 } 2106 2107 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2108 TransportServer transport = TransportFactory.bind(this, brokerURI); 2109 return new TransportConnector(transport); 2110 } 2111 2112 /** 2113 * Extracts the port from the options 2114 */ 2115 protected Object getPort(Map options) { 2116 Object port = options.get("port"); 2117 if (port == null) { 2118 port = DEFAULT_PORT; 2119 LOG.warn("No port specified so defaulting to: " + port); 2120 } 2121 return port; 2122 } 2123 2124 protected void addShutdownHook() { 2125 if (useShutdownHook) { 2126 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2127 @Override 2128 public void run() { 2129 containerShutdown(); 2130 } 2131 }; 2132 Runtime.getRuntime().addShutdownHook(shutdownHook); 2133 } 2134 } 2135 2136 protected void removeShutdownHook() { 2137 if (shutdownHook != null) { 2138 try { 2139 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2140 } catch (Exception e) { 2141 LOG.debug("Caught exception, must be shutting down: " + e); 2142 } 2143 } 2144 } 2145 2146 /** 2147 * Sets hooks to be executed when broker shut down 2148 * 2149 * @org.apache.xbean.Property 2150 */ 2151 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2152 for (Runnable hook : hooks) { 2153 addShutdownHook(hook); 2154 } 2155 } 2156 2157 /** 2158 * Causes a clean shutdown of the container when the VM is being shut down 2159 */ 2160 protected void containerShutdown() { 2161 try { 2162 stop(); 2163 } catch (IOException e) { 2164 Throwable linkedException = e.getCause(); 2165 if (linkedException != null) { 2166 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); 2167 } else { 2168 logError("Failed to shut down: " + e, e); 2169 } 2170 if (!useLoggingForShutdownErrors) { 2171 e.printStackTrace(System.err); 2172 } 2173 } catch (Exception e) { 2174 logError("Failed to shut down: " + e, e); 2175 } 2176 } 2177 2178 protected void logError(String message, Throwable e) { 2179 if (useLoggingForShutdownErrors) { 2180 LOG.error("Failed to shut down: " + e); 2181 } else { 2182 System.err.println("Failed to shut down: " + e); 2183 } 2184 } 2185 2186 /** 2187 * Starts any configured destinations on startup 2188 */ 2189 protected void startDestinations() throws Exception { 2190 if (destinations != null) { 2191 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2192 for (int i = 0; i < destinations.length; i++) { 2193 ActiveMQDestination destination = destinations[i]; 2194 getBroker().addDestination(adminConnectionContext, destination,true); 2195 } 2196 } 2197 if (isUseVirtualTopics()) { 2198 startVirtualConsumerDestinations(); 2199 } 2200 } 2201 2202 /** 2203 * Returns the broker's administration connection context used for 2204 * configuring the broker at startup 2205 */ 2206 public ConnectionContext getAdminConnectionContext() throws Exception { 2207 return BrokerSupport.getConnectionContext(getBroker()); 2208 } 2209 2210 protected void waitForSlave() { 2211 try { 2212 if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) { 2213 throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); 2214 } 2215 } catch (InterruptedException e) { 2216 LOG.error("Exception waiting for slave:" + e); 2217 } 2218 } 2219 2220 protected void slaveConnectionEstablished() { 2221 slaveStartSignal.countDown(); 2222 } 2223 2224 protected void startManagementContext() throws Exception { 2225 getManagementContext().start(); 2226 adminView = new BrokerView(this, null); 2227 ObjectName objectName = getBrokerObjectName(); 2228 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2229 } 2230 2231 /** 2232 * Start all transport and network connections, proxies and bridges 2233 * 2234 * @throws Exception 2235 */ 2236 public void startAllConnectors() throws Exception { 2237 if (!isSlave()) { 2238 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2239 List<TransportConnector> al = new ArrayList<TransportConnector>(); 2240 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2241 TransportConnector connector = iter.next(); 2242 connector.setBrokerService(this); 2243 al.add(startTransportConnector(connector)); 2244 } 2245 if (al.size() > 0) { 2246 // let's clear the transportConnectors list and replace it with 2247 // the started transportConnector instances 2248 this.transportConnectors.clear(); 2249 setTransportConnectors(al); 2250 } 2251 URI uri = getVmConnectorURI(); 2252 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 2253 map.put("network", "true"); 2254 map.put("async", "false"); 2255 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 2256 if (isWaitForSlave()) { 2257 waitForSlave(); 2258 } 2259 if (!stopped.get()) { 2260 ThreadPoolExecutor networkConnectorStartExecutor = null; 2261 if (isNetworkConnectorStartAsync()) { 2262 // spin up as many threads as needed 2263 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2264 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2265 new ThreadFactory() { 2266 int count=0; 2267 public Thread newThread(Runnable runnable) { 2268 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2269 thread.setDaemon(true); 2270 return thread; 2271 } 2272 }); 2273 } 2274 2275 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2276 final NetworkConnector connector = iter.next(); 2277 connector.setLocalUri(uri); 2278 connector.setBrokerName(getBrokerName()); 2279 connector.setDurableDestinations(durableDestinations); 2280 if (getDefaultSocketURIString() != null) { 2281 connector.setBrokerURL(getDefaultSocketURIString()); 2282 } 2283 if (networkConnectorStartExecutor != null) { 2284 networkConnectorStartExecutor.execute(new Runnable() { 2285 public void run() { 2286 try { 2287 LOG.info("Async start of " + connector); 2288 connector.start(); 2289 } catch(Exception e) { 2290 LOG.error("Async start of network connector: " + connector + " failed", e); 2291 } 2292 } 2293 }); 2294 } else { 2295 connector.start(); 2296 } 2297 } 2298 if (networkConnectorStartExecutor != null) { 2299 // executor done when enqueued tasks are complete 2300 networkConnectorStartExecutor.shutdown(); 2301 networkConnectorStartExecutor = null; 2302 } 2303 2304 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2305 ProxyConnector connector = iter.next(); 2306 connector.start(); 2307 } 2308 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2309 JmsConnector connector = iter.next(); 2310 connector.start(); 2311 } 2312 for (Service service : services) { 2313 configureService(service); 2314 service.start(); 2315 } 2316 } 2317 } 2318 } 2319 2320 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2321 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2322 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2323 if (policy != null) { 2324 connector.setMessageAuthorizationPolicy(policy); 2325 } 2326 if (isUseJmx()) { 2327 connector = registerConnectorMBean(connector); 2328 } 2329 connector.getStatistics().setEnabled(enableStatistics); 2330 connector.start(); 2331 return connector; 2332 } 2333 2334 /** 2335 * Perform any custom dependency injection 2336 */ 2337 protected void configureServices(Object[] services) { 2338 for (Object service : services) { 2339 configureService(service); 2340 } 2341 } 2342 2343 /** 2344 * Perform any custom dependency injection 2345 */ 2346 protected void configureService(Object service) { 2347 if (service instanceof BrokerServiceAware) { 2348 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2349 serviceAware.setBrokerService(this); 2350 } 2351 if (masterConnector == null) { 2352 if (service instanceof MasterConnector) { 2353 masterConnector = (MasterConnector) service; 2354 supportFailOver = true; 2355 } 2356 } 2357 } 2358 2359 public void handleIOException(IOException exception) { 2360 if (ioExceptionHandler != null) { 2361 ioExceptionHandler.handle(exception); 2362 } else { 2363 LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception); 2364 } 2365 } 2366 2367 protected void startVirtualConsumerDestinations() throws Exception { 2368 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2369 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2370 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2371 if (!destinations.isEmpty()) { 2372 for (ActiveMQDestination destination : destinations) { 2373 if (filter.matches(destination) == true) { 2374 broker.addDestination(adminConnectionContext, destination, false); 2375 } 2376 } 2377 } 2378 } 2379 2380 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2381 // created at startup, so no sync needed 2382 if (virtualConsumerDestinationFilter == null) { 2383 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2384 for (DestinationInterceptor interceptor : destinationInterceptors) { 2385 if (interceptor instanceof VirtualDestinationInterceptor) { 2386 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2387 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2388 if (virtualDestination instanceof VirtualTopic) { 2389 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2390 } 2391 } 2392 } 2393 } 2394 ActiveMQQueue filter = new ActiveMQQueue(); 2395 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2396 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2397 } 2398 return virtualConsumerDestinationFilter; 2399 } 2400 2401 protected synchronized ThreadPoolExecutor getExecutor() { 2402 if (this.executor == null) { 2403 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2404 2405 private long i = 0; 2406 2407 @Override 2408 public Thread newThread(Runnable runnable) { 2409 this.i++; 2410 Thread thread = new Thread(runnable, "BrokerService.worker." + this.i); 2411 thread.setDaemon(true); 2412 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2413 @Override 2414 public void uncaughtException(final Thread t, final Throwable e) { 2415 LOG.error("Error in thread '{}'", t.getName(), e); 2416 } 2417 }); 2418 return thread; 2419 } 2420 }, new RejectedExecutionHandler() { 2421 @Override 2422 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2423 try { 2424 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2425 } catch (InterruptedException e) { 2426 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2427 } 2428 2429 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2430 } 2431 }); 2432 } 2433 return this.executor; 2434 } 2435 2436 public synchronized Scheduler getScheduler() { 2437 if (this.scheduler==null) { 2438 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2439 try { 2440 this.scheduler.start(); 2441 } catch (Exception e) { 2442 LOG.error("Failed to start Scheduler ",e); 2443 } 2444 } 2445 return this.scheduler; 2446 } 2447 2448 public Broker getRegionBroker() { 2449 return regionBroker; 2450 } 2451 2452 public void setRegionBroker(Broker regionBroker) { 2453 this.regionBroker = regionBroker; 2454 } 2455 2456 public void addShutdownHook(Runnable hook) { 2457 synchronized (shutdownHooks) { 2458 shutdownHooks.add(hook); 2459 } 2460 } 2461 2462 public void removeShutdownHook(Runnable hook) { 2463 synchronized (shutdownHooks) { 2464 shutdownHooks.remove(hook); 2465 } 2466 } 2467 2468 public boolean isSystemExitOnShutdown() { 2469 return systemExitOnShutdown; 2470 } 2471 2472 /** 2473 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2474 */ 2475 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2476 this.systemExitOnShutdown = systemExitOnShutdown; 2477 } 2478 2479 public int getSystemExitOnShutdownExitCode() { 2480 return systemExitOnShutdownExitCode; 2481 } 2482 2483 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2484 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2485 } 2486 2487 public SslContext getSslContext() { 2488 return sslContext; 2489 } 2490 2491 public void setSslContext(SslContext sslContext) { 2492 this.sslContext = sslContext; 2493 } 2494 2495 public boolean isShutdownOnSlaveFailure() { 2496 return shutdownOnSlaveFailure; 2497 } 2498 2499 /** 2500 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2501 */ 2502 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2503 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2504 } 2505 2506 public boolean isWaitForSlave() { 2507 return waitForSlave; 2508 } 2509 2510 /** 2511 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2512 */ 2513 public void setWaitForSlave(boolean waitForSlave) { 2514 this.waitForSlave = waitForSlave; 2515 } 2516 2517 public long getWaitForSlaveTimeout() { 2518 return this.waitForSlaveTimeout; 2519 } 2520 2521 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2522 this.waitForSlaveTimeout = waitForSlaveTimeout; 2523 } 2524 2525 public CountDownLatch getSlaveStartSignal() { 2526 return slaveStartSignal; 2527 } 2528 2529 /** 2530 * Get the passiveSlave 2531 * @return the passiveSlave 2532 */ 2533 public boolean isPassiveSlave() { 2534 return this.passiveSlave; 2535 } 2536 2537 /** 2538 * Set the passiveSlave 2539 * @param passiveSlave the passiveSlave to set 2540 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2541 */ 2542 public void setPassiveSlave(boolean passiveSlave) { 2543 this.passiveSlave = passiveSlave; 2544 } 2545 2546 /** 2547 * override the Default IOException handler, called when persistence adapter 2548 * has experiences File or JDBC I/O Exceptions 2549 * 2550 * @param ioExceptionHandler 2551 */ 2552 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2553 configureService(ioExceptionHandler); 2554 this.ioExceptionHandler = ioExceptionHandler; 2555 } 2556 2557 public IOExceptionHandler getIoExceptionHandler() { 2558 return ioExceptionHandler; 2559 } 2560 2561 /** 2562 * @return the schedulerSupport 2563 */ 2564 public boolean isSchedulerSupport() { 2565 return this.schedulerSupport; 2566 } 2567 2568 /** 2569 * @param schedulerSupport the schedulerSupport to set 2570 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2571 */ 2572 public void setSchedulerSupport(boolean schedulerSupport) { 2573 this.schedulerSupport = schedulerSupport; 2574 } 2575 2576 /** 2577 * @return the schedulerDirectory 2578 */ 2579 public File getSchedulerDirectoryFile() { 2580 if (this.schedulerDirectoryFile == null) { 2581 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2582 } 2583 return schedulerDirectoryFile; 2584 } 2585 2586 /** 2587 * @param schedulerDirectory the schedulerDirectory to set 2588 */ 2589 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2590 this.schedulerDirectoryFile = schedulerDirectory; 2591 } 2592 2593 public void setSchedulerDirectory(String schedulerDirectory) { 2594 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2595 } 2596 2597 public int getSchedulePeriodForDestinationPurge() { 2598 return this.schedulePeriodForDestinationPurge; 2599 } 2600 2601 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2602 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; 2603 } 2604 2605 public int getMaxPurgedDestinationsPerSweep() { 2606 return this.maxPurgedDestinationsPerSweep; 2607 } 2608 2609 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 2610 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 2611 } 2612 2613 public BrokerContext getBrokerContext() { 2614 return brokerContext; 2615 } 2616 2617 public void setBrokerContext(BrokerContext brokerContext) { 2618 this.brokerContext = brokerContext; 2619 } 2620 2621 public void setBrokerId(String brokerId) { 2622 this.brokerId = new BrokerId(brokerId); 2623 } 2624 2625 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 2626 return useAuthenticatedPrincipalForJMSXUserID; 2627 } 2628 2629 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 2630 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 2631 } 2632 2633 /** 2634 * Should MBeans that support showing the Authenticated User Name information have this 2635 * value filled in or not. 2636 * 2637 * @return true if user names should be exposed in MBeans 2638 */ 2639 public boolean isPopulateUserNameInMBeans() { 2640 return this.populateUserNameInMBeans; 2641 } 2642 2643 /** 2644 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 2645 * @param true if MBeans should expose user name information. 2646 */ 2647 public void setPopulateUserNameInMBeans(boolean value) { 2648 this.populateUserNameInMBeans = value; 2649 } 2650 2651 public boolean isNetworkConnectorStartAsync() { 2652 return networkConnectorStartAsync; 2653 } 2654 2655 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 2656 this.networkConnectorStartAsync = networkConnectorStartAsync; 2657 } 2658 2659 public boolean isAllowTempAutoCreationOnSend() { 2660 return allowTempAutoCreationOnSend; 2661 } 2662 2663 /** 2664 * enable if temp destinations need to be propagated through a network when 2665 * advisorySupport==false. This is used in conjunction with the policy 2666 * gcInactiveDestinations for matching temps so they can get removed 2667 * when inactive 2668 * 2669 * @param allowTempAutoCreationOnSend 2670 */ 2671 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 2672 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 2673 } 2674 2675 public int getOfflineDurableSubscriberTimeout() { 2676 return offlineDurableSubscriberTimeout; 2677 } 2678 2679 public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) { 2680 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 2681 } 2682 2683 public int getOfflineDurableSubscriberTaskSchedule() { 2684 return offlineDurableSubscriberTaskSchedule; 2685 } 2686 2687 public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) { 2688 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 2689 } 2690 2691 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 2692 return isUseVirtualTopics() && destination.isQueue() && 2693 getVirtualTopicConsumerDestinationFilter().matches(destination); 2694 } 2695}