/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.plugin;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import javax.management.ObjectName;
import java.io.File;
import java.net.URI;
import java.util.Map;
import java.util.Set;

/**
 * A broker filter that answers statistics queries.
 * <p>
 * A message sent to a destination whose physical name starts (ignoring case)
 * with one of the statistics prefixes is not forwarded down the broker chain.
 * Instead, one or more map messages containing statistics as key-value pairs
 * are sent to the message's replyTo destination. A message without a replyTo
 * destination is never treated as a query; it is passed on unchanged.
 */
public class StatisticsBroker extends BrokerFilter {
    private static final Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
    static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
    static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
    static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    protected final ProducerId advisoryProducerId = new ProducerId();
    protected BrokerViewMBean brokerView;

    /**
     * Creates the filter and assigns a freshly generated connection id to the
     * producer id used for all statistics replies.
     *
     * @param next the next broker in the chain
     */
    public StatisticsBroker(Broker next) {
        super(next);
        this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
    }

    /**
     * Intercepts statistics queries and replies to them; every other message
     * is delegated to the superclass unchanged.
     * <p>
     * The prefixes are checked in the order destination, subscription, broker.
     *
     * @param producerExchange the exchange the message arrived on; its
     *                         connection context is reused for the replies
     * @param messageSend      the message being sent
     * @throws Exception if building or sending a reply fails, or if the
     *                   delegated send fails
     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
     *      org.apache.activemq.command.Message)
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        ActiveMQDestination replyTo = messageSend.getReplyTo();
        if (replyTo == null) {
            // Without somewhere to send the answer this cannot be a query.
            super.send(producerExchange, messageSend);
            return;
        }

        String physicalName = messageSend.getDestination().getPhysicalName();
        if (hasPrefix(physicalName, STATS_DESTINATION_PREFIX)) {
            sendDestinationStats(producerExchange.getConnectionContext(), messageSend, physicalName, replyTo);
        } else if (hasPrefix(physicalName, STATS_SUBSCRIPTION_PREFIX)) {
            // NOTE(review): unlike the destination and broker replies, the
            // subscription replies do not carry the request's correlation id.
            sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
            sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
        } else if (hasPrefix(physicalName, STATS_BROKER_PREFIX)) {
            sendBrokerStats(producerExchange.getConnectionContext(), messageSend, replyTo);
        } else {
            super.send(producerExchange, messageSend);
        }
    }

    /**
     * Tells whether {@code physicalName} starts with {@code prefix}, ignoring case.
     */
    private static boolean hasPrefix(String physicalName, String prefix) {
        return physicalName.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /**
     * Returns {@code value}, or the empty string when {@code value} is null.
     */
    private static String valueOrEmpty(String value) {
        return value != null ? value : "";
    }

    /**
     * Writes the message-flow counters shared by destination and broker replies.
     */
    private static void addFlowCounters(ActiveMQMapMessage statsMessage, DestinationStatistics stats)
            throws JMSException {
        statsMessage.setLong("size", stats.getMessages().getCount());
        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
    }

    /**
     * Writes the process-time figures and the consumer/producer counts shared
     * by destination and broker replies.
     */
    private static void addTimingAndClientCounts(ActiveMQMapMessage statsMessage, DestinationStatistics stats)
            throws JMSException {
        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
        statsMessage.setLong("producerCount", stats.getProducers().getCount());
    }

    /**
     * Sends one reply per destination returned by {@code getDestinations} for
     * the name that follows the destination-statistics prefix. Destinations
     * that report no statistics object are skipped.
     */
    private void sendDestinationStats(ConnectionContext context, Message request, String physicalName,
                                      ActiveMQDestination replyTo) throws Exception {
        // NOTE(review): the remainder is used verbatim, including any leading
        // '.' separator - confirm this matches the name format callers use.
        String queryName = physicalName.substring(STATS_DESTINATION_PREFIX.length());
        ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queryName,
                request.getDestination().getDestinationType());
        Set<Destination> matches = getDestinations(queryDest);
        for (Destination dest : matches) {
            DestinationStatistics stats = dest.getDestinationStatistics();
            if (stats == null) {
                continue;
            }
            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
            statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
            addFlowCounters(statsMessage, stats);
            statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
            statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
            statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
            addTimingAndClientCounts(statsMessage, stats);
            statsMessage.setJMSCorrelationID(request.getCorrelationId());
            sendStats(context, statsMessage, replyTo);
        }
    }

    /**
     * Sends a single reply describing the broker as a whole: aggregate
     * counters, memory/store/temp usage, connector URIs and the data directory.
     */
    private void sendBrokerStats(ConnectionContext context, Message request, ActiveMQDestination replyTo)
            throws Exception {
        // Looked up here rather than in send() so that ordinary request/reply
        // traffic does not pay for the lookup and the cast.
        BrokerService brokerService = getBrokerService();
        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();

        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
        SystemUsage systemUsage = brokerService.getSystemUsage();
        DestinationStatistics stats = regionBroker.getDestinationStatistics();
        statsMessage.setString("brokerName", regionBroker.getBrokerName());
        statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
        addFlowCounters(statsMessage, stats);
        statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
        statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
        statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
        statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
        statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
        statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
        statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
        statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
        statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
        addTimingAndClientCounts(statsMessage, stats);

        // Fetch the connector map once; a missing scheme is reported as "".
        Map<String, String> connectorUris = brokerService.getTransportConnectorURIsAsMap();
        statsMessage.setString("openwire", valueOrEmpty(connectorUris.get("tcp")));
        statsMessage.setString("stomp", valueOrEmpty(connectorUris.get("stomp")));
        statsMessage.setString("ssl", valueOrEmpty(connectorUris.get("ssl")));
        statsMessage.setString("stomp+ssl", valueOrEmpty(connectorUris.get("stomp+ssl")));

        URI vmUri = brokerService.getVmConnectorURI();
        statsMessage.setString("vm", vmUri != null ? vmUri.toString() : "");
        File dataDirectory = brokerService.getDataDirectoryFile();
        statsMessage.setString("dataDirectory", dataDirectory != null ? dataDirectory.getCanonicalPath() : "");

        statsMessage.setJMSCorrelationID(request.getCorrelationId());
        sendStats(context, statsMessage, replyTo);
    }

    /**
     * Returns the broker view MBean proxy, creating and caching it on first use.
     * <p>
     * NOTE(review): the lazy initialisation is not synchronized; concurrent
     * first calls may each create a proxy.
     *
     * @return the cached proxy for the broker's object name
     * @throws Exception if the broker object name or the proxy cannot be obtained
     */
    BrokerViewMBean getBrokerView() throws Exception {
        if (this.brokerView == null) {
            ObjectName brokerName = getBrokerService().getBrokerObjectName();
            this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
                    BrokerViewMBean.class, true);
        }
        return this.brokerView;
    }

    /**
     * Starts the rest of the broker chain, then logs that this filter is starting.
     */
    @Override
    public void start() throws Exception {
        super.start();
        LOG.info("Starting StatisticsBroker");
    }

    /**
     * Stops the rest of the broker chain.
     */
    @Override
    public void stop() throws Exception {
        super.stop();
    }

    /**
     * Sends one statistics reply for each subscription in {@code subscribers}.
     *
     * @param context     the connection context used to send the replies
     * @param subscribers object names of the subscriptions to report on
     * @param replyTo     the destination the replies are sent to
     * @throws Exception if a proxy cannot be created or a reply cannot be sent
     */
    protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
        for (ObjectName name : subscribers) {
            SubscriptionViewMBean subscriber = (SubscriptionViewMBean) getBrokerService().getManagementContext()
                    .newProxyInstance(name, SubscriptionViewMBean.class, true);
            ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
            sendStats(context, statsMessage, replyTo);
        }
    }

    /**
     * Builds the map message describing a single subscription.
     *
     * @param subscriber the subscription view to read the values from
     * @return a new map message populated from {@code subscriber}
     * @throws JMSException if a value cannot be written to the message
     */
    protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
        statsMessage.setString("destinationName", subscriber.getDestinationName());
        statsMessage.setString("clientId", subscriber.getClientId());
        statsMessage.setString("connectionId", subscriber.getConnectionId());
        statsMessage.setLong("sessionId", subscriber.getSessionId());
        statsMessage.setString("selector", subscriber.getSelector());
        statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
        statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
        statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
        statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
        statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
        statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
        statsMessage.setBoolean("exclusive", subscriber.isExclusive());
        statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
        statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
        return statsMessage;
    }

    /**
     * Sends a statistics reply to {@code replyTo} through the next broker in
     * the chain.
     * <p>
     * The message is marked non-persistent, typed as an advisory message and
     * stamped with this filter's producer id and the next message sequence.
     * Producer flow control on {@code context} is switched off for the
     * duration of the send and restored afterwards, even if the send fails.
     *
     * @param context the connection context used for the send
     * @param msg     the populated statistics message
     * @param replyTo the destination the message is sent to
     * @throws Exception if the next broker fails to send the message
     */
    protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
            throws Exception {
        msg.setPersistent(false);
        msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
        msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
        msg.setDestination(replyTo);
        msg.setResponseRequired(false);
        msg.setProducerId(this.advisoryProducerId);
        boolean originalFlowControl = context.isProducerFlowControl();
        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
        producerExchange.setConnectionContext(context);
        producerExchange.setMutable(true);
        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
        try {
            context.setProducerFlowControl(false);
            this.next.send(producerExchange, msg);
        } finally {
            context.setProducerFlowControl(originalFlowControl);
        }
    }
}