/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to make sure that commands are arriving periodically from the peer of
 * the transport.
 * <p>
 * Each monitor schedules a periodic read check and a periodic write check on
 * two timers that are shared by every monitor instance. The read check reports
 * an {@link InactivityIOException} when nothing was received since the previous
 * check; the write check sends a {@link KeepAliveInfo} when nothing was sent
 * since the previous check. The shared timers and the shared executor are
 * created when the first monitor starts and released when the last one stops
 * (tracked by {@code CHECKER_COUNTER} under the class lock).
 */
public abstract class AbstractInactivityMonitor extends TransportFilter {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);

    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;

    /**
     * Thread factory for the shared executor. It is static so that the static
     * executor does not keep a reference to whichever monitor instance happened
     * to create it.
     */
    private static final ThreadFactory FACTORY = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "InactivityMonitor Async Task: " + runnable);
            thread.setDaemon(true);
            return thread;
        }
    };

    // Shared state below is written only while holding AbstractInactivityMonitor.class.
    private static ThreadPoolExecutor ASYNC_TASKS;
    private static int CHECKER_COUNTER;
    private static Timer READ_CHECK_TIMER;
    private static Timer WRITE_CHECK_TIMER;

    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);

    private final AtomicBoolean commandSent = new AtomicBoolean(false);
    private final AtomicBoolean inSend = new AtomicBoolean(false);
    private final AtomicBoolean failed = new AtomicBoolean(false);

    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
    private final AtomicBoolean inReceive = new AtomicBoolean(false);
    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);

    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();

    private SchedulerTimerTask writeCheckerTask;
    private SchedulerTimerTask readCheckerTask;

    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
    private boolean useKeepAlive = true;
    private boolean keepAliveResponseRequired;

    protected WireFormat wireFormat;

    /** Timer callback that rate-limits and then performs {@link #readCheck()}. */
    private final Runnable readChecker = new Runnable() {
        long lastRunTime;

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            long elapsed = (now - lastRunTime);

            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
                LOG.debug("" + elapsed + " ms elapsed since last read check.");
            }

            // Perhaps the timer executed a read check late.. and then executes
            // the next read check on time which causes the time elapsed between
            // read checks to be small..

            // If less than 90% of the read check Time elapsed then abort this readcheck.
            if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
                return;
            }

            lastRunTime = now;
            readCheck();
        }
    };

    /**
     * @param elapsed milliseconds since the previous read check ran
     * @return {@code true} when more than 90% of {@code readCheckTime} has elapsed
     */
    private boolean allowReadCheck(long elapsed) {
        return elapsed > (readCheckTime * 9 / 10);
    }

    /** Timer callback that performs {@link #writeCheck()}. */
    private final Runnable writeChecker = new Runnable() {
        long lastRunTime;

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
                LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
            }
            lastRunTime = now;
            writeCheck();
        }
    };

    /**
     * @param next the transport being wrapped
     * @param wireFormat the wire format made available to subclasses
     */
    public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
        super(next);
        this.wireFormat = wireFormat;
    }

    /** Starts the wrapped transport and then the monitor's periodic checks. */
    @Override
    public void start() throws Exception {
        next.start();
        startMonitorThreads();
    }

    /** Stops the monitor's periodic checks and then the wrapped transport. */
    @Override
    public void stop() throws Exception {
        stopMonitorThreads();
        next.stop();
    }

    /**
     * Sends a {@link KeepAliveInfo} asynchronously when nothing has been sent
     * since the previous write check, then clears the "command sent" flag.
     * Does nothing while a send is in progress.
     */
    final void writeCheck() {
        if (inSend.get()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("A send is in progress");
            }
            return;
        }

        // Work on a snapshot: the static field is nulled when the last monitor stops.
        final ThreadPoolExecutor executor = currentExecutor();
        if (!commandSent.get() && useKeepAlive && monitorStarted.get() && isAccepting(executor)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
            }
            executeAsync(executor, new Runnable() {
                @Override
                public void run() {
                    if (!monitorStarted.get()) {
                        return;
                    }
                    // If we can't get the lock it means another write beat us into the
                    // send and we don't need to heart beat now.
                    if (sendLock.writeLock().tryLock()) {
                        try {
                            KeepAliveInfo info = new KeepAliveInfo();
                            info.setResponseRequired(keepAliveResponseRequired);
                            doOnewaySend(info);
                        } catch (IOException e) {
                            onException(e);
                        } finally {
                            sendLock.writeLock().unlock();
                        }
                    }
                }
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace(this + " message sent since last write check, resetting flag");
            }
        }

        commandSent.set(false);
    }

    /**
     * Reports an {@link InactivityIOException} asynchronously through
     * {@link #onException(IOException)} when no command has been received since
     * the previous read check, then clears the "command received" flag. Does
     * nothing while a receive is in progress or when the wrapped transport's
     * receive counter has changed since the previous check.
     */
    final void readCheck() {
        int currentCounter = next.getReceiveCounter();
        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
        if (inReceive.get() || currentCounter != previousCounter) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("A receive is in progress");
            }
            return;
        }

        // Work on a snapshot: the static field is nulled when the last monitor stops.
        final ThreadPoolExecutor executor = currentExecutor();
        if (!commandReceived.get() && monitorStarted.get() && isAccepting(executor)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
            }
            executeAsync(executor, new Runnable() {
                @Override
                public void run() {
                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
                }
            });
        } else {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message received since last read check, resetting flag: ");
            }
        }
        commandReceived.set(false);
    }

    /**
     * @return the shared executor as currently published, or {@code null} when
     *         no monitor is running
     */
    private static ThreadPoolExecutor currentExecutor() {
        synchronized (AbstractInactivityMonitor.class) {
            return ASYNC_TASKS;
        }
    }

    /**
     * @return {@code true} when {@code executor} exists and has not been shut down
     */
    private static boolean isAccepting(ThreadPoolExecutor executor) {
        return executor != null && !executor.isShutdown();
    }

    /**
     * Submits {@code task} to {@code executor}. A rejection is ignored when the
     * executor has been shut down in the meantime (the last monitor stopped
     * between the check and the submit); any other rejection is rethrown.
     */
    private static void executeAsync(ThreadPoolExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            if (!executor.isShutdown()) {
                throw e;
            }
            LOG.debug("Async inactivity monitor task rejected, executor is shutting down", e);
        }
    }

    /**
     * Called from {@link #onCommand(Object)} for every received {@link WireFormatInfo}.
     */
    protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;

    /**
     * Called before a {@link WireFormatInfo} is passed to the wrapped transport for sending.
     */
    protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;

    /**
     * Records that a command was received. A {@link KeepAliveInfo} is consumed
     * here (and echoed back when it requests a response); every other command is
     * forwarded to the transport listener, after inbound {@link WireFormatInfo}
     * processing where applicable.
     */
    @Override
    public void onCommand(Object command) {
        commandReceived.set(true);
        inReceive.set(true);
        try {
            if (command.getClass() == KeepAliveInfo.class) {
                KeepAliveInfo info = (KeepAliveInfo) command;
                if (info.isResponseRequired()) {
                    sendLock.readLock().lock();
                    try {
                        info.setResponseRequired(false);
                        oneway(info);
                    } catch (IOException e) {
                        onException(e);
                    } finally {
                        sendLock.readLock().unlock();
                    }
                }
            } else {
                if (command.getClass() == WireFormatInfo.class) {
                    synchronized (this) {
                        try {
                            processInboundWireFormatInfo((WireFormatInfo) command);
                        } catch (IOException e) {
                            onException(e);
                        }
                    }
                }

                transportListener.onCommand(command);
            }
        } finally {
            inReceive.set(false);
        }
    }

    /**
     * Sends a command through the wrapped transport and records that a send
     * took place.
     *
     * @throws IOException if the channel has already failed or the wrapped
     *         transport fails to send
     */
    @Override
    public void oneway(Object o) throws IOException {
        // To prevent the inactivity monitor from sending a message while we
        // are performing a send we take a read lock. The inactivity monitor
        // sends its Heart-beat commands under a write lock. This means that
        // the MutexTransport is still responsible for synchronizing sends
        this.sendLock.readLock().lock();
        inSend.set(true);
        try {
            doOnewaySend(o);
        } finally {
            commandSent.set(true);
            inSend.set(false);
            this.sendLock.readLock().unlock();
        }
    }

    // Must be called under lock, either read or write on sendLock.
    private void doOnewaySend(Object command) throws IOException {
        if (failed.get()) {
            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
        }
        if (command.getClass() == WireFormatInfo.class) {
            synchronized (this) {
                processOutboundWireFormatInfo((WireFormatInfo) command);
            }
        }
        next.oneway(command);
    }

    /**
     * Marks the channel as failed, stops the periodic checks and forwards the
     * error to the transport listener. Only the first reported error is
     * forwarded; later calls are ignored.
     */
    @Override
    public void onException(IOException error) {
        if (failed.compareAndSet(false, true)) {
            stopMonitorThreads();
            transportListener.onException(error);
        }
    }

    public void setUseKeepAlive(boolean val) {
        useKeepAlive = val;
    }

    public long getReadCheckTime() {
        return readCheckTime;
    }

    public void setReadCheckTime(long readCheckTime) {
        this.readCheckTime = readCheckTime;
    }

    public long getWriteCheckTime() {
        return writeCheckTime;
    }

    public void setWriteCheckTime(long writeCheckTime) {
        this.writeCheckTime = writeCheckTime;
    }

    public long getInitialDelayTime() {
        return initialDelayTime;
    }

    public void setInitialDelayTime(long initialDelayTime) {
        this.initialDelayTime = initialDelayTime;
    }

    public boolean isKeepAliveResponseRequired() {
        return this.keepAliveResponseRequired;
    }

    public void setKeepAliveResponseRequired(boolean value) {
        this.keepAliveResponseRequired = value;
    }

    public boolean isMonitorStarted() {
        return this.monitorStarted.get();
    }

    /**
     * Schedules the read and write checks on the shared timers, creating the
     * shared timers and executor if this is the first running monitor. Does
     * nothing when the monitor is already started, when {@link #configuredOk()}
     * returns {@code false}, or when both check times are non-positive.
     */
    protected synchronized void startMonitorThreads() throws IOException {
        if (monitorStarted.get()) {
            return;
        }

        if (!configuredOk()) {
            return;
        }

        if (readCheckTime > 0) {
            readCheckerTask = new SchedulerTimerTask(readChecker);
        }

        if (writeCheckTime > 0) {
            writeCheckerTask = new SchedulerTimerTask(writeChecker);
        }

        if (writeCheckTime > 0 || readCheckTime > 0) {
            monitorStarted.set(true);
            synchronized (AbstractInactivityMonitor.class) {
                if (CHECKER_COUNTER == 0) {
                    ASYNC_TASKS = createExecutor();
                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck", true);
                }
                CHECKER_COUNTER++;
                if (readCheckTime > 0) {
                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
                }
                if (writeCheckTime > 0) {
                    WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
                }
            }
        }
    }

    /**
     * @return {@code true} when the monitor may be started; consulted by
     *         {@link #startMonitorThreads()} before anything is scheduled
     */
    protected abstract boolean configuredOk() throws IOException;

    /**
     * Cancels this monitor's scheduled checks and, when this was the last
     * running monitor, cancels the shared timers and shuts down the shared
     * executor. Does nothing when the monitor is not started.
     */
    protected synchronized void stopMonitorThreads() {
        if (monitorStarted.compareAndSet(true, false)) {
            if (readCheckerTask != null) {
                readCheckerTask.cancel();
            }
            if (writeCheckerTask != null) {
                writeCheckerTask.cancel();
            }
            synchronized (AbstractInactivityMonitor.class) {
                WRITE_CHECK_TIMER.purge();
                READ_CHECK_TIMER.purge();
                CHECKER_COUNTER--;
                if (CHECKER_COUNTER == 0) {
                    WRITE_CHECK_TIMER.cancel();
                    READ_CHECK_TIMER.cancel();
                    WRITE_CHECK_TIMER = null;
                    READ_CHECK_TIMER = null;
                    ASYNC_TASKS.shutdown();
                    ASYNC_TASKS = null;
                }
            }
        }
    }

    /**
     * Creates the shared executor: no core threads, an unbounded maximum, a
     * direct hand-off queue, and idle threads timing out after 10 seconds.
     */
    private static ThreadPoolExecutor createExecutor() {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), FACTORY);
        exec.allowCoreThreadTimeOut(true);
        return exec;
    }
}