001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.view;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.ConnectionContext;
021import org.apache.activemq.broker.ProducerBrokerExchange;
022import org.apache.activemq.broker.jmx.BrokerViewMBean;
023import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.ProducerId;
029import org.apache.activemq.command.ProducerInfo;
030import org.apache.activemq.filter.DestinationMapNode;
031import java.io.IOException;
032import java.io.PrintWriter;
033import java.util.Collection;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.Map;
038import java.util.Set;
039import javax.management.ObjectName;
040
041/**
042 * 
043 */
044public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
045
046    protected static final String ID_SEPARATOR = "_";
047
048    private final boolean redrawOnRemove;
049    private boolean clearProducerCacheAfterRender;
050    private String domain = "org.apache.activemq";
051    private BrokerViewMBean brokerView;
052
053    // until we have some MBeans for producers, lets do it all ourselves
054    private Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
055    private Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>();
056    private Object lock = new Object();
057
058    public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException {
059        super(next, file);
060        this.redrawOnRemove = redrawOnRemove;
061        
062    }
063
064    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
065        Subscription answer = super.addConsumer(context, info);
066        generateFile();
067        return answer;
068    }
069
070    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
071        super.addProducer(context, info);
072        ProducerId producerId = info.getProducerId();
073        synchronized (lock) {
074            producers.put(producerId, info);
075        }
076        generateFile();
077    }
078
079    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
080        super.removeConsumer(context, info);
081        if (redrawOnRemove) {
082            generateFile();
083        }
084    }
085
086    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
087        super.removeProducer(context, info);
088        ProducerId producerId = info.getProducerId();
089        if (redrawOnRemove) {
090            synchronized (lock) {
091                producerDestinations.remove(producerId);
092                producers.remove(producerId);
093            }
094            generateFile();
095        }
096    }
097
098    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
099        super.send(producerExchange, messageSend);
100        ProducerId producerId = messageSend.getProducerId();
101        ActiveMQDestination destination = messageSend.getDestination();
102        synchronized (lock) {
103            Set<ActiveMQDestination> destinations = producerDestinations.get(producerId);
104            if (destinations == null) {
105                destinations = new HashSet<ActiveMQDestination>();
106            }
107            producerDestinations.put(producerId, destinations);
108            destinations.add(destination);
109        }
110    }
111
112    protected void generateFile(PrintWriter writer) throws Exception {
113
114        writer.println("digraph \"ActiveMQ Connections\" {");
115        writer.println();
116        writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];");
117        writer.println();
118        writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];");
119        writer.println();
120
121        Map<String, String> clients = new HashMap<String, String>();
122        Map<String, String> queues = new HashMap<String, String>();
123        Map<String, String> topics = new HashMap<String, String>();
124
125        printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers());
126        writer.println();
127
128        printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers());
129        writer.println();
130
131        printProducers(writer, clients, queues, topics);
132        writer.println();
133
134        writeLabels(writer, "green", "Client: ", clients);
135        writer.println();
136
137        writeLabels(writer, "red", "Queue: ", queues);
138        writeLabels(writer, "blue", "Topic: ", topics);
139        writer.println("}");
140
141        if (clearProducerCacheAfterRender) {
142            producerDestinations.clear();
143        }
144    }
145
146    protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics) {
147        synchronized (lock) {
148            for (Iterator iter = producerDestinations.entrySet().iterator(); iter.hasNext();) {
149                Map.Entry entry = (Map.Entry)iter.next();
150                ProducerId producerId = (ProducerId)entry.getKey();
151                Set destinationSet = (Set)entry.getValue();
152                printProducers(writer, clients, queues, topics, producerId, destinationSet);
153            }
154        }
155    }
156
157    protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics, ProducerId producerId, Set destinationSet) {
158        for (Iterator iter = destinationSet.iterator(); iter.hasNext();) {
159            ActiveMQDestination destination = (ActiveMQDestination)iter.next();
160
161            // TODO use clientId one day
162            String clientId = producerId.getConnectionId();
163            String safeClientId = asID(clientId);
164            clients.put(safeClientId, clientId);
165
166            String physicalName = destination.getPhysicalName();
167            String safeDestinationId = asID(physicalName);
168            if (destination.isTopic()) {
169                safeDestinationId = "topic_" + safeDestinationId;
170                topics.put(safeDestinationId, physicalName);
171            } else {
172                safeDestinationId = "queue_" + safeDestinationId;
173                queues.put(safeDestinationId, physicalName);
174            }
175
176            String safeProducerId = asID(producerId.toString());
177
178            // lets write out the links
179
180            writer.print(safeClientId);
181            writer.print(" -> ");
182            writer.print(safeProducerId);
183            writer.println(";");
184
185            writer.print(safeProducerId);
186            writer.print(" -> ");
187            writer.print(safeDestinationId);
188            writer.println(";");
189
190            // now lets write out the label
191            writer.print(safeProducerId);
192            writer.print(" [label = \"");
193            String label = "Producer: " + producerId.getSessionId() + "-" + producerId.getValue();
194            writer.print(label);
195            writer.println("\"];");
196
197        }
198    }
199
200    protected void printSubscribers(PrintWriter writer, Map<String, String> clients, Map<String, String> destinations, String type, ObjectName[] subscribers) {
201        for (int i = 0; i < subscribers.length; i++) {
202            ObjectName name = subscribers[i];
203            SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
204
205            String clientId = subscriber.getClientId();
206            String safeClientId = asID(clientId);
207            clients.put(safeClientId, clientId);
208
209            String destination = subscriber.getDestinationName();
210            String safeDestinationId = type + asID(destination);
211            destinations.put(safeDestinationId, destination);
212
213            String selector = subscriber.getSelector();
214
215            // lets write out the links
216            String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubcriptionId();
217
218            writer.print(subscriberId);
219            writer.print(" -> ");
220            writer.print(safeClientId);
221            writer.println(";");
222
223            writer.print(safeDestinationId);
224            writer.print(" -> ");
225            writer.print(subscriberId);
226            writer.println(";");
227
228            // now lets write out the label
229            writer.print(subscriberId);
230            writer.print(" [label = \"");
231            String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubcriptionId();
232            if (selector != null && selector.length() > 0) {
233                label = label + "\\nSelector: " + selector;
234            }
235            writer.print(label);
236            writer.println("\"];");
237        }
238    }
239
240    protected void writeLabels(PrintWriter writer, String color, String prefix, Map<String, String> map) {
241        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
242            Map.Entry entry = (Map.Entry)iter.next();
243            String id = (String)entry.getKey();
244            String label = (String)entry.getValue();
245
246            writer.print(id);
247            writer.print(" [ fillcolor = ");
248            writer.print(color);
249            writer.print(", label = \"");
250            writer.print(prefix);
251            writer.print(label);
252            writer.println("\"];");
253        }
254    }
255
256    /**
257     * Lets strip out any non supported characters
258     */
259    protected String asID(String name) {
260        StringBuffer buffer = new StringBuffer();
261        int size = name.length();
262        for (int i = 0; i < size; i++) {
263            char ch = name.charAt(i);
264            if (Character.isLetterOrDigit(ch) || ch == '_') {
265                buffer.append(ch);
266            } else {
267                buffer.append('_');
268            }
269        }
270        return buffer.toString();
271    }
272
273    protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) {
274        String path = getPath(node);
275        writer.print("  ");
276        writer.print(prefix);
277        writer.print(ID_SEPARATOR);
278        writer.print(path);
279        String label = path;
280        if (prefix.equals("topic")) {
281            label = "Topics";
282        } else if (prefix.equals("queue")) {
283            label = "Queues";
284        }
285        writer.print("[ label = \"");
286        writer.print(label);
287        writer.println("\" ];");
288
289        Collection children = node.getChildren();
290        for (Iterator iter = children.iterator(); iter.hasNext();) {
291            DestinationMapNode child = (DestinationMapNode)iter.next();
292            printNodes(writer, child, prefix + ID_SEPARATOR + path);
293        }
294    }
295
296    protected void printNodeLinks(PrintWriter writer, DestinationMapNode node, String prefix) {
297        String path = getPath(node);
298        Collection children = node.getChildren();
299        for (Iterator iter = children.iterator(); iter.hasNext();) {
300            DestinationMapNode child = (DestinationMapNode)iter.next();
301
302            writer.print("  ");
303            writer.print(prefix);
304            writer.print(ID_SEPARATOR);
305            writer.print(path);
306            writer.print(" -> ");
307            writer.print(prefix);
308            writer.print(ID_SEPARATOR);
309            writer.print(path);
310            writer.print(ID_SEPARATOR);
311            writer.print(getPath(child));
312            writer.println(";");
313
314            printNodeLinks(writer, child, prefix + ID_SEPARATOR + path);
315        }
316    }
317
318    protected String getPath(DestinationMapNode node) {
319        String path = node.getPath();
320        if (path.equals("*")) {
321            return "root";
322        }
323        return path;
324    }
325    
326    BrokerViewMBean getBrokerView() throws Exception {
327        if (this.brokerView == null) {
328            ObjectName brokerName = getBrokerService().getBrokerObjectName();
329            this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
330                    BrokerViewMBean.class, true);
331        }
332        return this.brokerView;
333    }
334}