001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.util.Set;
020import javax.annotation.PostConstruct;
021import org.apache.activemq.broker.BrokerPluginSupport;
022import org.apache.activemq.broker.Connection;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ConsumerBrokerExchange;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.MessageReference;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.BrokerInfo;
031import org.apache.activemq.command.ConnectionInfo;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.DestinationInfo;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatch;
037import org.apache.activemq.command.MessageDispatchNotification;
038import org.apache.activemq.command.MessagePull;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveSubscriptionInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionInfo;
043import org.apache.activemq.command.TransactionId;
044import org.apache.activemq.usage.Usage;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A simple Broker intercepter which allows you to enable/disable logging.
050 *
051 * @org.apache.xbean.XBean
052 */
053
054public class LoggingBrokerPlugin extends BrokerPluginSupport {
055
056    private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057
058    private boolean logAll = false;
059    private boolean logMessageEvents = false;
060    private boolean logConnectionEvents = true;
061    private boolean logSessionEvents = true;
062    private boolean logTransactionEvents = false;
063    private boolean logConsumerEvents = false;
064    private boolean logProducerEvents = false;
065    private boolean logInternalEvents = false;
066
067    /**
068     * @throws Exception
069     * @org.apache.xbean.InitMethod
070     */
071    @PostConstruct
072    public void afterPropertiesSet() throws Exception {
073        LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074    }
075
076    public boolean isLogAll() {
077        return logAll;
078    }
079
080    /**
081     * Logger all Events that go through the Plugin
082     */
083    public void setLogAll(boolean logAll) {
084        this.logAll = logAll;
085    }
086
087    public boolean isLogMessageEvents() {
088        return logMessageEvents;
089    }
090
091    /**
092     * Logger Events that are related to message processing
093     */
094    public void setLogMessageEvents(boolean logMessageEvents) {
095        this.logMessageEvents = logMessageEvents;
096    }
097
098    public boolean isLogConnectionEvents() {
099        return logConnectionEvents;
100    }
101
102    /**
103     * Logger Events that are related to connections
104     */
105    public void setLogConnectionEvents(boolean logConnectionEvents) {
106        this.logConnectionEvents = logConnectionEvents;
107    }
108
109    public boolean isLogSessionEvents() {
110        return logSessionEvents;
111    }
112
113    /**
114     * Logger Events that are related to sessions
115     */
116    public void setLogSessionEvents(boolean logSessionEvents) {
117        this.logSessionEvents = logSessionEvents;
118    }
119
120    public boolean isLogTransactionEvents() {
121        return logTransactionEvents;
122    }
123
124    /**
125     * Logger Events that are related to transaction processing
126     */
127    public void setLogTransactionEvents(boolean logTransactionEvents) {
128        this.logTransactionEvents = logTransactionEvents;
129    }
130
131    public boolean isLogConsumerEvents() {
132        return logConsumerEvents;
133    }
134
135    /**
136     * Logger Events that are related to Consumers
137     */
138    public void setLogConsumerEvents(boolean logConsumerEvents) {
139        this.logConsumerEvents = logConsumerEvents;
140    }
141
142    public boolean isLogProducerEvents() {
143        return logProducerEvents;
144    }
145
146    /**
147     * Logger Events that are related to Producers
148     */
149    public void setLogProducerEvents(boolean logProducerEvents) {
150        this.logProducerEvents = logProducerEvents;
151    }
152
153    public boolean isLogInternalEvents() {
154        return logInternalEvents;
155    }
156
157    /**
158     * Logger Events that are normally internal to the broker
159     */
160    public void setLogInternalEvents(boolean logInternalEvents) {
161        this.logInternalEvents = logInternalEvents;
162    }
163
164    @Override
165    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
166        if (isLogAll() || isLogConsumerEvents()) {
167            LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
168                    + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
169            if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
170                LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
171                        + ", Last Message Id: " + ack.getLastMessageId());
172            }
173        }
174        super.acknowledge(consumerExchange, ack);
175    }
176
177    @Override
178    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
179        if (isLogAll() || isLogConsumerEvents()) {
180            LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
181        }
182        return super.messagePull(context, pull);
183    }
184
185    @Override
186    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
187        if (isLogAll() || isLogConnectionEvents()) {
188            LOG.info("Adding Connection : " + info);
189        }
190        super.addConnection(context, info);
191    }
192
193    @Override
194    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
195        if (isLogAll() || isLogConsumerEvents()) {
196            LOG.info("Adding Consumer : " + info);
197        }
198        return super.addConsumer(context, info);
199    }
200
201    @Override
202    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
203        if (isLogAll() || isLogProducerEvents()) {
204            LOG.info("Adding Producer :" + info);
205        }
206        super.addProducer(context, info);
207    }
208
209    @Override
210    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
211        if (isLogAll() || isLogTransactionEvents()) {
212            LOG.info("Commiting transaction : " + xid.getTransactionKey());
213        }
214        super.commitTransaction(context, xid, onePhase);
215    }
216
217    @Override
218    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
219        if (isLogAll() || isLogConsumerEvents()) {
220            LOG.info("Removing subscription : " + info);
221        }
222        super.removeSubscription(context, info);
223    }
224
225    @Override
226    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
227
228        TransactionId[] result = super.getPreparedTransactions(context);
229        if ((isLogAll() || isLogTransactionEvents()) && result != null) {
230            StringBuffer tids = new StringBuffer();
231            for (TransactionId tid : result) {
232                if (tids.length() > 0) {
233                    tids.append(", ");
234                }
235                tids.append(tid.getTransactionKey());
236            }
237            LOG.info("Prepared transactions : " + tids);
238        }
239        return result;
240    }
241
242    @Override
243    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
244        if (isLogAll() || isLogTransactionEvents()) {
245            LOG.info("Preparing transaction : " + xid.getTransactionKey());
246        }
247        return super.prepareTransaction(context, xid);
248    }
249
250    @Override
251    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
252        if (isLogAll() || isLogConnectionEvents()) {
253            LOG.info("Removing Connection : " + info);
254        }
255        super.removeConnection(context, info, error);
256    }
257
258    @Override
259    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
260        if (isLogAll() || isLogConsumerEvents()) {
261            LOG.info("Removing Consumer : " + info);
262        }
263        super.removeConsumer(context, info);
264    }
265
266    @Override
267    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
268        if (isLogAll() || isLogProducerEvents()) {
269            LOG.info("Removing Producer : " + info);
270        }
271        super.removeProducer(context, info);
272    }
273
274    @Override
275    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
276        if (isLogAll() || isLogTransactionEvents()) {
277            LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
278        }
279        super.rollbackTransaction(context, xid);
280    }
281
282    @Override
283    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
284        if (isLogAll() || isLogProducerEvents()) {
285            LOG.info("Sending message : " + messageSend.copy());
286        }
287        super.send(producerExchange, messageSend);
288    }
289
290    @Override
291    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
292        if (isLogAll() || isLogTransactionEvents()) {
293            LOG.info("Beginning transaction : " + xid.getTransactionKey());
294        }
295        super.beginTransaction(context, xid);
296    }
297
298    @Override
299    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
300        if (isLogAll() || isLogTransactionEvents()) {
301            LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
302        }
303        super.forgetTransaction(context, transactionId);
304    }
305
306    @Override
307    public Connection[] getClients() throws Exception {
308        Connection[] result = super.getClients();
309
310        if (isLogAll() || isLogInternalEvents()) {
311            if (result == null) {
312                LOG.info("Get Clients returned empty list.");
313            } else {
314                StringBuffer cids = new StringBuffer();
315                for (Connection c : result) {
316                    cids.append(cids.length() > 0 ? ", " : "");
317                    cids.append(c.getConnectionId());
318                }
319                LOG.info("Connected clients : " + cids);
320            }
321        }
322        return super.getClients();
323    }
324
325    @Override
326    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
327            ActiveMQDestination destination, boolean create) throws Exception {
328        if (isLogAll() || isLogInternalEvents()) {
329            LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
330                    + destination.getPhysicalName());
331        }
332        return super.addDestination(context, destination, create);
333    }
334
335    @Override
336    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
337            throws Exception {
338        if (isLogAll() || isLogInternalEvents()) {
339            LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
340                    + destination.getPhysicalName());
341        }
342        super.removeDestination(context, destination, timeout);
343    }
344
345    @Override
346    public ActiveMQDestination[] getDestinations() throws Exception {
347        ActiveMQDestination[] result = super.getDestinations();
348        if (isLogAll() || isLogInternalEvents()) {
349            if (result == null) {
350                LOG.info("Get Destinations returned empty list.");
351            } else {
352                StringBuffer destinations = new StringBuffer();
353                for (ActiveMQDestination dest : result) {
354                    destinations.append(destinations.length() > 0 ? ", " : "");
355                    destinations.append(dest.getPhysicalName());
356                }
357                LOG.info("Get Destinations : " + destinations);
358            }
359        }
360        return result;
361    }
362
363    @Override
364    public void start() throws Exception {
365        if (isLogAll() || isLogInternalEvents()) {
366            LOG.info("Starting " + getBrokerName());
367        }
368        super.start();
369    }
370
371    @Override
372    public void stop() throws Exception {
373        if (isLogAll() || isLogInternalEvents()) {
374            LOG.info("Stopping " + getBrokerName());
375        }
376        super.stop();
377    }
378
379    @Override
380    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
381        if (isLogAll() || isLogSessionEvents()) {
382            LOG.info("Adding Session : " + info);
383        }
384        super.addSession(context, info);
385    }
386
387    @Override
388    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
389        if (isLogAll() || isLogSessionEvents()) {
390            LOG.info("Removing Session : " + info);
391        }
392        super.removeSession(context, info);
393    }
394
395    @Override
396    public void addBroker(Connection connection, BrokerInfo info) {
397        if (isLogAll() || isLogInternalEvents()) {
398            LOG.info("Adding Broker " + info.getBrokerName());
399        }
400        super.addBroker(connection, info);
401    }
402
403    @Override
404    public void removeBroker(Connection connection, BrokerInfo info) {
405        if (isLogAll() || isLogInternalEvents()) {
406            LOG.info("Removing Broker " + info.getBrokerName());
407        }
408        super.removeBroker(connection, info);
409    }
410
411    @Override
412    public BrokerInfo[] getPeerBrokerInfos() {
413        BrokerInfo[] result = super.getPeerBrokerInfos();
414        if (isLogAll() || isLogInternalEvents()) {
415            if (result == null) {
416                LOG.info("Get Peer Broker Infos returned empty list.");
417            } else {
418                StringBuffer peers = new StringBuffer();
419                for (BrokerInfo bi : result) {
420                    peers.append(peers.length() > 0 ? ", " : "");
421                    peers.append(bi.getBrokerName());
422                }
423                LOG.info("Get Peer Broker Infos : " + peers);
424            }
425        }
426        return result;
427    }
428
429    @Override
430    public void preProcessDispatch(MessageDispatch messageDispatch) {
431        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
432            LOG.info("preProcessDispatch :" + messageDispatch);
433        }
434        super.preProcessDispatch(messageDispatch);
435    }
436
437    @Override
438    public void postProcessDispatch(MessageDispatch messageDispatch) {
439        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
440            LOG.info("postProcessDispatch :" + messageDispatch);
441        }
442        super.postProcessDispatch(messageDispatch);
443    }
444
445    @Override
446    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
447        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
448            LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
449        }
450        super.processDispatchNotification(messageDispatchNotification);
451    }
452
453    @Override
454    public Set<ActiveMQDestination> getDurableDestinations() {
455        Set<ActiveMQDestination> result = super.getDurableDestinations();
456        if (isLogAll() || isLogInternalEvents()) {
457            if (result == null) {
458                LOG.info("Get Durable Destinations returned empty list.");
459            } else {
460                StringBuffer destinations = new StringBuffer();
461                for (ActiveMQDestination dest : result) {
462                    destinations.append(destinations.length() > 0 ? ", " : "");
463                    destinations.append(dest.getPhysicalName());
464                }
465                LOG.info("Get Durable Destinations : " + destinations);
466            }
467        }
468        return result;
469    }
470
471    @Override
472    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
473        if (isLogAll() || isLogInternalEvents()) {
474            LOG.info("Adding destination info : " + info);
475        }
476        super.addDestinationInfo(context, info);
477    }
478
479    @Override
480    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
481        if (isLogAll() || isLogInternalEvents()) {
482            LOG.info("Removing destination info : " + info);
483        }
484        super.removeDestinationInfo(context, info);
485    }
486
487    @Override
488    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
489        if (isLogAll() || isLogInternalEvents()) {
490            String msg = "Unable to display message.";
491
492            msg = message.getMessage().toString();
493
494            LOG.info("Message has expired : " + msg);
495        }
496        super.messageExpired(context, message, subscription);
497    }
498
499    @Override
500    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
501                                      Subscription subscription) {
502        if (isLogAll() || isLogInternalEvents()) {
503            String msg = "Unable to display message.";
504
505            msg = messageReference.getMessage().toString();
506
507            LOG.info("Sending to DLQ : " + msg);
508        }
509        super.sendToDeadLetterQueue(context, messageReference, subscription);
510    }
511
512    @Override
513    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
514        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
515            LOG.info("Fast Producer : " + producerInfo);
516        }
517        super.fastProducer(context, producerInfo);
518    }
519
520    @Override
521    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
522        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
523            LOG.info("Destination is full : " + destination.getName());
524        }
525        super.isFull(context, destination, usage);
526    }
527
528    @Override
529    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
530        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
531            String msg = "Unable to display message.";
532
533            msg = messageReference.getMessage().toString();
534
535            LOG.info("Message consumed : " + msg);
536        }
537        super.messageConsumed(context, messageReference);
538    }
539
540    @Override
541    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
542        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
543            String msg = "Unable to display message.";
544
545            msg = messageReference.getMessage().toString();
546
547            LOG.info("Message delivered : " + msg);
548        }
549        super.messageDelivered(context, messageReference);
550    }
551
552    @Override
553    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
554        if (isLogAll() || isLogInternalEvents()) {
555            String msg = "Unable to display message.";
556
557            msg = messageReference.getMessage().toString();
558
559            LOG.info("Message discarded : " + msg);
560        }
561        super.messageDiscarded(context, sub, messageReference);
562    }
563
564    @Override
565    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
566        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
567            LOG.info("Detected slow consumer on " + destination.getName());
568            StringBuffer buf = new StringBuffer("Connection(");
569            buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
570            buf.append(") Session(");
571            buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
572            buf.append(")");
573            LOG.info(buf.toString());
574        }
575        super.slowConsumer(context, destination, subs);
576    }
577
578    @Override
579    public void nowMasterBroker() {
580        if (isLogAll() || isLogInternalEvents()) {
581            LOG.info("Is now the master broker : " + getBrokerName());
582        }
583        super.nowMasterBroker();
584    }
585
586    @Override
587    public String toString() {
588        StringBuffer buf = new StringBuffer();
589        buf.append("LoggingBrokerPlugin(");
590        buf.append("logAll=");
591        buf.append(isLogAll());
592        buf.append(", logConnectionEvents=");
593        buf.append(isLogConnectionEvents());
594        buf.append(", logSessionEvents=");
595        buf.append(isLogSessionEvents());
596        buf.append(", logConsumerEvents=");
597        buf.append(isLogConsumerEvents());
598        buf.append(", logProducerEvents=");
599        buf.append(isLogProducerEvents());
600        buf.append(", logMessageEvents=");
601        buf.append(isLogMessageEvents());
602        buf.append(", logTransactionEvents=");
603        buf.append(isLogTransactionEvents());
604        buf.append(", logInternalEvents=");
605        buf.append(isLogInternalEvents());
606        buf.append(")");
607        return buf.toString();
608    }
609}