001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.LinkedList; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicReference; 032 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.JMSException; 036import javax.jms.Message; 037import javax.jms.MessageConsumer; 038import javax.jms.MessageListener; 039import javax.jms.TransactionRolledBackException; 040 041import org.apache.activemq.blob.BlobDownloader; 042import org.apache.activemq.command.ActiveMQBlobMessage; 043import org.apache.activemq.command.ActiveMQDestination; 044import org.apache.activemq.command.ActiveMQMessage; 045import org.apache.activemq.command.ActiveMQTempDestination; 046import org.apache.activemq.command.CommandTypes; 047import org.apache.activemq.command.ConsumerId; 048import org.apache.activemq.command.ConsumerInfo; 049import org.apache.activemq.command.MessageAck; 050import org.apache.activemq.command.MessageDispatch; 051import org.apache.activemq.command.MessageId; 052import org.apache.activemq.command.MessagePull; 053import org.apache.activemq.command.RemoveInfo; 054import org.apache.activemq.command.TransactionId; 055import org.apache.activemq.management.JMSConsumerStatsImpl; 056import org.apache.activemq.management.StatsCapable; 057import org.apache.activemq.management.StatsImpl; 058import org.apache.activemq.selector.SelectorParser; 059import org.apache.activemq.transaction.Synchronization; 060import org.apache.activemq.util.Callback; 061import org.apache.activemq.util.IntrospectionSupport; 062import org.apache.activemq.util.JMSExceptionSupport; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 068 * from a destination. A <CODE> MessageConsumer</CODE> object is created by 069 * passing a <CODE>Destination</CODE> object to a message-consumer creation 070 * method supplied by a session. 071 * <P> 072 * <CODE>MessageConsumer</CODE> is the parent interface for all message 073 * consumers. 074 * <P> 075 * A message consumer can be created with a message selector. A message selector 076 * allows the client to restrict the messages delivered to the message consumer 077 * to those that match the selector. 078 * <P> 079 * A client may either synchronously receive a message consumer's messages or 080 * have the consumer asynchronously deliver them as they arrive. 081 * <P> 082 * For synchronous receipt, a client can request the next message from a message 083 * consumer using one of its <CODE> receive</CODE> methods. There are several 084 * variations of <CODE>receive</CODE> that allow a client to poll or wait for 085 * the next message. 086 * <P> 087 * For asynchronous delivery, a client can register a 088 * <CODE>MessageListener</CODE> object with a message consumer. As messages 089 * arrive at the message consumer, it delivers them by calling the 090 * <CODE>MessageListener</CODE>'s<CODE> 091 * onMessage</CODE> method. 092 * <P> 093 * It is a client programming error for a <CODE>MessageListener</CODE> to 094 * throw an exception. 095 * 096 * 097 * @see javax.jms.MessageConsumer 098 * @see javax.jms.QueueReceiver 099 * @see javax.jms.TopicSubscriber 100 * @see javax.jms.Session 101 */ 102public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 103 104 @SuppressWarnings("serial") 105 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> { 106 final TransactionId transactionId; 107 public PreviouslyDeliveredMap(TransactionId transactionId) { 108 this.transactionId = transactionId; 109 } 110 } 111 112 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); 113 protected final ActiveMQSession session; 114 protected final ConsumerInfo info; 115 116 // These are the messages waiting to be delivered to the client 117 protected final MessageDispatchChannel unconsumedMessages; 118 119 // The are the messages that were delivered to the consumer but that have 120 // not been acknowledged. It's kept in reverse order since we 121 // Always walk list in reverse order. 122 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); 123 // track duplicate deliveries in a transaction such that the tx integrity can be validated 124 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; 125 private int deliveredCounter; 126 private int additionalWindowSize; 127 private long redeliveryDelay; 128 private int ackCounter; 129 private int dispatchedCount; 130 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); 131 private final JMSConsumerStatsImpl stats; 132 133 private final String selector; 134 private boolean synchronizationRegistered; 135 private final AtomicBoolean started = new AtomicBoolean(false); 136 137 private MessageAvailableListener availableListener; 138 139 private RedeliveryPolicy redeliveryPolicy; 140 private boolean optimizeAcknowledge; 141 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); 142 private ExecutorService executorService; 143 private MessageTransformer transformer; 144 private boolean clearDispatchList; 145 boolean inProgressClearRequiredFlag; 146 147 private MessageAck pendingAck; 148 private long lastDeliveredSequenceId; 149 150 private IOException failureError; 151 152 private long optimizeAckTimestamp = System.currentTimeMillis(); 153 private long optimizeAcknowledgeTimeOut = 0; 154 private long failoverRedeliveryWaitPeriod = 0; 155 private boolean transactedIndividualAck = false; 156 private boolean nonBlockingRedelivery = false; 157 158 /** 159 * Create a MessageConsumer 160 * 161 * @param session 162 * @param dest 163 * @param name 164 * @param selector 165 * @param prefetch 166 * @param maximumPendingMessageCount 167 * @param noLocal 168 * @param browser 169 * @param dispatchAsync 170 * @param messageListener 171 * @throws JMSException 172 */ 173 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 174 String name, String selector, int prefetch, 175 int maximumPendingMessageCount, boolean noLocal, boolean browser, 176 boolean dispatchAsync, MessageListener messageListener) throws JMSException { 177 if (dest == null) { 178 throw new InvalidDestinationException("Don't understand null destinations"); 179 } else if (dest.getPhysicalName() == null) { 180 throw new InvalidDestinationException("The destination object was not given a physical name."); 181 } else if (dest.isTemporary()) { 182 String physicalName = dest.getPhysicalName(); 183 184 if (physicalName == null) { 185 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 186 } 187 188 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 189 190 if (physicalName.indexOf(connectionID) < 0) { 191 throw new InvalidDestinationException( 192 "Cannot use a Temporary destination from another Connection"); 193 } 194 195 if (session.connection.isDeleted(dest)) { 196 throw new InvalidDestinationException( 197 "Cannot use a Temporary destination that has been deleted"); 198 } 199 if (prefetch < 0) { 200 throw new JMSException("Cannot have a prefetch size less than zero"); 201 } 202 } 203 if (session.connection.isMessagePrioritySupported()) { 204 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); 205 }else { 206 this.unconsumedMessages = new FifoMessageDispatchChannel(); 207 } 208 209 this.session = session; 210 this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); 211 setTransformer(session.getTransformer()); 212 213 this.info = new ConsumerInfo(consumerId); 214 this.info.setExclusive(this.session.connection.isExclusiveConsumer()); 215 this.info.setSubscriptionName(name); 216 this.info.setPrefetchSize(prefetch); 217 this.info.setCurrentPrefetchSize(prefetch); 218 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 219 this.info.setNoLocal(noLocal); 220 this.info.setDispatchAsync(dispatchAsync); 221 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 222 this.info.setSelector(null); 223 224 // Allows the options on the destination to configure the consumerInfo 225 if (dest.getOptions() != null) { 226 Map<String, Object> options = IntrospectionSupport.extractProperties( 227 new HashMap<String, Object>(dest.getOptions()), "consumer."); 228 IntrospectionSupport.setProperties(this.info, options); 229 if (options.size() > 0) { 230 String msg = "There are " + options.size() 231 + " consumer options that couldn't be set on the consumer." 232 + " Check the options are spelled correctly." 233 + " Unknown parameters=[" + options + "]." 234 + " This consumer cannot be started."; 235 LOG.warn(msg); 236 throw new ConfigurationException(msg); 237 } 238 } 239 240 this.info.setDestination(dest); 241 this.info.setBrowser(browser); 242 if (selector != null && selector.trim().length() != 0) { 243 // Validate the selector 244 SelectorParser.parse(selector); 245 this.info.setSelector(selector); 246 this.selector = selector; 247 } else if (info.getSelector() != null) { 248 // Validate the selector 249 SelectorParser.parse(this.info.getSelector()); 250 this.selector = this.info.getSelector(); 251 } else { 252 this.selector = null; 253 } 254 255 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 256 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() 257 && !info.isBrowser(); 258 if (this.optimizeAcknowledge) { 259 this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); 260 } 261 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 262 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); 263 this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); 264 this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery; 265 if (messageListener != null) { 266 setMessageListener(messageListener); 267 } 268 try { 269 this.session.addConsumer(this); 270 this.session.syncSendPacket(info); 271 } catch (JMSException e) { 272 this.session.removeConsumer(this); 273 throw e; 274 } 275 276 if (session.connection.isStarted()) { 277 start(); 278 } 279 } 280 281 private boolean isAutoAcknowledgeEach() { 282 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); 283 } 284 285 private boolean isAutoAcknowledgeBatch() { 286 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; 287 } 288 289 public StatsImpl getStats() { 290 return stats; 291 } 292 293 public JMSConsumerStatsImpl getConsumerStats() { 294 return stats; 295 } 296 297 public RedeliveryPolicy getRedeliveryPolicy() { 298 return redeliveryPolicy; 299 } 300 301 /** 302 * Sets the redelivery policy used when messages are redelivered 303 */ 304 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 305 this.redeliveryPolicy = redeliveryPolicy; 306 } 307 308 public MessageTransformer getTransformer() { 309 return transformer; 310 } 311 312 /** 313 * Sets the transformer used to transform messages before they are sent on 314 * to the JMS bus 315 */ 316 public void setTransformer(MessageTransformer transformer) { 317 this.transformer = transformer; 318 } 319 320 /** 321 * @return Returns the value. 322 */ 323 public ConsumerId getConsumerId() { 324 return info.getConsumerId(); 325 } 326 327 /** 328 * @return the consumer name - used for durable consumers 329 */ 330 public String getConsumerName() { 331 return this.info.getSubscriptionName(); 332 } 333 334 /** 335 * @return true if this consumer does not accept locally produced messages 336 */ 337 protected boolean isNoLocal() { 338 return info.isNoLocal(); 339 } 340 341 /** 342 * Retrieve is a browser 343 * 344 * @return true if a browser 345 */ 346 protected boolean isBrowser() { 347 return info.isBrowser(); 348 } 349 350 /** 351 * @return ActiveMQDestination 352 */ 353 protected ActiveMQDestination getDestination() { 354 return info.getDestination(); 355 } 356 357 /** 358 * @return Returns the prefetchNumber. 359 */ 360 public int getPrefetchNumber() { 361 return info.getPrefetchSize(); 362 } 363 364 /** 365 * @return true if this is a durable topic subscriber 366 */ 367 public boolean isDurableSubscriber() { 368 return info.getSubscriptionName() != null && info.getDestination().isTopic(); 369 } 370 371 /** 372 * Gets this message consumer's message selector expression. 373 * 374 * @return this message consumer's message selector, or null if no message 375 * selector exists for the message consumer (that is, if the message 376 * selector was not set or was set to null or the empty string) 377 * @throws JMSException if the JMS provider fails to receive the next 378 * message due to some internal error. 379 */ 380 public String getMessageSelector() throws JMSException { 381 checkClosed(); 382 return selector; 383 } 384 385 /** 386 * Gets the message consumer's <CODE>MessageListener</CODE>. 387 * 388 * @return the listener for the message consumer, or null if no listener is 389 * set 390 * @throws JMSException if the JMS provider fails to get the message 391 * listener due to some internal error. 392 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 393 */ 394 public MessageListener getMessageListener() throws JMSException { 395 checkClosed(); 396 return this.messageListener.get(); 397 } 398 399 /** 400 * Sets the message consumer's <CODE>MessageListener</CODE>. 401 * <P> 402 * Setting the message listener to null is the equivalent of unsetting the 403 * message listener for the message consumer. 404 * <P> 405 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> 406 * while messages are being consumed by an existing listener or the consumer 407 * is being used to consume messages synchronously is undefined. 408 * 409 * @param listener the listener to which the messages are to be delivered 410 * @throws JMSException if the JMS provider fails to receive the next 411 * message due to some internal error. 412 * @see javax.jms.MessageConsumer#getMessageListener 413 */ 414 public void setMessageListener(MessageListener listener) throws JMSException { 415 checkClosed(); 416 if (info.getPrefetchSize() == 0) { 417 throw new JMSException( 418 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 419 } 420 if (listener != null) { 421 boolean wasRunning = session.isRunning(); 422 if (wasRunning) { 423 session.stop(); 424 } 425 426 this.messageListener.set(listener); 427 session.redispatch(this, unconsumedMessages); 428 429 if (wasRunning) { 430 session.start(); 431 } 432 } else { 433 this.messageListener.set(null); 434 } 435 } 436 437 public MessageAvailableListener getAvailableListener() { 438 return availableListener; 439 } 440 441 /** 442 * Sets the listener used to notify synchronous consumers that there is a 443 * message available so that the {@link MessageConsumer#receiveNoWait()} can 444 * be called. 445 */ 446 public void setAvailableListener(MessageAvailableListener availableListener) { 447 this.availableListener = availableListener; 448 } 449 450 /** 451 * Used to get an enqueued message from the unconsumedMessages list. The 452 * amount of time this method blocks is based on the timeout value. - if 453 * timeout==-1 then it blocks until a message is received. - if timeout==0 454 * then it it tries to not block at all, it returns a message if it is 455 * available - if timeout>0 then it blocks up to timeout amount of time. 456 * Expired messages will consumed by this method. 457 * 458 * @throws JMSException 459 * @return null if we timeout or if the consumer is closed. 460 */ 461 private MessageDispatch dequeue(long timeout) throws JMSException { 462 try { 463 long deadline = 0; 464 if (timeout > 0) { 465 deadline = System.currentTimeMillis() + timeout; 466 } 467 while (true) { 468 MessageDispatch md = unconsumedMessages.dequeue(timeout); 469 if (md == null) { 470 if (timeout > 0 && !unconsumedMessages.isClosed()) { 471 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 472 } else { 473 if (failureError != null) { 474 throw JMSExceptionSupport.create(failureError); 475 } else { 476 return null; 477 } 478 } 479 } else if (md.getMessage() == null) { 480 return null; 481 } else if (md.getMessage().isExpired()) { 482 if (LOG.isDebugEnabled()) { 483 LOG.debug(getConsumerId() + " received expired message: " + md); 484 } 485 beforeMessageIsConsumed(md); 486 afterMessageIsConsumed(md, true); 487 if (timeout > 0) { 488 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 489 } 490 } else { 491 if (LOG.isTraceEnabled()) { 492 LOG.trace(getConsumerId() + " received message: " + md); 493 } 494 return md; 495 } 496 } 497 } catch (InterruptedException e) { 498 Thread.currentThread().interrupt(); 499 throw JMSExceptionSupport.create(e); 500 } 501 } 502 503 /** 504 * Receives the next message produced for this message consumer. 505 * <P> 506 * This call blocks indefinitely until a message is produced or until this 507 * message consumer is closed. 508 * <P> 509 * If this <CODE>receive</CODE> is done within a transaction, the consumer 510 * retains the message until the transaction commits. 511 * 512 * @return the next message produced for this message consumer, or null if 513 * this message consumer is concurrently closed 514 */ 515 public Message receive() throws JMSException { 516 checkClosed(); 517 checkMessageListener(); 518 519 sendPullCommand(0); 520 MessageDispatch md = dequeue(-1); 521 if (md == null) { 522 return null; 523 } 524 525 beforeMessageIsConsumed(md); 526 afterMessageIsConsumed(md, false); 527 528 return createActiveMQMessage(md); 529 } 530 531 /** 532 * @param md 533 * @return 534 */ 535 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 536 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 537 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 538 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); 539 } 540 if (transformer != null) { 541 Message transformedMessage = transformer.consumerTransform(session, this, m); 542 if (transformedMessage != null) { 543 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 544 } 545 } 546 if (session.isClientAcknowledge()) { 547 m.setAcknowledgeCallback(new Callback() { 548 public void execute() throws Exception { 549 session.checkClosed(); 550 session.acknowledge(); 551 } 552 }); 553 }else if (session.isIndividualAcknowledge()) { 554 m.setAcknowledgeCallback(new Callback() { 555 public void execute() throws Exception { 556 session.checkClosed(); 557 acknowledge(md); 558 } 559 }); 560 } 561 return m; 562 } 563 564 /** 565 * Receives the next message that arrives within the specified timeout 566 * interval. 567 * <P> 568 * This call blocks until a message arrives, the timeout expires, or this 569 * message consumer is closed. A <CODE>timeout</CODE> of zero never 570 * expires, and the call blocks indefinitely. 571 * 572 * @param timeout the timeout value (in milliseconds), a time out of zero 573 * never expires. 574 * @return the next message produced for this message consumer, or null if 575 * the timeout expires or this message consumer is concurrently 576 * closed 577 */ 578 public Message receive(long timeout) throws JMSException { 579 checkClosed(); 580 checkMessageListener(); 581 if (timeout == 0) { 582 return this.receive(); 583 } 584 585 sendPullCommand(timeout); 586 while (timeout > 0) { 587 588 MessageDispatch md; 589 if (info.getPrefetchSize() == 0) { 590 md = dequeue(-1); // We let the broker let us know when we timeout. 591 } else { 592 md = dequeue(timeout); 593 } 594 595 if (md == null) { 596 return null; 597 } 598 599 beforeMessageIsConsumed(md); 600 afterMessageIsConsumed(md, false); 601 return createActiveMQMessage(md); 602 } 603 return null; 604 } 605 606 /** 607 * Receives the next message if one is immediately available. 608 * 609 * @return the next message produced for this message consumer, or null if 610 * one is not available 611 * @throws JMSException if the JMS provider fails to receive the next 612 * message due to some internal error. 613 */ 614 public Message receiveNoWait() throws JMSException { 615 checkClosed(); 616 checkMessageListener(); 617 sendPullCommand(-1); 618 619 MessageDispatch md; 620 if (info.getPrefetchSize() == 0) { 621 md = dequeue(-1); // We let the broker let us know when we 622 // timeout. 623 } else { 624 md = dequeue(0); 625 } 626 627 if (md == null) { 628 return null; 629 } 630 631 beforeMessageIsConsumed(md); 632 afterMessageIsConsumed(md, false); 633 return createActiveMQMessage(md); 634 } 635 636 /** 637 * Closes the message consumer. 638 * <P> 639 * Since a provider may allocate some resources on behalf of a <CODE> 640 * MessageConsumer</CODE> 641 * outside the Java virtual machine, clients should close them when they are 642 * not needed. Relying on garbage collection to eventually reclaim these 643 * resources may not be timely enough. 644 * <P> 645 * This call blocks until a <CODE>receive</CODE> or message listener in 646 * progress has completed. A blocked message consumer <CODE>receive </CODE> 647 * call returns null when this message consumer is closed. 648 * 649 * @throws JMSException if the JMS provider fails to close the consumer due 650 * to some internal error. 651 */ 652 public void close() throws JMSException { 653 if (!unconsumedMessages.isClosed()) { 654 if (session.getTransactionContext().isInTransaction()) { 655 session.getTransactionContext().addSynchronization(new Synchronization() { 656 @Override 657 public void afterCommit() throws Exception { 658 doClose(); 659 } 660 661 @Override 662 public void afterRollback() throws Exception { 663 doClose(); 664 } 665 }); 666 } else { 667 doClose(); 668 } 669 } 670 } 671 672 void doClose() throws JMSException { 673 // Store interrupted state and clear so that Transport operations don't 674 // throw InterruptedException and we ensure that resources are clened up. 675 boolean interrupted = Thread.interrupted(); 676 dispose(); 677 RemoveInfo removeCommand = info.createRemoveCommand(); 678 if (LOG.isDebugEnabled()) { 679 LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId); 680 } 681 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 682 this.session.asyncSendPacket(removeCommand); 683 if (interrupted) { 684 Thread.currentThread().interrupt(); 685 } } 686 687 void inProgressClearRequired() { 688 inProgressClearRequiredFlag = true; 689 // deal with delivered messages async to avoid lock contention with in progress acks 690 clearDispatchList = true; 691 } 692 693 void clearMessagesInProgress() { 694 if (inProgressClearRequiredFlag) { 695 synchronized (unconsumedMessages.getMutex()) { 696 if (inProgressClearRequiredFlag) { 697 if (LOG.isDebugEnabled()) { 698 LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt"); 699 } 700 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer 701 List<MessageDispatch> list = unconsumedMessages.removeAll(); 702 if (!this.info.isBrowser()) { 703 for (MessageDispatch old : list) { 704 session.connection.rollbackDuplicate(this, old.getMessage()); 705 } 706 } 707 // allow dispatch on this connection to resume 708 session.connection.transportInterruptionProcessingComplete(); 709 inProgressClearRequiredFlag = false; 710 } 711 } 712 } 713 } 714 715 void deliverAcks() { 716 MessageAck ack = null; 717 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 718 if (isAutoAcknowledgeEach()) { 719 synchronized(deliveredMessages) { 720 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 721 if (ack != null) { 722 deliveredMessages.clear(); 723 ackCounter = 0; 724 } else { 725 ack = pendingAck; 726 pendingAck = null; 727 } 728 } 729 } else if (pendingAck != null && pendingAck.isStandardAck()) { 730 ack = pendingAck; 731 pendingAck = null; 732 } 733 if (ack != null) { 734 final MessageAck ackToSend = ack; 735 736 if (executorService == null) { 737 executorService = Executors.newSingleThreadExecutor(); 738 } 739 executorService.submit(new Runnable() { 740 public void run() { 741 try { 742 session.sendAck(ackToSend,true); 743 } catch (JMSException e) { 744 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); 745 } finally { 746 deliveryingAcknowledgements.set(false); 747 } 748 } 749 }); 750 } else { 751 deliveryingAcknowledgements.set(false); 752 } 753 } 754 } 755 756 public void dispose() throws JMSException { 757 if (!unconsumedMessages.isClosed()) { 758 759 // Do we have any acks we need to send out before closing? 760 // Ack any delivered messages now. 761 if (!session.getTransacted()) { 762 deliverAcks(); 763 if (isAutoAcknowledgeBatch()) { 764 acknowledge(); 765 } 766 } 767 if (executorService != null) { 768 executorService.shutdown(); 769 try { 770 executorService.awaitTermination(60, TimeUnit.SECONDS); 771 } catch (InterruptedException e) { 772 Thread.currentThread().interrupt(); 773 } 774 } 775 776 if (session.isClientAcknowledge()) { 777 if (!this.info.isBrowser()) { 778 // rollback duplicates that aren't acknowledged 779 List<MessageDispatch> tmp = null; 780 synchronized (this.deliveredMessages) { 781 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages); 782 } 783 for (MessageDispatch old : tmp) { 784 this.session.connection.rollbackDuplicate(this, old.getMessage()); 785 } 786 tmp.clear(); 787 } 788 } 789 if (!session.isTransacted()) { 790 synchronized(deliveredMessages) { 791 deliveredMessages.clear(); 792 } 793 } 794 unconsumedMessages.close(); 795 this.session.removeConsumer(this); 796 List<MessageDispatch> list = unconsumedMessages.removeAll(); 797 if (!this.info.isBrowser()) { 798 for (MessageDispatch old : list) { 799 // ensure we don't filter this as a duplicate 800 session.connection.rollbackDuplicate(this, old.getMessage()); 801 } 802 } 803 } 804 } 805 806 /** 807 * @throws IllegalStateException 808 */ 809 protected void checkClosed() throws IllegalStateException { 810 if (unconsumedMessages.isClosed()) { 811 throw new IllegalStateException("The Consumer is closed"); 812 } 813 } 814 815 /** 816 * If we have a zero prefetch specified then send a pull command to the 817 * broker to pull a message we are about to receive 818 */ 819 protected void sendPullCommand(long timeout) throws JMSException { 820 clearDispatchList(); 821 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 822 MessagePull messagePull = new MessagePull(); 823 messagePull.configure(info); 824 messagePull.setTimeout(timeout); 825 session.asyncSendPacket(messagePull); 826 } 827 } 828 829 protected void checkMessageListener() throws JMSException { 830 session.checkMessageListener(); 831 } 832 833 protected void setOptimizeAcknowledge(boolean value) { 834 if (optimizeAcknowledge && !value) { 835 deliverAcks(); 836 } 837 optimizeAcknowledge = value; 838 } 839 840 protected void setPrefetchSize(int prefetch) { 841 deliverAcks(); 842 this.info.setCurrentPrefetchSize(prefetch); 843 } 844 845 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 846 md.setDeliverySequenceId(session.getNextDeliveryId()); 847 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 848 if (!isAutoAcknowledgeBatch()) { 849 synchronized(deliveredMessages) { 850 deliveredMessages.addFirst(md); 851 } 852 if (session.getTransacted()) { 853 if (transactedIndividualAck) { 854 immediateIndividualTransactedAck(md); 855 } else { 856 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 857 } 858 } 859 } 860 } 861 862 private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException { 863 // acks accumulate on the broker pending transaction completion to indicate 864 // delivery status 865 registerSync(); 866 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 867 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 868 session.syncSendPacket(ack); 869 } 870 871 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 872 if (unconsumedMessages.isClosed()) { 873 return; 874 } 875 if (messageExpired) { 876 synchronized (deliveredMessages) { 877 deliveredMessages.remove(md); 878 } 879 stats.getExpiredMessageCount().increment(); 880 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 881 } else { 882 stats.onMessage(); 883 if (session.getTransacted()) { 884 // Do nothing. 885 } else if (isAutoAcknowledgeEach()) { 886 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 887 synchronized (deliveredMessages) { 888 if (!deliveredMessages.isEmpty()) { 889 if (optimizeAcknowledge) { 890 ackCounter++; 891 if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { 892 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 893 if (ack != null) { 894 deliveredMessages.clear(); 895 ackCounter = 0; 896 session.sendAck(ack); 897 optimizeAckTimestamp = System.currentTimeMillis(); 898 } 899 } 900 } else { 901 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 902 if (ack!=null) { 903 deliveredMessages.clear(); 904 session.sendAck(ack); 905 } 906 } 907 } 908 } 909 deliveryingAcknowledgements.set(false); 910 } 911 } else if (isAutoAcknowledgeBatch()) { 912 ackLater(md, MessageAck.STANDARD_ACK_TYPE); 913 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 914 boolean messageUnackedByConsumer = false; 915 synchronized (deliveredMessages) { 916 messageUnackedByConsumer = deliveredMessages.contains(md); 917 } 918 if (messageUnackedByConsumer) { 919 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 920 } 921 } 922 else { 923 throw new IllegalStateException("Invalid session state."); 924 } 925 } 926 } 927 928 /** 929 * Creates a MessageAck for all messages contained in deliveredMessages. 930 * Caller should hold the lock for deliveredMessages. 931 * 932 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 933 * @return <code>null</code> if nothing to ack. 934 */ 935 private MessageAck makeAckForAllDeliveredMessages(byte type) { 936 synchronized (deliveredMessages) { 937 if (deliveredMessages.isEmpty()) 938 return null; 939 940 MessageDispatch md = deliveredMessages.getFirst(); 941 MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); 942 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); 943 return ack; 944 } 945 } 946 947 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 948 949 // Don't acknowledge now, but we may need to let the broker know the 950 // consumer got the message to expand the pre-fetch window 951 if (session.getTransacted()) { 952 registerSync(); 953 } 954 955 deliveredCounter++; 956 957 MessageAck oldPendingAck = pendingAck; 958 pendingAck = new MessageAck(md, ackType, deliveredCounter); 959 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); 960 if( oldPendingAck==null ) { 961 pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); 962 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { 963 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); 964 } else { 965 // old pending ack being superseded by ack of another type, if is is not a delivered 966 // ack and hence important, send it now so it is not lost. 967 if ( !oldPendingAck.isDeliveredAck()) { 968 if (LOG.isDebugEnabled()) { 969 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 970 } 971 session.sendAck(oldPendingAck); 972 } else { 973 if (LOG.isDebugEnabled()) { 974 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 975 } 976 } 977 } 978 979 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { 980 session.sendAck(pendingAck); 981 pendingAck=null; 982 deliveredCounter = 0; 983 additionalWindowSize = 0; 984 } 985 } 986 987 private void registerSync() throws JMSException { 988 session.doStartTransaction(); 989 if (!synchronizationRegistered) { 990 synchronizationRegistered = true; 991 session.getTransactionContext().addSynchronization(new Synchronization() { 992 @Override 993 public void beforeEnd() throws Exception { 994 if (transactedIndividualAck) { 995 clearDispatchList(); 996 waitForRedeliveries(); 997 synchronized(deliveredMessages) { 998 rollbackOnFailedRecoveryRedelivery(); 999 } 1000 } else { 1001 acknowledge(); 1002 } 1003 synchronizationRegistered = false; 1004 } 1005 1006 @Override 1007 public void afterCommit() throws Exception { 1008 commit(); 1009 synchronizationRegistered = false; 1010 } 1011 1012 @Override 1013 public void afterRollback() throws Exception { 1014 rollback(); 1015 synchronizationRegistered = false; 1016 } 1017 }); 1018 } 1019 } 1020 1021 /** 1022 * Acknowledge all the messages that have been delivered to the client up to 1023 * this point. 1024 * 1025 * @throws JMSException 1026 */ 1027 public void acknowledge() throws JMSException { 1028 clearDispatchList(); 1029 waitForRedeliveries(); 1030 synchronized(deliveredMessages) { 1031 // Acknowledge all messages so far. 1032 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 1033 if (ack == null) 1034 return; // no msgs 1035 1036 if (session.getTransacted()) { 1037 rollbackOnFailedRecoveryRedelivery(); 1038 session.doStartTransaction(); 1039 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 1040 } 1041 session.sendAck(ack); 1042 pendingAck = null; 1043 1044 // Adjust the counters 1045 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); 1046 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1047 1048 if (!session.getTransacted()) { 1049 deliveredMessages.clear(); 1050 } 1051 } 1052 } 1053 1054 private void waitForRedeliveries() { 1055 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) { 1056 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod; 1057 int numberNotReplayed; 1058 do { 1059 numberNotReplayed = 0; 1060 synchronized(deliveredMessages) { 1061 if (previouslyDeliveredMessages != null) { 1062 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1063 if (!entry.getValue()) { 1064 numberNotReplayed++; 1065 } 1066 } 1067 } 1068 } 1069 if (numberNotReplayed > 0) { 1070 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: " 1071 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId()); 1072 try { 1073 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); 1074 } catch (InterruptedException outOfhere) { 1075 break; 1076 } 1077 } 1078 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis()); 1079 } 1080 } 1081 1082 /* 1083 * called with deliveredMessages locked 1084 */ 1085 private void rollbackOnFailedRecoveryRedelivery() throws JMSException { 1086 if (previouslyDeliveredMessages != null) { 1087 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback 1088 // as messages have been dispatched else where. 1089 int numberNotReplayed = 0; 1090 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1091 if (!entry.getValue()) { 1092 numberNotReplayed++; 1093 if (LOG.isDebugEnabled()) { 1094 LOG.debug("previously delivered message has not been replayed in transaction: " 1095 + previouslyDeliveredMessages.transactionId 1096 + " , messageId: " + entry.getKey()); 1097 } 1098 } 1099 } 1100 if (numberNotReplayed > 0) { 1101 String message = "rolling back transaction (" 1102 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed 1103 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); 1104 LOG.warn(message); 1105 throw new TransactionRolledBackException(message); 1106 } 1107 } 1108 } 1109 1110 void acknowledge(MessageDispatch md) throws JMSException { 1111 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); 1112 session.sendAck(ack); 1113 synchronized(deliveredMessages){ 1114 deliveredMessages.remove(md); 1115 } 1116 } 1117 1118 public void commit() throws JMSException { 1119 synchronized (deliveredMessages) { 1120 deliveredMessages.clear(); 1121 clearPreviouslyDelivered(); 1122 } 1123 redeliveryDelay = 0; 1124 } 1125 1126 public void rollback() throws JMSException { 1127 synchronized (unconsumedMessages.getMutex()) { 1128 if (optimizeAcknowledge) { 1129 // remove messages read but not acked at the broker yet through 1130 // optimizeAcknowledge 1131 if (!this.info.isBrowser()) { 1132 synchronized(deliveredMessages) { 1133 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { 1134 // ensure we don't filter this as a duplicate 1135 MessageDispatch md = deliveredMessages.removeLast(); 1136 session.connection.rollbackDuplicate(this, md.getMessage()); 1137 } 1138 } 1139 } 1140 } 1141 synchronized(deliveredMessages) { 1142 rollbackPreviouslyDeliveredAndNotRedelivered(); 1143 if (deliveredMessages.isEmpty()) { 1144 return; 1145 } 1146 1147 // use initial delay for first redelivery 1148 MessageDispatch lastMd = deliveredMessages.getFirst(); 1149 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); 1150 if (currentRedeliveryCount > 0) { 1151 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 1152 } else { 1153 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 1154 } 1155 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); 1156 1157 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1158 MessageDispatch md = iter.next(); 1159 md.getMessage().onMessageRolledBack(); 1160 // ensure we don't filter this as a duplicate 1161 session.connection.rollbackDuplicate(this, md.getMessage()); 1162 } 1163 1164 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 1165 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { 1166 // We need to NACK the messages so that they get sent to the 1167 // DLQ. 1168 // Acknowledge the last message. 1169 1170 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); 1171 ack.setPoisonCause(lastMd.getRollbackCause()); 1172 ack.setFirstMessageId(firstMsgId); 1173 session.sendAck(ack,true); 1174 // Adjust the window size. 1175 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1176 redeliveryDelay = 0; 1177 } else { 1178 1179 // only redelivery_ack after first delivery 1180 if (currentRedeliveryCount > 0) { 1181 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); 1182 ack.setFirstMessageId(firstMsgId); 1183 session.sendAck(ack,true); 1184 } 1185 1186 // stop the delivery of messages. 1187 if (nonBlockingRedelivery) { 1188 if (!unconsumedMessages.isClosed()) { 1189 1190 final LinkedList<MessageDispatch> pendingRedeliveries = 1191 new LinkedList<MessageDispatch>(deliveredMessages); 1192 1193 // Start up the delivery again a little later. 1194 session.getScheduler().executeAfterDelay(new Runnable() { 1195 public void run() { 1196 try { 1197 if (!unconsumedMessages.isClosed()) { 1198 for(MessageDispatch dispatch : pendingRedeliveries) { 1199 session.dispatch(dispatch); 1200 } 1201 } 1202 } catch (Exception e) { 1203 session.connection.onAsyncException(e); 1204 } 1205 } 1206 }, redeliveryDelay); 1207 } 1208 1209 } else { 1210 unconsumedMessages.stop(); 1211 1212 for (MessageDispatch md : deliveredMessages) { 1213 unconsumedMessages.enqueueFirst(md); 1214 } 1215 1216 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { 1217 // Start up the delivery again a little later. 1218 session.getScheduler().executeAfterDelay(new Runnable() { 1219 public void run() { 1220 try { 1221 if (started.get()) { 1222 start(); 1223 } 1224 } catch (JMSException e) { 1225 session.connection.onAsyncException(e); 1226 } 1227 } 1228 }, redeliveryDelay); 1229 } else { 1230 start(); 1231 } 1232 } 1233 } 1234 deliveredCounter -= deliveredMessages.size(); 1235 deliveredMessages.clear(); 1236 } 1237 } 1238 if (messageListener.get() != null) { 1239 session.redispatch(this, unconsumedMessages); 1240 } 1241 } 1242 1243 /* 1244 * called with unconsumedMessages && deliveredMessages locked 1245 * remove any message not re-delivered as they can't be replayed to this 1246 * consumer on rollback 1247 */ 1248 private void rollbackPreviouslyDeliveredAndNotRedelivered() { 1249 if (previouslyDeliveredMessages != null) { 1250 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1251 if (!entry.getValue()) { 1252 removeFromDeliveredMessages(entry.getKey()); 1253 } 1254 } 1255 clearPreviouslyDelivered(); 1256 } 1257 } 1258 1259 /* 1260 * called with deliveredMessages locked 1261 */ 1262 private void removeFromDeliveredMessages(MessageId key) { 1263 Iterator<MessageDispatch> iterator = deliveredMessages.iterator(); 1264 while (iterator.hasNext()) { 1265 MessageDispatch candidate = iterator.next(); 1266 if (key.equals(candidate.getMessage().getMessageId())) { 1267 session.connection.rollbackDuplicate(this, candidate.getMessage()); 1268 iterator.remove(); 1269 break; 1270 } 1271 } 1272 } 1273 1274 /* 1275 * called with deliveredMessages locked 1276 */ 1277 private void clearPreviouslyDelivered() { 1278 if (previouslyDeliveredMessages != null) { 1279 previouslyDeliveredMessages.clear(); 1280 previouslyDeliveredMessages = null; 1281 } 1282 } 1283 1284 public void dispatch(MessageDispatch md) { 1285 MessageListener listener = this.messageListener.get(); 1286 try { 1287 clearMessagesInProgress(); 1288 clearDispatchList(); 1289 synchronized (unconsumedMessages.getMutex()) { 1290 if (!unconsumedMessages.isClosed()) { 1291 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 1292 if (listener != null && unconsumedMessages.isRunning()) { 1293 ActiveMQMessage message = createActiveMQMessage(md); 1294 beforeMessageIsConsumed(md); 1295 try { 1296 boolean expired = message.isExpired(); 1297 if (!expired) { 1298 listener.onMessage(message); 1299 } 1300 afterMessageIsConsumed(md, expired); 1301 } catch (RuntimeException e) { 1302 LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e); 1303 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { 1304 // schedual redelivery and possible dlq processing 1305 md.setRollbackCause(e); 1306 rollback(); 1307 } else { 1308 // Transacted or Client ack: Deliver the 1309 // next message. 1310 afterMessageIsConsumed(md, false); 1311 } 1312 } 1313 } else { 1314 if (!unconsumedMessages.isRunning()) { 1315 // delayed redelivery, ensure it can be re delivered 1316 session.connection.rollbackDuplicate(this, md.getMessage()); 1317 } 1318 unconsumedMessages.enqueue(md); 1319 if (availableListener != null) { 1320 availableListener.onMessageAvailable(this); 1321 } 1322 } 1323 } else { 1324 if (!session.isTransacted()) { 1325 LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId() 1326 + " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md); 1327 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 1328 session.sendAck(ack); 1329 } else { 1330 if (LOG.isDebugEnabled()) { 1331 LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage()); 1332 } 1333 boolean needsPoisonAck = false; 1334 synchronized (deliveredMessages) { 1335 if (previouslyDeliveredMessages != null) { 1336 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); 1337 } else { 1338 // delivery while pending redelivery to another consumer on the same connection 1339 // not waiting for redelivery will help here 1340 needsPoisonAck = true; 1341 } 1342 } 1343 if (needsPoisonAck) { 1344 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 1345 poisonAck.setFirstMessageId(md.getMessage().getMessageId()); 1346 poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: " 1347 + session.getConnection().getConnectionInfo().getConnectionId())); 1348 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" 1349 + " consumer on this connection, failoverRedeliveryWaitPeriod=" 1350 + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck); 1351 session.sendAck(poisonAck); 1352 } else { 1353 if (transactedIndividualAck) { 1354 immediateIndividualTransactedAck(md); 1355 } else { 1356 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 1357 } 1358 } 1359 } 1360 } 1361 } 1362 } 1363 if (++dispatchedCount % 1000 == 0) { 1364 dispatchedCount = 0; 1365 Thread.yield(); 1366 } 1367 } catch (Exception e) { 1368 session.connection.onClientInternalException(e); 1369 } 1370 } 1371 1372 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again 1373 private void clearDispatchList() { 1374 if (clearDispatchList) { 1375 synchronized (deliveredMessages) { 1376 if (clearDispatchList) { 1377 if (!deliveredMessages.isEmpty()) { 1378 if (session.isTransacted()) { 1379 if (LOG.isDebugEnabled()) { 1380 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1381 } 1382 if (previouslyDeliveredMessages == null) { 1383 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); 1384 } 1385 for (MessageDispatch delivered : deliveredMessages) { 1386 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); 1387 } 1388 } else { 1389 if (LOG.isDebugEnabled()) { 1390 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1391 } 1392 deliveredMessages.clear(); 1393 pendingAck = null; 1394 } 1395 } 1396 clearDispatchList = false; 1397 } 1398 } 1399 } 1400 } 1401 1402 public int getMessageSize() { 1403 return unconsumedMessages.size(); 1404 } 1405 1406 public void start() throws JMSException { 1407 if (unconsumedMessages.isClosed()) { 1408 return; 1409 } 1410 started.set(true); 1411 unconsumedMessages.start(); 1412 session.executor.wakeup(); 1413 } 1414 1415 public void stop() { 1416 started.set(false); 1417 unconsumedMessages.stop(); 1418 } 1419 1420 @Override 1421 public String toString() { 1422 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() 1423 + " }"; 1424 } 1425 1426 /** 1427 * Delivers a message to the message listener. 1428 * 1429 * @return 1430 * @throws JMSException 1431 */ 1432 public boolean iterate() { 1433 MessageListener listener = this.messageListener.get(); 1434 if (listener != null) { 1435 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 1436 if (md != null) { 1437 dispatch(md); 1438 return true; 1439 } 1440 } 1441 return false; 1442 } 1443 1444 public boolean isInUse(ActiveMQTempDestination destination) { 1445 return info.getDestination().equals(destination); 1446 } 1447 1448 public long getLastDeliveredSequenceId() { 1449 return lastDeliveredSequenceId; 1450 } 1451 1452 public IOException getFailureError() { 1453 return failureError; 1454 } 1455 1456 public void setFailureError(IOException failureError) { 1457 this.failureError = failureError; 1458 } 1459}