001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.discovery; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.util.Map; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.ConcurrentMap; 024 025import org.apache.activemq.command.DiscoveryEvent; 026import org.apache.activemq.transport.CompositeTransport; 027import org.apache.activemq.transport.TransportFilter; 028import org.apache.activemq.util.ServiceStopper; 029import org.apache.activemq.util.Suspendable; 030import org.apache.activemq.util.URISupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * A {@link TransportFilter} which uses a {@link DiscoveryAgent} to 036 * discover remote broker instances and dynamically connect to them. 037 */ 038public class DiscoveryTransport extends TransportFilter implements DiscoveryListener { 039 040 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class); 041 042 private final CompositeTransport next; 043 private DiscoveryAgent discoveryAgent; 044 private final ConcurrentMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>(); 045 046 private Map<String, String> parameters; 047 048 public DiscoveryTransport(CompositeTransport next) { 049 super(next); 050 this.next = next; 051 } 052 053 @Override 054 public void start() throws Exception { 055 if (discoveryAgent == null) { 056 throw new IllegalStateException("discoveryAgent not configured"); 057 } 058 059 // lets pass into the agent the broker name and connection details 060 discoveryAgent.setDiscoveryListener(this); 061 discoveryAgent.start(); 062 next.start(); 063 } 064 065 @Override 066 public void stop() throws Exception { 067 ServiceStopper ss = new ServiceStopper(); 068 ss.stop(discoveryAgent); 069 ss.stop(next); 070 ss.throwFirstException(); 071 } 072 073 @Override 074 public void onServiceAdd(DiscoveryEvent event) { 075 String url = event.getServiceName(); 076 if (url != null) { 077 try { 078 URI uri = new URI(url); 079 LOG.info("Adding new broker connection URL: " + uri); 080 uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX); 081 serviceURIs.put(event.getServiceName(), uri); 082 next.add(false,new URI[] {uri}); 083 } catch (URISyntaxException e) { 084 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 085 } 086 } 087 } 088 089 @Override 090 public void onServiceRemove(DiscoveryEvent event) { 091 URI uri = serviceURIs.get(event.getServiceName()); 092 if (uri != null) { 093 next.remove(false,new URI[] {uri}); 094 } 095 } 096 097 public DiscoveryAgent getDiscoveryAgent() { 098 return discoveryAgent; 099 } 100 101 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 102 this.discoveryAgent = discoveryAgent; 103 } 104 105 public void setParameters(Map<String, String> parameters) { 106 this.parameters = parameters; 107 } 108 109 @Override 110 public void transportResumed() { 111 if( discoveryAgent instanceof Suspendable ) { 112 try { 113 ((Suspendable)discoveryAgent).suspend(); 114 } catch (Exception e) { 115 LOG.warn("Exception suspending discoverAgent: ", discoveryAgent); 116 } 117 } 118 super.transportResumed(); 119 } 120 121 @Override 122 public void transportInterupted() { 123 if( discoveryAgent instanceof Suspendable ) { 124 try { 125 ((Suspendable)discoveryAgent).resume(); 126 } catch (Exception e) { 127 LOG.warn("Exception resuming discoverAgent: ", discoveryAgent); 128 } 129 } 130 super.transportInterupted(); 131 } 132}