001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.Enumeration; 020import java.util.concurrent.atomic.AtomicBoolean; 021 022import javax.jms.IllegalStateException; 023import javax.jms.JMSException; 024import javax.jms.Message; 025import javax.jms.Queue; 026import javax.jms.QueueBrowser; 027 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.ConsumerId; 030import org.apache.activemq.command.MessageDispatch; 031 032/** 033 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a 034 * queue without removing them. <p/> 035 * <P> 036 * The <CODE>getEnumeration</CODE> method returns a <CODE> 037 * java.util.Enumeration</CODE> 038 * that is used to scan the queue's messages. It may be an enumeration of the 039 * entire content of a queue, or it may contain only the messages matching a 040 * message selector. <p/> 041 * <P> 042 * Messages may be arriving and expiring while the scan is done. The JMS API 043 * does not require the content of an enumeration to be a static snapshot of 044 * queue content. Whether these changes are visible or not depends on the JMS 045 * provider. <p/> 046 * <P> 047 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session 048 * </CODE> 049 * or a <CODE>QueueSession</CODE>. 050 * 051 * @see javax.jms.Session#createBrowser 052 * @see javax.jms.QueueSession#createBrowser 053 * @see javax.jms.QueueBrowser 054 * @see javax.jms.QueueReceiver 055 */ 056 057public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration { 058 059 private final ActiveMQSession session; 060 private final ActiveMQDestination destination; 061 private final String selector; 062 063 private ActiveMQMessageConsumer consumer; 064 private boolean closed; 065 private final ConsumerId consumerId; 066 private final AtomicBoolean browseDone = new AtomicBoolean(true); 067 private final boolean dispatchAsync; 068 private Object semaphore = new Object(); 069 070 /** 071 * Constructor for an ActiveMQQueueBrowser - used internally 072 * 073 * @param theSession 074 * @param dest 075 * @param selector 076 * @throws JMSException 077 */ 078 protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException { 079 this.session = session; 080 this.consumerId = consumerId; 081 this.destination = destination; 082 this.selector = selector; 083 this.dispatchAsync = dispatchAsync; 084 this.consumer = createConsumer(); 085 } 086 087 /** 088 * @param session 089 * @param originalDestination 090 * @param selectorExpression 091 * @param cnum 092 * @return 093 * @throws JMSException 094 */ 095 private ActiveMQMessageConsumer createConsumer() throws JMSException { 096 browseDone.set(false); 097 ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy(); 098 099 return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy 100 .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) { 101 public void dispatch(MessageDispatch md) { 102 if (md.getMessage() == null) { 103 browseDone.set(true); 104 } else { 105 super.dispatch(md); 106 } 107 notifyMessageAvailable(); 108 } 109 }; 110 } 111 112 private void destroyConsumer() { 113 if (consumer == null) { 114 return; 115 } 116 try { 117 if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) { 118 session.commit(); 119 } 120 consumer.close(); 121 consumer = null; 122 } catch (JMSException e) { 123 e.printStackTrace(); 124 } 125 } 126 127 /** 128 * Gets an enumeration for browsing the current queue messages in the order 129 * they would be received. 130 * 131 * @return an enumeration for browsing the messages 132 * @throws JMSException if the JMS provider fails to get the enumeration for 133 * this browser due to some internal error. 134 */ 135 136 public Enumeration getEnumeration() throws JMSException { 137 checkClosed(); 138 if (consumer == null) { 139 consumer = createConsumer(); 140 } 141 return this; 142 } 143 144 private void checkClosed() throws IllegalStateException { 145 if (closed) { 146 throw new IllegalStateException("The Consumer is closed"); 147 } 148 } 149 150 /** 151 * @return true if more messages to process 152 */ 153 public boolean hasMoreElements() { 154 while (true) { 155 156 synchronized (this) { 157 if (consumer == null) { 158 return false; 159 } 160 } 161 162 if (consumer.getMessageSize() > 0) { 163 return true; 164 } 165 166 if (browseDone.get() || !session.isRunning()) { 167 destroyConsumer(); 168 return false; 169 } 170 171 waitForMessage(); 172 } 173 } 174 175 /** 176 * @return the next message 177 */ 178 public Object nextElement() { 179 while (true) { 180 181 synchronized (this) { 182 if (consumer == null) { 183 return null; 184 } 185 } 186 187 try { 188 Message answer = consumer.receiveNoWait(); 189 if (answer != null) { 190 return answer; 191 } 192 } catch (JMSException e) { 193 this.session.connection.onClientInternalException(e); 194 return null; 195 } 196 197 if (browseDone.get() || !session.isRunning()) { 198 destroyConsumer(); 199 return null; 200 } 201 202 waitForMessage(); 203 } 204 } 205 206 public synchronized void close() throws JMSException { 207 destroyConsumer(); 208 closed = true; 209 } 210 211 /** 212 * Gets the queue associated with this queue browser. 213 * 214 * @return the queue 215 * @throws JMSException if the JMS provider fails to get the queue 216 * associated with this browser due to some internal error. 217 */ 218 219 public Queue getQueue() throws JMSException { 220 return (Queue)destination; 221 } 222 223 public String getMessageSelector() throws JMSException { 224 return selector; 225 } 226 227 // Implementation methods 228 // ------------------------------------------------------------------------- 229 230 /** 231 * Wait on a semaphore for a fixed amount of time for a message to come in. 232 * @throws JMSException 233 */ 234 protected void waitForMessage() { 235 try { 236 consumer.sendPullCommand(-1); 237 synchronized (semaphore) { 238 semaphore.wait(2000); 239 } 240 } catch (InterruptedException e) { 241 Thread.currentThread().interrupt(); 242 } catch (JMSException e) { 243 } 244 245 } 246 247 protected void notifyMessageAvailable() { 248 synchronized (semaphore) { 249 semaphore.notifyAll(); 250 } 251 } 252 253 public String toString() { 254 return "ActiveMQQueueBrowser { value=" + consumerId + " }"; 255 } 256 257}