/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of DataFiles that make up the journal.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int MAX_BATCH_SIZE = 32*1024*1024;

    private static final int PREALLOC_CHUNK_SIZE = 1024*1024;

    // RECORD_HEAD_SPACE = length (4 byte int) + type (1 byte)
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
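    // On-disk layout of a batch control record, derived from the constants below:
    //   [4: record length][1: BATCH_CONTROL_RECORD_TYPE][11: "WRITE BATCH" magic][4: batch size][8: Adler32 checksum]
    // followed immediately by the batch payload of 'batch size' bytes.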
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // handle corruption when the checksum is disabled or the data is corrupted with zeros - minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }
header.", e); 169 } 170 } 171 172 public static final String DEFAULT_DIRECTORY = "."; 173 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 174 public static final String DEFAULT_FILE_PREFIX = "db-"; 175 public static final String DEFAULT_FILE_SUFFIX = ".log"; 176 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 177 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 178 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; 179 180 private static final Logger LOG = LoggerFactory.getLogger(Journal.class); 181 182 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 183 184 protected File directory = new File(DEFAULT_DIRECTORY); 185 protected File directoryArchive; 186 private boolean directoryArchiveOverridden = false; 187 188 protected String filePrefix = DEFAULT_FILE_PREFIX; 189 protected String fileSuffix = DEFAULT_FILE_SUFFIX; 190 protected boolean started; 191 192 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 193 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE; 194 195 protected FileAppender appender; 196 protected DataFileAccessorPool accessorPool; 197 198 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 199 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 200 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>(); 201 202 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 203 protected ScheduledFuture cleanupTask; 204 protected AtomicLong totalLength = new AtomicLong(); 205 protected boolean archiveDataLogs; 206 private ReplicationTarget replicationTarget; 207 protected boolean checksum; 208 protected boolean checkForCorruptionOnStartup; 209 protected boolean enableAsyncDiskSync = true; 210 private int nextDataFileId = 1; 211 private Object dataFileIdLock = new Object(); 212 private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null); 213 private volatile DataFile nextDataFile; 214 215 protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; 216 protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; 217 private File osKernelCopyTemplateFile = null; 218 protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 219 220 public interface DataFileRemovedListener { 221 void fileRemoved(DataFile datafile); 222 } 223 224 private DataFileRemovedListener dataFileRemovedListener; 225 226 public synchronized void start() throws IOException { 227 if (started) { 228 return; 229 } 230 231 long start = System.currentTimeMillis(); 232 accessorPool = new DataFileAccessorPool(this); 233 started = true; 234 235 appender = callerBufferAppender ? 
        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if( isCheckForCorruptionOnStartup() ) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if( lastAppendLocation.get()==null ) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: "+(end-start)+" ms");
    }
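    /**
     * Preallocates a newly created journal data file up to maxFileLength using the
     * configured {@link PreallocationStrategy}; any other strategy value falls back to
     * sparse-file allocation. A no-op when the preallocation scope is NONE.
     */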
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - 5);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        try {
            RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw");
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
            templateRaf.close();
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialise the journal template file " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {

        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
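    /**
     * Returns true if the given data file is a preallocated file that has never been
     * written to, i.e. the first record at offset 0 is already the EOF batch marker.
     * Only applies when the preallocation scope is ENTIRE_JOURNAL_ASYNC.
     */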
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return firstBatchRecordSize == 0;
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024*4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
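                // (advance by less than a full chunk so a header split across the chunk boundary is not missed)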
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size > MAX_BATCH_SIZE) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored.
                    // we can't validate the record :(
                    return size;
                }

                byte data[] = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock){
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }
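    /**
     * Allocates the next data file id and registers an empty DataFile for it without
     * making it the current write file; the reserved file is linked in just before the
     * current tail of the data file list (or added as the only file when the list is empty).
     */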
    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // re-open file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }
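    /**
     * Removes the given data files from the journal, either archiving or deleting them
     * depending on {@link #isArchiveDataLogs()}. The file currently being appended to
     * (and any later files) is skipped.
     */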
    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (directoryArchive.isAbsolute())
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Archive directory [{}] does not exist - creating it now",
                                directoryArchive.getAbsolutePath());
                    }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
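    /**
     * Returns the location of the next user record after the given location (or the first
     * user record in the journal when location is null), transparently skipping EOF markers,
     * known corrupted ranges and non-user records. Returns null once the end of the journal
     * is reached.
     */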
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }
    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                rotateWriteFile();
            }
            return currentDataFile.get();
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }
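    /**
     * A single pending append to the journal: the target location, the payload, and either
     * a sync flag or an onComplete callback (never both).
     */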
    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}
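/*
 * Illustrative usage sketch (not part of this class; the directory path and payload below
 * are arbitrary examples):
 *
 *   Journal journal = new Journal();
 *   journal.setDirectory(new File("target/journal-test"));
 *   journal.setMaxFileLength(1024 * 1024 * 32);
 *   journal.start();
 *
 *   // append a record, forcing it to disk before returning
 *   Location written = journal.write(new ByteSequence("hello".getBytes(StandardCharsets.UTF_8)), true);
 *
 *   // replay all user records from the beginning of the journal
 *   for (Location loc = journal.getNextLocation(null); loc != null; loc = journal.getNextLocation(loc)) {
 *       ByteSequence record = journal.read(loc);
 *       // process record...
 *   }
 *
 *   journal.close();
 */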