001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import org.apache.activemq.state.CommandVisitor;
020
021/**
022 * @openwire:marshaller code="22"
023 * 
024 */
025public class MessageAck extends BaseCommand {
026
027    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
028
029    /**
030     * Used to let the broker know that the message has been delivered to the
031     * client. Message will still be retained until an standard ack is received.
032     * This is used get the broker to send more messages past prefetch limits
033     * when an standard ack has not been sent.
034     */
035    public static final byte DELIVERED_ACK_TYPE = 0;
036
037    /**
038     * The standard ack case where a client wants the message to be discarded.
039     */
040    public static final byte STANDARD_ACK_TYPE = 2;
041
042    /**
043     * In case the client want's to explicitly let the broker know that a
044     * message was not processed and the message was considered a poison
045     * message.
046     */
047    public static final byte POSION_ACK_TYPE = 1;
048
049    /**
050     * In case the client want's to explicitly let the broker know that a
051     * message was not processed and it was re-delivered to the consumer
052     * but it was not yet considered to be a poison message.  The messageCount 
053     * field will hold the number of times the message was re-delivered. 
054     */
055    public static final byte REDELIVERED_ACK_TYPE = 3;
056    
057    /**
058     * The  ack case where a client wants only an individual message to be discarded.
059     */
060    public static final byte INDIVIDUAL_ACK_TYPE = 4;
061
062/**
063     * The ack case where a durable topic subscription does not match a selector.
064     */
065    public static final byte UNMATCHED_ACK_TYPE = 5;
066
067    protected byte ackType;
068    protected ConsumerId consumerId;
069    protected MessageId firstMessageId;
070    protected MessageId lastMessageId;
071    protected ActiveMQDestination destination;
072    protected TransactionId transactionId;
073    protected int messageCount;
074    protected Throwable poisonCause;
075
076    protected transient String consumerKey;
077
078    public MessageAck() {
079    }
080
081    public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
082        this.ackType = ackType;
083        this.consumerId = md.getConsumerId();
084        this.destination = md.getDestination();
085        this.lastMessageId = md.getMessage().getMessageId();
086        this.messageCount = messageCount;
087    }
088
089    public MessageAck(Message message, byte ackType, int messageCount) {
090        this.ackType = ackType;
091        this.destination = message.getDestination();
092        this.lastMessageId = message.getMessageId();
093        this.messageCount = messageCount;
094    }
095
096    public void copy(MessageAck copy) {
097        super.copy(copy);
098        copy.firstMessageId = firstMessageId;
099        copy.lastMessageId = lastMessageId;
100        copy.destination = destination;
101        copy.transactionId = transactionId;
102        copy.ackType = ackType;
103        copy.consumerId = consumerId;
104    }
105
106    public byte getDataStructureType() {
107        return DATA_STRUCTURE_TYPE;
108    }
109
110    public boolean isMessageAck() {
111        return true;
112    }
113
114    public boolean isPoisonAck() {
115        return ackType == POSION_ACK_TYPE;
116    }
117
118    public boolean isStandardAck() {
119        return ackType == STANDARD_ACK_TYPE;
120    }
121
122    public boolean isDeliveredAck() {
123        return ackType == DELIVERED_ACK_TYPE;
124    }
125    
126    public boolean isRedeliveredAck() {
127        return ackType == REDELIVERED_ACK_TYPE;
128    }
129    
130    public boolean isIndividualAck() {
131        return ackType == INDIVIDUAL_ACK_TYPE;
132    }
133
134    public boolean isUnmatchedAck() {
135        return ackType == UNMATCHED_ACK_TYPE;
136    }
137
138    /**
139     * @openwire:property version=1 cache=true
140     */
141    public ActiveMQDestination getDestination() {
142        return destination;
143    }
144
145    public void setDestination(ActiveMQDestination destination) {
146        this.destination = destination;
147    }
148
149    /**
150     * @openwire:property version=1 cache=true
151     */
152    public TransactionId getTransactionId() {
153        return transactionId;
154    }
155
156    public void setTransactionId(TransactionId transactionId) {
157        this.transactionId = transactionId;
158    }
159
160    public boolean isInTransaction() {
161        return transactionId != null;
162    }
163
164    /**
165     * @openwire:property version=1 cache=true
166     */
167    public ConsumerId getConsumerId() {
168        return consumerId;
169    }
170
171    public void setConsumerId(ConsumerId consumerId) {
172        this.consumerId = consumerId;
173    }
174
175    /**
176     * @openwire:property version=1
177     */
178    public byte getAckType() {
179        return ackType;
180    }
181
182    public void setAckType(byte ackType) {
183        this.ackType = ackType;
184    }
185
186    /**
187     * @openwire:property version=1
188     */
189    public MessageId getFirstMessageId() {
190        return firstMessageId;
191    }
192
193    public void setFirstMessageId(MessageId firstMessageId) {
194        this.firstMessageId = firstMessageId;
195    }
196
197    /**
198     * @openwire:property version=1
199     */
200    public MessageId getLastMessageId() {
201        return lastMessageId;
202    }
203
204    public void setLastMessageId(MessageId lastMessageId) {
205        this.lastMessageId = lastMessageId;
206    }
207
208    /**
209     * The number of messages being acknowledged in the range.
210     * 
211     * @openwire:property version=1
212     */
213    public int getMessageCount() {
214        return messageCount;
215    }
216
217    public void setMessageCount(int messageCount) {
218        this.messageCount = messageCount;
219    }
220
221    /**
222     * The cause of a poison ack, if a message listener
223     * throws an exception it will be recorded here
224     *
225     * @openwire:property version=7
226     */
227    public Throwable getPoisonCause() {
228        return poisonCause;
229    }
230
231    public void setPoisonCause(Throwable poisonCause) {
232        this.poisonCause = poisonCause;
233    }
234
235    public Response visit(CommandVisitor visitor) throws Exception {
236        return visitor.processMessageAck(this);
237    }
238
239    /**
240     * A helper method to allow a single message ID to be acknowledged
241     */
242    public void setMessageID(MessageId messageID) {
243        setFirstMessageId(messageID);
244        setLastMessageId(messageID);
245        setMessageCount(1);
246    }
247
248}