001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.amq; 018 019import java.io.File; 020import java.io.IOException; 021import java.io.RandomAccessFile; 022import java.nio.channels.FileLock; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.activeio.journal.Journal; 035import org.apache.activemq.broker.BrokerService; 036import org.apache.activemq.broker.BrokerServiceAware; 037import org.apache.activemq.broker.ConnectionContext; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQQueue; 040import org.apache.activemq.command.ActiveMQTopic; 041import org.apache.activemq.command.DataStructure; 042import org.apache.activemq.command.JournalQueueAck; 043import org.apache.activemq.command.JournalTopicAck; 044import org.apache.activemq.command.JournalTrace; 045import org.apache.activemq.command.JournalTransaction; 046import org.apache.activemq.command.Message; 047import org.apache.activemq.command.ProducerId; 048import org.apache.activemq.command.SubscriptionInfo; 049import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 050import org.apache.activemq.kaha.impl.async.AsyncDataManager; 051import org.apache.activemq.kaha.impl.async.Location; 052import org.apache.activemq.kaha.impl.index.hash.HashIndex; 053import org.apache.activemq.openwire.OpenWireFormat; 054import org.apache.activemq.store.MessageStore; 055import org.apache.activemq.store.PersistenceAdapter; 056import org.apache.activemq.store.ReferenceStore; 057import org.apache.activemq.store.ReferenceStoreAdapter; 058import org.apache.activemq.store.TopicMessageStore; 059import org.apache.activemq.store.TopicReferenceStore; 060import org.apache.activemq.store.TransactionStore; 061import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; 062import org.apache.activemq.thread.Scheduler; 063import org.apache.activemq.thread.Task; 064import org.apache.activemq.thread.TaskRunner; 065import org.apache.activemq.thread.TaskRunnerFactory; 066import org.apache.activemq.usage.SystemUsage; 067import org.apache.activemq.usage.Usage; 068import org.apache.activemq.usage.UsageListener; 069import org.apache.activemq.util.ByteSequence; 070import org.apache.activemq.util.IOExceptionSupport; 071import org.apache.activemq.util.IOHelper; 072import org.apache.activemq.wireformat.WireFormat; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076 077/** 078 * An implementation of {@link PersistenceAdapter} designed for use with a 079 * {@link Journal} and then check pointing asynchronously on a timeout with some 080 * other long term persistent storage. 081 * 082 * @org.apache.xbean.XBean element="amqPersistenceAdapter" 083 * 084 */ 085public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { 086 087 private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class); 088 private Scheduler scheduler; 089 private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>(); 090 private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>(); 091 private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; 092 private static final boolean BROKEN_FILE_LOCK; 093 private static final boolean DISABLE_LOCKING; 094 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 095 private AsyncDataManager asyncDataManager; 096 private ReferenceStoreAdapter referenceStoreAdapter; 097 private TaskRunnerFactory taskRunnerFactory; 098 private WireFormat wireFormat = new OpenWireFormat(); 099 private SystemUsage usageManager; 100 private long checkpointInterval = 1000 * 20; 101 private int maxCheckpointMessageAddSize = 1024 * 4; 102 private final AMQTransactionStore transactionStore = new AMQTransactionStore(this); 103 private TaskRunner checkpointTask; 104 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); 105 private final AtomicBoolean started = new AtomicBoolean(false); 106 private Runnable periodicCheckpointTask; 107 private Runnable periodicCleanupTask; 108 private boolean deleteAllMessages; 109 private boolean syncOnWrite; 110 private String brokerName = ""; 111 private File directory; 112 private File directoryArchive; 113 private BrokerService brokerService; 114 private final AtomicLong storeSize = new AtomicLong(); 115 private boolean persistentIndex=true; 116 private boolean useNio = true; 117 private boolean archiveDataLogs=false; 118 private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL; 119 private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; 120 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 121 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 122 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 123 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 124 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 125 private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; 126 private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> (); 127 private RandomAccessFile lockFile; 128 private FileLock lock; 129 private boolean disableLocking = DISABLE_LOCKING; 130 private boolean failIfJournalIsLocked; 131 private boolean lockLogged; 132 private boolean lockAquired; 133 private boolean recoverReferenceStore=true; 134 private boolean forceRecoverReferenceStore=false; 135 private boolean useDedicatedTaskRunner=false; 136 private int journalThreadPriority = Thread.MAX_PRIORITY; 137 138 public String getBrokerName() { 139 return this.brokerName; 140 } 141 142 public void setBrokerName(String brokerName) { 143 this.brokerName = brokerName; 144 if (this.referenceStoreAdapter != null) { 145 this.referenceStoreAdapter.setBrokerName(brokerName); 146 } 147 } 148 149 public BrokerService getBrokerService() { 150 return brokerService; 151 } 152 153 public void setBrokerService(BrokerService brokerService) { 154 this.brokerService = brokerService; 155 } 156 157 public synchronized void start() throws Exception { 158 if (!started.compareAndSet(false, true)) { 159 return; 160 } 161 if (this.directory == null) { 162 if (brokerService != null) { 163 this.directory = brokerService.getBrokerDataDirectory(); 164 165 } else { 166 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); 167 this.directory = new File(directory, "amqstore"); 168 directory.getAbsolutePath(); 169 } 170 } 171 if (this.directoryArchive == null) { 172 this.directoryArchive = new File(this.directory,"archive"); 173 } 174 if (this.brokerService != null) { 175 this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory(); 176 this.scheduler = this.brokerService.getScheduler(); 177 } else { 178 this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(), 179 true, 1000, isUseDedicatedTaskRunner()); 180 this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler"); 181 } 182 183 IOHelper.mkdirs(this.directory); 184 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); 185 lock(); 186 LOG.info("AMQStore starting using directory: " + directory); 187 if (archiveDataLogs) { 188 IOHelper.mkdirs(this.directoryArchive); 189 } 190 191 if (this.usageManager != null) { 192 this.usageManager.getMemoryUsage().addUsageListener(this); 193 } 194 if (asyncDataManager == null) { 195 asyncDataManager = createAsyncDataManager(); 196 } 197 if (referenceStoreAdapter == null) { 198 referenceStoreAdapter = createReferenceStoreAdapter(); 199 } 200 referenceStoreAdapter.setDirectory(new File(directory, "kr-store")); 201 referenceStoreAdapter.setBrokerName(getBrokerName()); 202 referenceStoreAdapter.setUsageManager(usageManager); 203 referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); 204 205 if (failIfJournalIsLocked) { 206 asyncDataManager.lock(); 207 } else { 208 while (true) { 209 try { 210 asyncDataManager.lock(); 211 break; 212 } catch (IOException e) { 213 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e); 214 try { 215 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 216 } catch (InterruptedException e1) { 217 } 218 } 219 } 220 } 221 222 asyncDataManager.start(); 223 if (deleteAllMessages) { 224 asyncDataManager.delete(); 225 try { 226 JournalTrace trace = new JournalTrace(); 227 trace.setMessage("DELETED " + new Date()); 228 Location location = asyncDataManager.write(wireFormat.marshal(trace), false); 229 asyncDataManager.setMark(location, true); 230 LOG.info("Journal deleted: "); 231 deleteAllMessages = false; 232 } catch (IOException e) { 233 throw e; 234 } catch (Throwable e) { 235 throw IOExceptionSupport.create(e); 236 } 237 referenceStoreAdapter.deleteAllMessages(); 238 } 239 referenceStoreAdapter.start(); 240 Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse(); 241 LOG.info("Active data files: " + files); 242 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { 243 244 public boolean iterate() { 245 doCheckpoint(); 246 return false; 247 } 248 }, "ActiveMQ Journal Checkpoint Worker"); 249 createTransactionStore(); 250 251 // 252 // The following was attempting to reduce startup times by avoiding the 253 // log 254 // file scanning that recovery performs. The problem with it is that XA 255 // transactions 256 // only live in transaction log and are not stored in the reference 257 // store, but they still 258 // need to be recovered when the broker starts up. 259 260 if (isForceRecoverReferenceStore() 261 || (isRecoverReferenceStore() && !referenceStoreAdapter 262 .isStoreValid())) { 263 LOG.warn("The ReferenceStore is not valid - recovering ..."); 264 recover(); 265 LOG.info("Finished recovering the ReferenceStore"); 266 } else { 267 Location location = writeTraceMessage("RECOVERED " + new Date(), 268 true); 269 asyncDataManager.setMark(location, true); 270 // recover transactions 271 getTransactionStore().setPreparedTransactions( 272 referenceStoreAdapter.retrievePreparedState()); 273 } 274 275 // Do a checkpoint periodically. 276 periodicCheckpointTask = new Runnable() { 277 278 public void run() { 279 checkpoint(false); 280 } 281 }; 282 scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval()); 283 periodicCleanupTask = new Runnable() { 284 285 public void run() { 286 cleanup(); 287 } 288 }; 289 scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval()); 290 291 if (lockAquired && lockLogged) { 292 LOG.info("Aquired lock for AMQ Store" + getDirectory()); 293 if (brokerService != null) { 294 brokerService.getBroker().nowMasterBroker(); 295 } 296 } 297 298 } 299 300 public void stop() throws Exception { 301 302 if (!started.compareAndSet(true, false)) { 303 return; 304 } 305 unlock(); 306 if (lockFile != null) { 307 lockFile.close(); 308 lockFile = null; 309 } 310 this.usageManager.getMemoryUsage().removeUsageListener(this); 311 synchronized (this) { 312 scheduler.cancel(periodicCheckpointTask); 313 scheduler.cancel(periodicCleanupTask); 314 } 315 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 316 while (queueIterator.hasNext()) { 317 AMQMessageStore ms = queueIterator.next(); 318 ms.stop(); 319 } 320 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 321 while (topicIterator.hasNext()) { 322 final AMQTopicMessageStore ms = topicIterator.next(); 323 ms.stop(); 324 } 325 // Take one final checkpoint and stop checkpoint processing. 326 checkpoint(true); 327 synchronized (this) { 328 checkpointTask.shutdown(); 329 } 330 referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions()); 331 queues.clear(); 332 topics.clear(); 333 IOException firstException = null; 334 referenceStoreAdapter.stop(); 335 referenceStoreAdapter = null; 336 337 if (this.brokerService == null) { 338 this.taskRunnerFactory.shutdown(); 339 this.scheduler.stop(); 340 } 341 try { 342 LOG.debug("Journal close"); 343 asyncDataManager.close(); 344 } catch (Exception e) { 345 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 346 } 347 if (firstException != null) { 348 throw firstException; 349 } 350 } 351 352 /** 353 * When we checkpoint we move all the journalled data to long term storage. 354 * 355 * @param sync 356 */ 357 public void checkpoint(boolean sync) { 358 try { 359 if (asyncDataManager == null) { 360 throw new IllegalStateException("Journal is closed."); 361 } 362 CountDownLatch latch = null; 363 synchronized (this) { 364 latch = nextCheckpointCountDownLatch; 365 checkpointTask.wakeup(); 366 } 367 if (sync) { 368 if (LOG.isDebugEnabled()) { 369 LOG.debug("Waitng for checkpoint to complete."); 370 } 371 latch.await(); 372 } 373 referenceStoreAdapter.checkpoint(sync); 374 } catch (InterruptedException e) { 375 Thread.currentThread().interrupt(); 376 LOG.warn("Request to start checkpoint failed: " + e, e); 377 } catch (IOException e) { 378 LOG.error("checkpoint failed: " + e, e); 379 } 380 } 381 382 /** 383 * This does the actual checkpoint. 384 * 385 * @return true if successful 386 */ 387 public boolean doCheckpoint() { 388 CountDownLatch latch = null; 389 synchronized (this) { 390 latch = nextCheckpointCountDownLatch; 391 nextCheckpointCountDownLatch = new CountDownLatch(1); 392 } 393 try { 394 if (LOG.isDebugEnabled()) { 395 LOG.debug("Checkpoint started."); 396 } 397 398 Location currentMark = asyncDataManager.getMark(); 399 Location newMark = currentMark; 400 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 401 while (queueIterator.hasNext()) { 402 final AMQMessageStore ms = queueIterator.next(); 403 Location mark = ms.getMark(); 404 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 405 newMark = mark; 406 } 407 } 408 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 409 while (topicIterator.hasNext()) { 410 final AMQTopicMessageStore ms = topicIterator.next(); 411 Location mark = ms.getMark(); 412 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 413 newMark = mark; 414 } 415 } 416 try { 417 if (newMark != currentMark) { 418 if (LOG.isDebugEnabled()) { 419 LOG.debug("Marking journal at: " + newMark); 420 } 421 asyncDataManager.setMark(newMark, false); 422 writeTraceMessage("CHECKPOINT " + new Date(), true); 423 } 424 } catch (Exception e) { 425 LOG.error("Failed to mark the Journal: " + e, e); 426 } 427 if (LOG.isDebugEnabled()) { 428 LOG.debug("Checkpoint done."); 429 } 430 } finally { 431 latch.countDown(); 432 } 433 return true; 434 } 435 436 /** 437 * Cleans up the data files 438 * @throws IOException 439 */ 440 public void cleanup() { 441 try { 442 Set<Integer>inProgress = new HashSet<Integer>(); 443 if (LOG.isDebugEnabled()) { 444 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values()); 445 } 446 for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) { 447 inProgress.addAll(set.keySet()); 448 } 449 Integer lastDataFile = asyncDataManager.getCurrentDataFileId(); 450 inProgress.add(lastDataFile); 451 lastDataFile = asyncDataManager.getMark().getDataFileId(); 452 inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse()); 453 Location lastActiveTx = transactionStore.checkpoint(); 454 if (lastActiveTx != null) { 455 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId()); 456 } 457 LOG.debug("lastDataFile: " + lastDataFile); 458 asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1); 459 } catch (IOException e) { 460 LOG.error("Could not cleanup data files: " + e, e); 461 } 462 } 463 464 public Set<ActiveMQDestination> getDestinations() { 465 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations()); 466 destinations.addAll(queues.keySet()); 467 destinations.addAll(topics.keySet()); 468 return destinations; 469 } 470 471 MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 472 if (destination.isQueue()) { 473 return createQueueMessageStore((ActiveMQQueue)destination); 474 } else { 475 return createTopicMessageStore((ActiveMQTopic)destination); 476 } 477 } 478 479 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 480 AMQMessageStore store = queues.get(destination); 481 if (store == null) { 482 ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); 483 store = new AMQMessageStore(this, checkpointStore, destination); 484 try { 485 store.start(); 486 } catch (Exception e) { 487 throw IOExceptionSupport.create(e); 488 } 489 queues.put(destination, store); 490 } 491 return store; 492 } 493 494 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 495 AMQTopicMessageStore store = topics.get(destinationName); 496 if (store == null) { 497 TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); 498 store = new AMQTopicMessageStore(this,checkpointStore, destinationName); 499 try { 500 store.start(); 501 } catch (Exception e) { 502 throw IOExceptionSupport.create(e); 503 } 504 topics.put(destinationName, store); 505 } 506 return store; 507 } 508 509 /** 510 * Cleanup method to remove any state associated with the given destination 511 * 512 * @param destination 513 */ 514 public void removeQueueMessageStore(ActiveMQQueue destination) { 515 AMQMessageStore store= queues.remove(destination); 516 referenceStoreAdapter.removeQueueMessageStore(destination); 517 } 518 519 /** 520 * Cleanup method to remove any state associated with the given destination 521 * 522 * @param destination 523 */ 524 public void removeTopicMessageStore(ActiveMQTopic destination) { 525 topics.remove(destination); 526 } 527 528 public TransactionStore createTransactionStore() throws IOException { 529 return transactionStore; 530 } 531 532 public long getLastMessageBrokerSequenceId() throws IOException { 533 return referenceStoreAdapter.getLastMessageBrokerSequenceId(); 534 } 535 536 public void beginTransaction(ConnectionContext context) throws IOException { 537 referenceStoreAdapter.beginTransaction(context); 538 } 539 540 public void commitTransaction(ConnectionContext context) throws IOException { 541 referenceStoreAdapter.commitTransaction(context); 542 } 543 544 public void rollbackTransaction(ConnectionContext context) throws IOException { 545 referenceStoreAdapter.rollbackTransaction(context); 546 } 547 548 public boolean isPersistentIndex() { 549 return persistentIndex; 550 } 551 552 public void setPersistentIndex(boolean persistentIndex) { 553 this.persistentIndex = persistentIndex; 554 } 555 556 /** 557 * @param location 558 * @return 559 * @throws IOException 560 */ 561 public DataStructure readCommand(Location location) throws IOException { 562 try { 563 ByteSequence packet = asyncDataManager.read(location); 564 return (DataStructure)wireFormat.unmarshal(packet); 565 } catch (IOException e) { 566 throw createReadException(location, e); 567 } 568 } 569 570 /** 571 * Move all the messages that were in the journal into long term storage. We 572 * just replay and do a checkpoint. 573 * 574 * @throws IOException 575 * @throws IOException 576 * @throws IllegalStateException 577 */ 578 private void recover() throws IllegalStateException, IOException { 579 referenceStoreAdapter.clearMessages(); 580 Location pos = null; 581 int redoCounter = 0; 582 LOG.info("Journal Recovery Started from: " + asyncDataManager); 583 long start = System.currentTimeMillis(); 584 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 585 // While we have records in the journal. 586 while ((pos = asyncDataManager.getNextLocation(pos)) != null) { 587 ByteSequence data = asyncDataManager.read(pos); 588 DataStructure c = (DataStructure)wireFormat.unmarshal(data); 589 if (c instanceof Message) { 590 Message message = (Message)c; 591 AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination()); 592 if (message.isInTransaction()) { 593 transactionStore.addMessage(store, message, pos); 594 } else { 595 if (store.replayAddMessage(context, message, pos)) { 596 redoCounter++; 597 } 598 } 599 } else { 600 switch (c.getDataStructureType()) { 601 case SubscriptionInfo.DATA_STRUCTURE_TYPE: { 602 referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c); 603 } 604 break; 605 case JournalQueueAck.DATA_STRUCTURE_TYPE: { 606 JournalQueueAck command = (JournalQueueAck)c; 607 AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination()); 608 if (command.getMessageAck().isInTransaction()) { 609 transactionStore.removeMessage(store, command.getMessageAck(), pos); 610 } else { 611 if (store.replayRemoveMessage(context, command.getMessageAck())) { 612 redoCounter++; 613 } 614 } 615 } 616 break; 617 case JournalTopicAck.DATA_STRUCTURE_TYPE: { 618 JournalTopicAck command = (JournalTopicAck)c; 619 AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination()); 620 if (command.getTransactionId() != null) { 621 transactionStore.acknowledge(store, command, pos); 622 } else { 623 if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) { 624 redoCounter++; 625 } 626 } 627 } 628 break; 629 case JournalTransaction.DATA_STRUCTURE_TYPE: { 630 JournalTransaction command = (JournalTransaction)c; 631 try { 632 // Try to replay the packet. 633 switch (command.getType()) { 634 case JournalTransaction.XA_PREPARE: 635 transactionStore.replayPrepare(command.getTransactionId()); 636 break; 637 case JournalTransaction.XA_COMMIT: 638 case JournalTransaction.LOCAL_COMMIT: 639 AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 640 if (tx == null) { 641 break; // We may be trying to replay a commit 642 } 643 // that 644 // was already committed. 645 // Replay the committed operations. 646 tx.getOperations(); 647 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 648 AMQTxOperation op = (AMQTxOperation)iter.next(); 649 if (op.replay(this, context)) { 650 redoCounter++; 651 } 652 } 653 break; 654 case JournalTransaction.LOCAL_ROLLBACK: 655 case JournalTransaction.XA_ROLLBACK: 656 transactionStore.replayRollback(command.getTransactionId()); 657 break; 658 default: 659 throw new IOException("Invalid journal command type: " + command.getType()); 660 } 661 } catch (IOException e) { 662 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 663 } 664 } 665 break; 666 case JournalTrace.DATA_STRUCTURE_TYPE: 667 JournalTrace trace = (JournalTrace)c; 668 LOG.debug("TRACE Entry: " + trace.getMessage()); 669 break; 670 default: 671 LOG.error("Unknown type of record in transaction log which will be discarded: " + c); 672 } 673 } 674 } 675 Location location = writeTraceMessage("RECOVERED " + new Date(), true); 676 asyncDataManager.setMark(location, true); 677 long end = System.currentTimeMillis(); 678 LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); 679 } 680 681 private IOException createReadException(Location location, Exception e) { 682 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 683 } 684 685 protected IOException createWriteException(DataStructure packet, Exception e) { 686 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 687 } 688 689 protected IOException createWriteException(String command, Exception e) { 690 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 691 } 692 693 protected IOException createRecoveryFailedException(Exception e) { 694 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); 695 } 696 697 /** 698 * @param command 699 * @param syncHint 700 * @return 701 * @throws IOException 702 */ 703 public Location writeCommand(DataStructure command, boolean syncHint) throws IOException { 704 return writeCommand(command, syncHint,false); 705 } 706 707 public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException { 708 try { 709 return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite))); 710 } catch (IOException ioe) { 711 LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe); 712 brokerService.handleIOException(ioe); 713 throw ioe; 714 } 715 } 716 717 private Location writeTraceMessage(String message, boolean sync) throws IOException { 718 JournalTrace trace = new JournalTrace(); 719 trace.setMessage(message); 720 return writeCommand(trace, sync); 721 } 722 723 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 724 newPercentUsage = (newPercentUsage / 10) * 10; 725 oldPercentUsage = (oldPercentUsage / 10) * 10; 726 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 727 checkpoint(false); 728 } 729 } 730 731 public AMQTransactionStore getTransactionStore() { 732 return transactionStore; 733 } 734 735 public synchronized void deleteAllMessages() throws IOException { 736 deleteAllMessages = true; 737 } 738 739 @Override 740 public String toString() { 741 return "AMQPersistenceAdapter(" + directory + ")"; 742 } 743 744 // ///////////////////////////////////////////////////////////////// 745 // Subclass overridables 746 // ///////////////////////////////////////////////////////////////// 747 protected AsyncDataManager createAsyncDataManager() { 748 AsyncDataManager manager = new AsyncDataManager(storeSize); 749 manager.setDirectory(new File(directory, "journal")); 750 manager.setDirectoryArchive(getDirectoryArchive()); 751 manager.setArchiveDataLogs(isArchiveDataLogs()); 752 manager.setMaxFileLength(maxFileLength); 753 manager.setUseNio(useNio); 754 return manager; 755 } 756 757 protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { 758 KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize); 759 adaptor.setPersistentIndex(isPersistentIndex()); 760 adaptor.setIndexBinSize(getIndexBinSize()); 761 adaptor.setIndexKeySize(getIndexKeySize()); 762 adaptor.setIndexPageSize(getIndexPageSize()); 763 adaptor.setIndexMaxBinSize(getIndexMaxBinSize()); 764 adaptor.setIndexLoadFactor(getIndexLoadFactor()); 765 return adaptor; 766 } 767 768 // ///////////////////////////////////////////////////////////////// 769 // Property Accessors 770 // ///////////////////////////////////////////////////////////////// 771 public AsyncDataManager getAsyncDataManager() { 772 return asyncDataManager; 773 } 774 775 public void setAsyncDataManager(AsyncDataManager asyncDataManager) { 776 this.asyncDataManager = asyncDataManager; 777 } 778 779 public ReferenceStoreAdapter getReferenceStoreAdapter() { 780 return referenceStoreAdapter; 781 } 782 783 public TaskRunnerFactory getTaskRunnerFactory() { 784 return taskRunnerFactory; 785 } 786 787 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 788 this.taskRunnerFactory = taskRunnerFactory; 789 } 790 791 /** 792 * @return Returns the wireFormat. 793 */ 794 public WireFormat getWireFormat() { 795 return wireFormat; 796 } 797 798 public void setWireFormat(WireFormat wireFormat) { 799 this.wireFormat = wireFormat; 800 } 801 802 public SystemUsage getUsageManager() { 803 return usageManager; 804 } 805 806 public void setUsageManager(SystemUsage usageManager) { 807 this.usageManager = usageManager; 808 } 809 810 public int getMaxCheckpointMessageAddSize() { 811 return maxCheckpointMessageAddSize; 812 } 813 814 /** 815 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 816 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 817 */ 818 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { 819 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; 820 } 821 822 823 public synchronized File getDirectory() { 824 return directory; 825 } 826 827 public synchronized void setDirectory(File directory) { 828 this.directory = directory; 829 } 830 831 public boolean isSyncOnWrite() { 832 return this.syncOnWrite; 833 } 834 835 public void setSyncOnWrite(boolean syncOnWrite) { 836 this.syncOnWrite = syncOnWrite; 837 } 838 839 /** 840 * @param referenceStoreAdapter the referenceStoreAdapter to set 841 */ 842 public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { 843 this.referenceStoreAdapter = referenceStoreAdapter; 844 } 845 846 public long size(){ 847 return storeSize.get(); 848 } 849 850 public boolean isUseNio() { 851 return useNio; 852 } 853 854 public void setUseNio(boolean useNio) { 855 this.useNio = useNio; 856 } 857 858 public int getMaxFileLength() { 859 return maxFileLength; 860 } 861 862 /** 863 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 864 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 865 */ 866 public void setMaxFileLength(int maxFileLength) { 867 this.maxFileLength = maxFileLength; 868 } 869 870 public long getCleanupInterval() { 871 return cleanupInterval; 872 } 873 874 public void setCleanupInterval(long cleanupInterval) { 875 this.cleanupInterval = cleanupInterval; 876 } 877 878 public long getCheckpointInterval() { 879 return checkpointInterval; 880 } 881 882 public void setCheckpointInterval(long checkpointInterval) { 883 this.checkpointInterval = checkpointInterval; 884 } 885 886 public int getIndexBinSize() { 887 return indexBinSize; 888 } 889 890 public void setIndexBinSize(int indexBinSize) { 891 this.indexBinSize = indexBinSize; 892 } 893 894 public int getIndexKeySize() { 895 return indexKeySize; 896 } 897 898 public void setIndexKeySize(int indexKeySize) { 899 this.indexKeySize = indexKeySize; 900 } 901 902 public int getIndexPageSize() { 903 return indexPageSize; 904 } 905 906 public int getIndexMaxBinSize() { 907 return indexMaxBinSize; 908 } 909 910 public void setIndexMaxBinSize(int maxBinSize) { 911 this.indexMaxBinSize = maxBinSize; 912 } 913 914 /** 915 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 916 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 917 */ 918 public void setIndexPageSize(int indexPageSize) { 919 this.indexPageSize = indexPageSize; 920 } 921 922 public void setIndexLoadFactor(int factor){ 923 this.indexLoadFactor=factor; 924 } 925 926 public int getIndexLoadFactor(){ 927 return this.indexLoadFactor; 928 } 929 930 public int getMaxReferenceFileLength() { 931 return maxReferenceFileLength; 932 } 933 934 /** 935 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 936 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 937 */ 938 public void setMaxReferenceFileLength(int maxReferenceFileLength) { 939 this.maxReferenceFileLength = maxReferenceFileLength; 940 } 941 942 public File getDirectoryArchive() { 943 return directoryArchive; 944 } 945 946 public void setDirectoryArchive(File directoryArchive) { 947 this.directoryArchive = directoryArchive; 948 } 949 950 public boolean isArchiveDataLogs() { 951 return archiveDataLogs; 952 } 953 954 public void setArchiveDataLogs(boolean archiveDataLogs) { 955 this.archiveDataLogs = archiveDataLogs; 956 } 957 958 public boolean isDisableLocking() { 959 return disableLocking; 960 } 961 962 public void setDisableLocking(boolean disableLocking) { 963 this.disableLocking = disableLocking; 964 } 965 966 /** 967 * @return the recoverReferenceStore 968 */ 969 public boolean isRecoverReferenceStore() { 970 return recoverReferenceStore; 971 } 972 973 /** 974 * @param recoverReferenceStore the recoverReferenceStore to set 975 */ 976 public void setRecoverReferenceStore(boolean recoverReferenceStore) { 977 this.recoverReferenceStore = recoverReferenceStore; 978 } 979 980 /** 981 * @return the forceRecoverReferenceStore 982 */ 983 public boolean isForceRecoverReferenceStore() { 984 return forceRecoverReferenceStore; 985 } 986 987 /** 988 * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set 989 */ 990 public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) { 991 this.forceRecoverReferenceStore = forceRecoverReferenceStore; 992 } 993 994 public boolean isUseDedicatedTaskRunner() { 995 return useDedicatedTaskRunner; 996 } 997 998 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 999 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1000 } 1001 1002 /** 1003 * @return the journalThreadPriority 1004 */ 1005 public int getJournalThreadPriority() { 1006 return this.journalThreadPriority; 1007 } 1008 1009 /** 1010 * @param journalThreadPriority the journalThreadPriority to set 1011 */ 1012 public void setJournalThreadPriority(int journalThreadPriority) { 1013 this.journalThreadPriority = journalThreadPriority; 1014 } 1015 1016 1017 protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { 1018 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store); 1019 if (map == null) { 1020 map = new ConcurrentHashMap<Integer, AtomicInteger>(); 1021 dataFilesInProgress.put(store, map); 1022 } 1023 AtomicInteger count = map.get(dataFileId); 1024 if (count == null) { 1025 count = new AtomicInteger(0); 1026 map.put(dataFileId, count); 1027 } 1028 count.incrementAndGet(); 1029 } 1030 1031 protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) { 1032 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store); 1033 if (map != null) { 1034 AtomicInteger count = map.get(dataFileId); 1035 if (count != null) { 1036 int newCount = count.decrementAndGet(); 1037 if (newCount <=0) { 1038 map.remove(dataFileId); 1039 } 1040 } 1041 if (map.isEmpty()) { 1042 dataFilesInProgress.remove(store); 1043 } 1044 } 1045 } 1046 1047 1048 protected void lock() throws Exception { 1049 lockLogged = false; 1050 lockAquired = false; 1051 do { 1052 if (doLock()) { 1053 lockAquired = true; 1054 } else { 1055 if (!lockLogged) { 1056 LOG.warn("Waiting to Lock the Store " + getDirectory()); 1057 lockLogged = true; 1058 } 1059 Thread.sleep(1000); 1060 } 1061 1062 } while (!lockAquired && !disableLocking); 1063 } 1064 1065 private synchronized void unlock() throws IOException { 1066 if (!disableLocking && (null != lock)) { 1067 //clear property doesn't work on some platforms 1068 System.getProperties().remove(getPropertyKey()); 1069 System.clearProperty(getPropertyKey()); 1070 assert(System.getProperty(getPropertyKey())==null); 1071 if (lock.isValid()) { 1072 lock.release(); 1073 lock.channel().close(); 1074 1075 } 1076 lock = null; 1077 } 1078 } 1079 1080 1081 protected boolean doLock() throws IOException { 1082 boolean result = true; 1083 if (!disableLocking && directory != null && lock == null) { 1084 String key = getPropertyKey(); 1085 String property = System.getProperty(key); 1086 if (null == property) { 1087 if (!BROKEN_FILE_LOCK) { 1088 lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false); 1089 if (lock == null) { 1090 result = false; 1091 } else { 1092 System.setProperty(key, new Date().toString()); 1093 } 1094 } 1095 } else { // already locked 1096 result = false; 1097 } 1098 } 1099 return result; 1100 } 1101 1102 private String getPropertyKey() throws IOException { 1103 return getClass().getName() + ".lock." + directory.getCanonicalPath(); 1104 } 1105 1106 static { 1107 BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX 1108 + ".FileLockBroken", 1109 "false")); 1110 DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX 1111 + ".DisableLocking", 1112 "false")); 1113 } 1114 1115 1116 public long getLastProducerSequenceId(ProducerId id) { 1117 // reference store send has adequate duplicate suppression 1118 return -1; 1119 } 1120}