001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.discovery.multicast;
019
020import java.io.IOException;
021import java.net.DatagramPacket;
022import java.net.InetAddress;
023import java.net.InetSocketAddress;
024import java.net.MulticastSocket;
025import java.net.NetworkInterface;
026import java.net.SocketAddress;
027import java.net.SocketTimeoutException;
028import java.net.URI;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.LinkedBlockingQueue;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.apache.activemq.command.DiscoveryEvent;
040import org.apache.activemq.transport.discovery.DiscoveryAgent;
041import org.apache.activemq.transport.discovery.DiscoveryListener;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
047 * encoded using any wireformat, but openwire by default.
048 * 
049 * 
050 */
051public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
052
053    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
054    public static final String DEFAULT_HOST_STR = "default"; 
055    public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
056    public static final int    DEFAULT_PORT  = 6155; 
057        
058    private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
059    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
060    private static final String ALIVE = "alive.";
061    private static final String DEAD = "dead.";
062    private static final String DELIMITER = "%";
063    private static final int BUFF_SIZE = 8192;
064    private static final int DEFAULT_IDLE_TIME = 500;
065    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
066
067    private long initialReconnectDelay = 1000 * 5;
068    private long maxReconnectDelay = 1000 * 30;
069    private long backOffMultiplier = 2;
070    private boolean useExponentialBackOff;
071    private int maxReconnectAttempts;
072
073    private int timeToLive = 1;
074    private boolean loopBackMode;
075    private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
076    private String group = "default";
077    private URI discoveryURI;
078    private InetAddress inetAddress;
079    private SocketAddress sockAddress;
080    private DiscoveryListener discoveryListener;
081    private String selfService;
082    private MulticastSocket mcast;
083    private Thread runner;
084    private long keepAliveInterval = DEFAULT_IDLE_TIME;
085    private String mcInterface;
086    private String mcNetworkInterface;
087    private String mcJoinNetworkInterface;
088    private long lastAdvertizeTime;
089    private AtomicBoolean started = new AtomicBoolean(false);
090    private boolean reportAdvertizeFailed = true;
091    private ExecutorService executor = null;
092
093    class RemoteBrokerData {
094        final String brokerName;
095        final String service;
096        long lastHeartBeat;
097        long recoveryTime;
098        int failureCount;
099        boolean failed;
100
101        public RemoteBrokerData(String brokerName, String service) {
102            this.brokerName = brokerName;
103            this.service = service;
104            this.lastHeartBeat = System.currentTimeMillis();
105        }
106
107        public synchronized void updateHeartBeat() {
108            lastHeartBeat = System.currentTimeMillis();
109
110            // Consider that the broker recovery has succeeded if it has not
111            // failed in 60 seconds.
112            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
113                if (LOG.isDebugEnabled()) {
114                    LOG.debug("I now think that the " + service + " service has recovered.");
115                }
116                failureCount = 0;
117                recoveryTime = 0;
118            }
119        }
120
121        public synchronized long getLastHeartBeat() {
122            return lastHeartBeat;
123        }
124
125        public synchronized boolean markFailed() {
126            if (!failed) {
127                failed = true;
128                failureCount++;
129
130                long reconnectDelay;
131                if (!useExponentialBackOff) {
132                    reconnectDelay = initialReconnectDelay;
133                } else {
134                    reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
135                    if (reconnectDelay > maxReconnectDelay) {
136                        reconnectDelay = maxReconnectDelay;
137                    }
138                }
139
140                if (LOG.isDebugEnabled()) {
141                    LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
142                              + " ms, the current failure count is: " + failureCount);
143                }
144
145                recoveryTime = System.currentTimeMillis() + reconnectDelay;
146                return true;
147            }
148            return false;
149        }
150
151        /**
152         * @return true if this broker is marked failed and it is now the right
153         *         time to start recovery.
154         */
155        public synchronized boolean doRecovery() {
156            if (!failed) {
157                return false;
158            }
159
160            // Are we done trying to recover this guy?
161            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
162                if (LOG.isDebugEnabled()) {
163                    LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
164                }
165                return false;
166            }
167
168            // Is it not yet time?
169            if (System.currentTimeMillis() < recoveryTime) {
170                return false;
171            }
172
173            if (LOG.isDebugEnabled()) {
174                LOG.debug("Resuming event advertisement of the " + service + " service.");
175            }
176            failed = false;
177            return true;
178        }
179
180        public boolean isFailed() {
181            return failed;
182        }
183    }
184
185    /**
186     * Set the discovery listener
187     * 
188     * @param listener
189     */
190    public void setDiscoveryListener(DiscoveryListener listener) {
191        this.discoveryListener = listener;
192    }
193
194    /**
195     * register a service
196     */
197    public void registerService(String name) throws IOException {
198        this.selfService = name;
199        if (started.get()) {
200            doAdvertizeSelf();
201        }
202    }
203
204    /**
205     * @return Returns the loopBackMode.
206     */
207    public boolean isLoopBackMode() {
208        return loopBackMode;
209    }
210
211    /**
212     * @param loopBackMode The loopBackMode to set.
213     */
214    public void setLoopBackMode(boolean loopBackMode) {
215        this.loopBackMode = loopBackMode;
216    }
217
218    /**
219     * @return Returns the timeToLive.
220     */
221    public int getTimeToLive() {
222        return timeToLive;
223    }
224
225    /**
226     * @param timeToLive The timeToLive to set.
227     */
228    public void setTimeToLive(int timeToLive) {
229        this.timeToLive = timeToLive;
230    }
231
232    /**
233     * @return the discoveryURI
234     */
235    public URI getDiscoveryURI() {
236        return discoveryURI;
237    }
238
239    /**
240     * Set the discoveryURI
241     * 
242     * @param discoveryURI
243     */
244    public void setDiscoveryURI(URI discoveryURI) {
245        this.discoveryURI = discoveryURI;
246    }
247
248    public long getKeepAliveInterval() {
249        return keepAliveInterval;
250    }
251
252    public void setKeepAliveInterval(long keepAliveInterval) {
253        this.keepAliveInterval = keepAliveInterval;
254    }
255    
256    public void setInterface(String mcInterface) {
257        this.mcInterface = mcInterface;
258    }
259    
260    public void setNetworkInterface(String mcNetworkInterface) {
261        this.mcNetworkInterface = mcNetworkInterface;    
262    }
263    
264    public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
265        this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
266    }
267    
268    /**
269     * start the discovery agent
270     * 
271     * @throws Exception
272     */
273    public void start() throws Exception {
274        
275        if (started.compareAndSet(false, true)) {               
276                                
277            if (group == null || group.length() == 0) {
278                throw new IOException("You must specify a group to discover");
279            }
280            String type = getType();
281            if (!type.endsWith(".")) {
282                LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
283                type += ".";
284            }
285            
286            if (discoveryURI == null) {
287                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
288            }
289            
290            if (LOG.isTraceEnabled()) 
291                        LOG.trace("start - discoveryURI = " + discoveryURI);                                      
292                  
293                  String myHost = discoveryURI.getHost();
294                  int    myPort = discoveryURI.getPort(); 
295                     
296                  if( DEFAULT_HOST_STR.equals(myHost) ) 
297                        myHost = DEFAULT_HOST_IP;                         
298                  
299                  if(myPort < 0 )
300                    myPort = DEFAULT_PORT;                  
301                  
302                  if (LOG.isTraceEnabled()) {
303                        LOG.trace("start - myHost = " + myHost); 
304                        LOG.trace("start - myPort = " + myPort);        
305                        LOG.trace("start - group  = " + group );                                
306                        LOG.trace("start - interface  = " + mcInterface );
307                        LOG.trace("start - network interface  = " + mcNetworkInterface );
308                        LOG.trace("start - join network interface  = " + mcJoinNetworkInterface );
309                  }     
310                  
311            this.inetAddress = InetAddress.getByName(myHost);
312            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
313            mcast = new MulticastSocket(myPort);
314            mcast.setLoopbackMode(loopBackMode);
315            mcast.setTimeToLive(getTimeToLive());
316            if (mcJoinNetworkInterface != null) {
317                mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
318            }
319            else {
320                mcast.joinGroup(inetAddress);
321            }
322            mcast.setSoTimeout((int)keepAliveInterval);
323            if (mcInterface != null) {
324                mcast.setInterface(InetAddress.getByName(mcInterface));
325            }
326            if (mcNetworkInterface != null) {
327                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
328            }
329            runner = new Thread(this);
330            runner.setName(this.toString() + ":" + runner.getName());
331            runner.setDaemon(true);
332            runner.start();
333            doAdvertizeSelf();
334        }
335    }
336
337    /**
338     * stop the channel
339     * 
340     * @throws Exception
341     */
342    public void stop() throws Exception {
343        if (started.compareAndSet(true, false)) {
344            doAdvertizeSelf();
345            if (mcast != null) {
346                mcast.close();
347            }
348            if (runner != null) {
349                runner.interrupt();
350            }
351            getExecutor().shutdownNow();
352        }
353    }
354
355    public String getType() {
356        return group + "." + TYPE_SUFFIX;
357    }
358
359    public void run() {
360        byte[] buf = new byte[BUFF_SIZE];
361        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
362        while (started.get()) {
363            doTimeKeepingServices();
364            try {
365                mcast.receive(packet);
366                if (packet.getLength() > 0) {
367                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
368                    processData(str);
369                }
370            } catch (SocketTimeoutException se) {
371                // ignore
372            } catch (IOException e) {
373                if (started.get()) {
374                    LOG.error("failed to process packet: " + e);
375                }
376            }
377        }
378    }
379
380    private void processData(String str) {
381        if (discoveryListener != null) {
382            if (str.startsWith(getType())) {
383                String payload = str.substring(getType().length());
384                if (payload.startsWith(ALIVE)) {
385                    String brokerName = getBrokerName(payload.substring(ALIVE.length()));
386                    String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
387                    processAlive(brokerName, service);
388                } else {
389                    String brokerName = getBrokerName(payload.substring(DEAD.length()));
390                    String service = payload.substring(DEAD.length() + brokerName.length() + 2);
391                    processDead(service);
392                }
393            }
394        }
395    }
396
397    private void doTimeKeepingServices() {
398        if (started.get()) {
399            long currentTime = System.currentTimeMillis();
400            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
401                doAdvertizeSelf();
402                lastAdvertizeTime = currentTime;
403            }
404            doExpireOldServices();
405        }
406    }
407
408    private void doAdvertizeSelf() {
409        if (selfService != null) {
410            String payload = getType();
411            payload += started.get() ? ALIVE : DEAD;
412            payload += DELIMITER + "localhost" + DELIMITER;
413            payload += selfService;
414            try {
415                byte[] data = payload.getBytes();
416                DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
417                mcast.send(packet);
418            } catch (IOException e) {
419                // If a send fails, chances are all subsequent sends will fail
420                // too.. No need to keep reporting the
421                // same error over and over.
422                if (reportAdvertizeFailed) {
423                    reportAdvertizeFailed = false;
424                    LOG.error("Failed to advertise our service: " + payload, e);
425                    if ("Operation not permitted".equals(e.getMessage())) {
426                        LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
427                                  + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
428                    }
429                }
430            }
431        }
432    }
433
434    private void processAlive(String brokerName, String service) {
435        if (selfService == null || !service.equals(selfService)) {
436            RemoteBrokerData data = brokersByService.get(service);
437            if (data == null) {
438                data = new RemoteBrokerData(brokerName, service);
439                brokersByService.put(service, data);      
440                fireServiceAddEvent(data);
441                doAdvertizeSelf();
442            } else {
443                data.updateHeartBeat();
444                if (data.doRecovery()) {
445                    fireServiceAddEvent(data);
446                }
447            }
448        }
449    }
450
451    private void processDead(String service) {
452        if (!service.equals(selfService)) {
453            RemoteBrokerData data = brokersByService.remove(service);
454            if (data != null && !data.isFailed()) {
455                fireServiceRemovedEvent(data);
456            }
457        }
458    }
459
460    private void doExpireOldServices() {
461        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
462        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
463            RemoteBrokerData data = i.next();
464            if (data.getLastHeartBeat() < expireTime) {
465                processDead(data.service);
466            }
467        }
468    }
469
470    private String getBrokerName(String str) {
471        String result = null;
472        int start = str.indexOf(DELIMITER);
473        if (start >= 0) {
474            int end = str.indexOf(DELIMITER, start + 1);
475            result = str.substring(start + 1, end);
476        }
477        return result;
478    }
479
480    public void serviceFailed(DiscoveryEvent event) throws IOException {
481        RemoteBrokerData data = brokersByService.get(event.getServiceName());
482        if (data != null && data.markFailed()) {
483            fireServiceRemovedEvent(data);
484        }
485    }
486
487    private void fireServiceRemovedEvent(RemoteBrokerData data) {
488        if (discoveryListener != null && started.get()) {
489            final DiscoveryEvent event = new DiscoveryEvent(data.service);
490            event.setBrokerName(data.brokerName);
491
492            // Have the listener process the event async so that
493            // he does not block this thread since we are doing time sensitive
494            // processing of events.
495            getExecutor().execute(new Runnable() {
496                public void run() {
497                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
498                    if (discoveryListener != null) {
499                        discoveryListener.onServiceRemove(event);
500                    }
501                }
502            });
503        }
504    }
505
506    private void fireServiceAddEvent(RemoteBrokerData data) {
507        if (discoveryListener != null && started.get()) {
508            final DiscoveryEvent event = new DiscoveryEvent(data.service);
509            event.setBrokerName(data.brokerName);
510            
511            // Have the listener process the event async so that
512            // he does not block this thread since we are doing time sensitive
513            // processing of events.
514            getExecutor().execute(new Runnable() {
515                public void run() {
516                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
517                    if (discoveryListener != null) {
518                        discoveryListener.onServiceAdd(event);
519                    }
520                }
521            });
522        }
523    }
524
525    private ExecutorService getExecutor() {
526        if (executor == null) {
527            final String threadName = "Notifier-" + this.toString();
528            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
529                public Thread newThread(Runnable runable) {
530                    Thread t = new Thread(runable,  threadName);
531                    t.setDaemon(true);
532                    return t;
533                }
534            });
535        }
536        return executor;
537    }
538
539    public long getBackOffMultiplier() {
540        return backOffMultiplier;
541    }
542
543    public void setBackOffMultiplier(long backOffMultiplier) {
544        this.backOffMultiplier = backOffMultiplier;
545    }
546
547    public long getInitialReconnectDelay() {
548        return initialReconnectDelay;
549    }
550
551    public void setInitialReconnectDelay(long initialReconnectDelay) {
552        this.initialReconnectDelay = initialReconnectDelay;
553    }
554
555    public int getMaxReconnectAttempts() {
556        return maxReconnectAttempts;
557    }
558
559    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
560        this.maxReconnectAttempts = maxReconnectAttempts;
561    }
562
563    public long getMaxReconnectDelay() {
564        return maxReconnectDelay;
565    }
566
567    public void setMaxReconnectDelay(long maxReconnectDelay) {
568        this.maxReconnectDelay = maxReconnectDelay;
569    }
570
571    public boolean isUseExponentialBackOff() {
572        return useExponentialBackOff;
573    }
574
575    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
576        this.useExponentialBackOff = useExponentialBackOff;
577    }
578
579    public void setGroup(String group) {
580        this.group = group;
581    }
582    
583    @Override
584    public String toString() {
585        return  "MulticastDiscoveryAgent-"
586            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
587    }
588}