001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.amq; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashSet; 024import java.util.Iterator; 025import java.util.LinkedHashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.Map.Entry; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.atomic.AtomicReference; 032import java.util.concurrent.locks.Lock; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.command.ActiveMQDestination; 035import org.apache.activemq.command.DataStructure; 036import org.apache.activemq.command.JournalQueueAck; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 041import org.apache.activemq.kaha.MessageAckWithLocation; 042import org.apache.activemq.kaha.impl.async.Location; 043import org.apache.activemq.store.AbstractMessageStore; 044import org.apache.activemq.store.MessageRecoveryListener; 045import org.apache.activemq.store.PersistenceAdapter; 046import org.apache.activemq.store.ReferenceStore; 047import org.apache.activemq.store.ReferenceStore.ReferenceData; 048import org.apache.activemq.thread.Task; 049import org.apache.activemq.thread.TaskRunner; 050import org.apache.activemq.transaction.Synchronization; 051import org.apache.activemq.usage.MemoryUsage; 052import org.apache.activemq.util.Callback; 053import org.apache.activemq.util.TransactionTemplate; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * A MessageStore that uses a Journal to store it's messages. 059 * 060 * 061 */ 062public class AMQMessageStore extends AbstractMessageStore { 063 private static final Logger LOG = LoggerFactory.getLogger(AMQMessageStore.class); 064 protected final AMQPersistenceAdapter peristenceAdapter; 065 protected final AMQTransactionStore transactionStore; 066 protected final ReferenceStore referenceStore; 067 protected final TransactionTemplate transactionTemplate; 068 protected Location lastLocation; 069 protected Location lastWrittenLocation; 070 protected Set<Location> inFlightTxLocations = new HashSet<Location>(); 071 protected final TaskRunner asyncWriteTask; 072 protected CountDownLatch flushLatch; 073 private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>(); 074 private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>(); 075 /** A MessageStore that we can use to retrieve messages quickly. */ 076 private Map<MessageId, ReferenceData> cpAddedMessageIds; 077 private final boolean debug = LOG.isDebugEnabled(); 078 private final AtomicReference<Location> mark = new AtomicReference<Location>(); 079 protected final Lock lock; 080 081 public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { 082 super(destination); 083 this.peristenceAdapter = adapter; 084 this.lock = referenceStore.getStoreLock(); 085 this.transactionStore = adapter.getTransactionStore(); 086 this.referenceStore = referenceStore; 087 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext( 088 new NonCachedMessageEvaluationContext())); 089 asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() { 090 public boolean iterate() { 091 asyncWrite(); 092 return false; 093 } 094 }, "Checkpoint: " + destination); 095 } 096 097 public void setMemoryUsage(MemoryUsage memoryUsage) { 098 referenceStore.setMemoryUsage(memoryUsage); 099 } 100 101 /** 102 * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it 103 * is doing. 104 */ 105 public final void addMessage(ConnectionContext context, final Message message) throws IOException { 106 final MessageId id = message.getMessageId(); 107 final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); 108 if (!context.isInTransaction()) { 109 if (debug) { 110 LOG.debug("Journalled message add for: " + id + ", at: " + location); 111 } 112 this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); 113 addMessage(message, location); 114 } else { 115 if (debug) { 116 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); 117 } 118 lock.lock(); 119 try { 120 inFlightTxLocations.add(location); 121 } finally { 122 lock.unlock(); 123 } 124 transactionStore.addMessage(this, message, location); 125 context.getTransaction().addSynchronization(new Synchronization() { 126 public void afterCommit() throws Exception { 127 if (debug) { 128 LOG.debug("Transacted message add commit for: " + id + ", at: " + location); 129 } 130 lock.lock(); 131 try { 132 inFlightTxLocations.remove(location); 133 } finally { 134 lock.unlock(); 135 } 136 addMessage(message, location); 137 } 138 139 public void afterRollback() throws Exception { 140 if (debug) { 141 LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); 142 } 143 lock.lock(); 144 try { 145 inFlightTxLocations.remove(location); 146 } finally { 147 lock.unlock(); 148 } 149 } 150 }); 151 } 152 } 153 154 final void addMessage(final Message message, final Location location) throws InterruptedIOException { 155 ReferenceData data = new ReferenceData(); 156 data.setExpiration(message.getExpiration()); 157 data.setFileId(location.getDataFileId()); 158 data.setOffset(location.getOffset()); 159 lock.lock(); 160 try { 161 lastLocation = location; 162 ReferenceData prev = messages.put(message.getMessageId(), data); 163 if (prev != null) { 164 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, prev.getFileId()); 165 } 166 } finally { 167 lock.unlock(); 168 } 169 if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { 170 flush(); 171 } else { 172 try { 173 asyncWriteTask.wakeup(); 174 } catch (InterruptedException e) { 175 throw new InterruptedIOException(); 176 } 177 } 178 } 179 180 public boolean replayAddMessage(ConnectionContext context, Message message, Location location) { 181 MessageId id = message.getMessageId(); 182 try { 183 // Only add the message if it has not already been added. 184 ReferenceData data = referenceStore.getMessageReference(id); 185 if (data == null) { 186 data = new ReferenceData(); 187 data.setExpiration(message.getExpiration()); 188 data.setFileId(location.getDataFileId()); 189 data.setOffset(location.getOffset()); 190 referenceStore.addMessageReference(context, id, data); 191 return true; 192 } 193 } catch (Throwable e) { 194 LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " 195 + e, e); 196 } 197 return false; 198 } 199 200 /** 201 */ 202 public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException { 203 JournalQueueAck remove = new JournalQueueAck(); 204 remove.setDestination(destination); 205 remove.setMessageAck(ack); 206 final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); 207 if (!context.isInTransaction()) { 208 if (debug) { 209 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); 210 } 211 removeMessage(ack, location); 212 } else { 213 if (debug) { 214 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); 215 } 216 lock.lock(); 217 try { 218 inFlightTxLocations.add(location); 219 } finally { 220 lock.unlock(); 221 } 222 transactionStore.removeMessage(this, ack, location); 223 context.getTransaction().addSynchronization(new Synchronization() { 224 public void afterCommit() throws Exception { 225 if (debug) { 226 LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " 227 + location); 228 } 229 lock.lock(); 230 try { 231 inFlightTxLocations.remove(location); 232 } finally { 233 lock.unlock(); 234 } 235 removeMessage(ack, location); 236 } 237 238 public void afterRollback() throws Exception { 239 if (debug) { 240 LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " 241 + location); 242 } 243 lock.lock(); 244 try { 245 inFlightTxLocations.remove(location); 246 } finally { 247 lock.unlock(); 248 } 249 } 250 }); 251 } 252 } 253 254 final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { 255 ReferenceData data; 256 lock.lock(); 257 try { 258 lastLocation = location; 259 MessageId id = ack.getLastMessageId(); 260 data = messages.remove(id); 261 if (data == null) { 262 messageAcks.add(new MessageAckWithLocation(ack, location)); 263 } else { 264 // message never got written so datafileReference will still exist 265 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId()); 266 } 267 } finally { 268 lock.unlock(); 269 } 270 if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { 271 flush(); 272 } else if (data == null) { 273 try { 274 asyncWriteTask.wakeup(); 275 } catch (InterruptedException e) { 276 throw new InterruptedIOException(); 277 } 278 } 279 } 280 281 public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { 282 try { 283 // Only remove the message if it has not already been removed. 284 ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId()); 285 if (t != null) { 286 referenceStore.removeMessage(context, messageAck); 287 return true; 288 } 289 } catch (Throwable e) { 290 LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() 291 + "'. Message may have already been acknowledged. reason: " + e); 292 } 293 return false; 294 } 295 296 /** 297 * Waits till the lastest data has landed on the referenceStore 298 * 299 * @throws InterruptedIOException 300 */ 301 public void flush() throws InterruptedIOException { 302 if (LOG.isDebugEnabled()) { 303 LOG.debug("flush starting ..."); 304 } 305 CountDownLatch countDown; 306 lock.lock(); 307 try { 308 if (lastWrittenLocation == lastLocation) { 309 return; 310 } 311 if (flushLatch == null) { 312 flushLatch = new CountDownLatch(1); 313 } 314 countDown = flushLatch; 315 } finally { 316 lock.unlock(); 317 } 318 try { 319 asyncWriteTask.wakeup(); 320 countDown.await(); 321 } catch (InterruptedException e) { 322 throw new InterruptedIOException(); 323 } 324 if (LOG.isDebugEnabled()) { 325 LOG.debug("flush finished"); 326 } 327 } 328 329 /** 330 * @return 331 * @throws IOException 332 */ 333 synchronized void asyncWrite() { 334 try { 335 CountDownLatch countDown; 336 lock.lock(); 337 try { 338 countDown = flushLatch; 339 flushLatch = null; 340 } finally { 341 lock.unlock(); 342 } 343 mark.set(doAsyncWrite()); 344 if (countDown != null) { 345 countDown.countDown(); 346 } 347 } catch (IOException e) { 348 LOG.error("Checkpoint failed: " + e, e); 349 } 350 } 351 352 /** 353 * @return 354 * @throws IOException 355 */ 356 protected Location doAsyncWrite() throws IOException { 357 final List<MessageAckWithLocation> cpRemovedMessageLocations; 358 final List<Location> cpActiveJournalLocations; 359 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); 360 final Location lastLocation; 361 // swap out the message hash maps.. 362 lock.lock(); 363 try { 364 cpAddedMessageIds = this.messages; 365 cpRemovedMessageLocations = this.messageAcks; 366 cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations); 367 this.messages = new LinkedHashMap<MessageId, ReferenceData>(); 368 this.messageAcks = new ArrayList<MessageAckWithLocation>(); 369 lastLocation = this.lastLocation; 370 } finally { 371 lock.unlock(); 372 } 373 if (LOG.isDebugEnabled()) { 374 LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " 375 + cpRemovedMessageLocations.size() + " "); 376 } 377 transactionTemplate.run(new Callback() { 378 public void execute() throws Exception { 379 int size = 0; 380 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); 381 ConnectionContext context = transactionTemplate.getContext(); 382 // Checkpoint the added messages. 383 Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator(); 384 while (iterator.hasNext()) { 385 Entry<MessageId, ReferenceData> entry = iterator.next(); 386 try { 387 if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) { 388 if (LOG.isDebugEnabled()) { 389 LOG.debug("adding message ref:" + entry.getKey()); 390 } 391 size++; 392 } else { 393 if (LOG.isDebugEnabled()) { 394 LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue()); 395 } 396 } 397 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry 398 .getValue().getFileId()); 399 } catch (Throwable e) { 400 LOG.warn("Message could not be added to long term store: " + e.getMessage(), e); 401 } 402 403 // Commit the batch if it's getting too big 404 if (size >= maxCheckpointMessageAddSize) { 405 persitanceAdapter.commitTransaction(context); 406 persitanceAdapter.beginTransaction(context); 407 size = 0; 408 } 409 } 410 persitanceAdapter.commitTransaction(context); 411 persitanceAdapter.beginTransaction(context); 412 // Checkpoint the removed messages. 413 for (MessageAckWithLocation ack : cpRemovedMessageLocations) { 414 try { 415 referenceStore.removeMessage(transactionTemplate.getContext(), ack); 416 } catch (Throwable e) { 417 LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e); 418 } 419 } 420 } 421 }); 422 LOG.debug("Batch update done. lastLocation:" + lastLocation); 423 lock.lock(); 424 try { 425 cpAddedMessageIds = null; 426 lastWrittenLocation = lastLocation; 427 } finally { 428 lock.unlock(); 429 } 430 if (cpActiveJournalLocations.size() > 0) { 431 Collections.sort(cpActiveJournalLocations); 432 return cpActiveJournalLocations.get(0); 433 } else { 434 return lastLocation; 435 } 436 } 437 438 /** 439 * 440 */ 441 public Message getMessage(MessageId identity) throws IOException { 442 Location location = getLocation(identity); 443 if (location != null) { 444 DataStructure rc = peristenceAdapter.readCommand(location); 445 try { 446 return (Message) rc; 447 } catch (ClassCastException e) { 448 throw new IOException("Could not read message " + identity + " at location " + location 449 + ", expected a message, but got: " + rc); 450 } 451 } 452 return null; 453 } 454 455 protected Location getLocation(MessageId messageId) throws IOException { 456 ReferenceData data = null; 457 lock.lock(); 458 try { 459 // Is it still in flight??? 460 data = messages.get(messageId); 461 if (data == null && cpAddedMessageIds != null) { 462 data = cpAddedMessageIds.get(messageId); 463 } 464 } finally { 465 lock.unlock(); 466 } 467 if (data == null) { 468 data = referenceStore.getMessageReference(messageId); 469 if (data == null) { 470 return null; 471 } 472 } 473 Location location = new Location(); 474 location.setDataFileId(data.getFileId()); 475 location.setOffset(data.getOffset()); 476 return location; 477 } 478 479 /** 480 * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the 481 * transaction log and then the cache is updated. 482 * 483 * @param listener 484 * @throws Exception 485 */ 486 public void recover(final MessageRecoveryListener listener) throws Exception { 487 flush(); 488 referenceStore.recover(new RecoveryListenerAdapter(this, listener)); 489 } 490 491 public void start() throws Exception { 492 referenceStore.start(); 493 } 494 495 public void stop() throws Exception { 496 flush(); 497 asyncWriteTask.shutdown(); 498 referenceStore.stop(); 499 } 500 501 /** 502 * @return Returns the longTermStore. 503 */ 504 public ReferenceStore getReferenceStore() { 505 return referenceStore; 506 } 507 508 /** 509 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 510 */ 511 public void removeAllMessages(ConnectionContext context) throws IOException { 512 flush(); 513 referenceStore.removeAllMessages(context); 514 } 515 516 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, 517 String messageRef) throws IOException { 518 throw new IOException("The journal does not support message references."); 519 } 520 521 public String getMessageReference(MessageId identity) throws IOException { 522 throw new IOException("The journal does not support message references."); 523 } 524 525 /** 526 * @return 527 * @throws IOException 528 * @see org.apache.activemq.store.MessageStore#getMessageCount() 529 */ 530 public int getMessageCount() throws IOException { 531 flush(); 532 return referenceStore.getMessageCount(); 533 } 534 535 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 536 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); 537 referenceStore.recoverNextMessages(maxReturned, recoveryListener); 538 if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { 539 flush(); 540 referenceStore.recoverNextMessages(maxReturned, recoveryListener); 541 } 542 } 543 544 Message getMessage(ReferenceData data) throws IOException { 545 Location location = new Location(); 546 location.setDataFileId(data.getFileId()); 547 location.setOffset(data.getOffset()); 548 DataStructure rc = peristenceAdapter.readCommand(location); 549 try { 550 return (Message) rc; 551 } catch (ClassCastException e) { 552 throw new IOException("Could not read message at location " + location + ", expected a message, but got: " 553 + rc); 554 } 555 } 556 557 public void resetBatching() { 558 referenceStore.resetBatching(); 559 } 560 561 public Location getMark() { 562 return mark.get(); 563 } 564 565 public void dispose(ConnectionContext context) { 566 try { 567 flush(); 568 } catch (InterruptedIOException e) { 569 Thread.currentThread().interrupt(); 570 } 571 referenceStore.dispose(context); 572 super.dispose(context); 573 } 574 575 public void setBatch(MessageId messageId) { 576 try { 577 flush(); 578 } catch (InterruptedIOException e) { 579 LOG.debug("flush on setBatch resulted in exception", e); 580 } 581 getReferenceStore().setBatch(messageId); 582 } 583 584}