001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.Map.Entry; 031import java.util.concurrent.*; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.activemq.broker.ConnectionContext; 035import org.apache.activemq.broker.region.Destination; 036import org.apache.activemq.broker.region.RegionBroker; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQQueue; 039import org.apache.activemq.command.ActiveMQTempQueue; 040import org.apache.activemq.command.ActiveMQTempTopic; 041import org.apache.activemq.command.ActiveMQTopic; 042import org.apache.activemq.command.Message; 043import org.apache.activemq.command.MessageAck; 044import org.apache.activemq.command.MessageId; 045import org.apache.activemq.command.ProducerId; 046import org.apache.activemq.command.SubscriptionInfo; 047import org.apache.activemq.command.TransactionId; 048import org.apache.activemq.openwire.OpenWireFormat; 049import org.apache.activemq.protobuf.Buffer; 050import org.apache.activemq.store.AbstractMessageStore; 051import org.apache.activemq.store.MessageRecoveryListener; 052import org.apache.activemq.store.MessageStore; 053import org.apache.activemq.store.PersistenceAdapter; 054import org.apache.activemq.store.TopicMessageStore; 055import org.apache.activemq.store.TransactionStore; 056import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 057import org.apache.activemq.store.kahadb.data.KahaDestination; 058import org.apache.activemq.store.kahadb.data.KahaLocation; 059import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 060import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 061import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 062import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 063import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 064import org.apache.activemq.usage.MemoryUsage; 065import org.apache.activemq.usage.SystemUsage; 066import org.apache.activemq.util.ServiceStopper; 067import org.apache.activemq.wireformat.WireFormat; 068import org.apache.kahadb.util.ByteSequence; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071import org.apache.kahadb.journal.Location; 072import org.apache.kahadb.page.Transaction; 073 074public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 075 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 076 private static final int MAX_ASYNC_JOBS = 10000; 077 078 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 079 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 080 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 081 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 082 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 083 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 084 085 protected ExecutorService queueExecutor; 086 protected ExecutorService topicExecutor; 087 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 088 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 089 final WireFormat wireFormat = new OpenWireFormat(); 090 private SystemUsage usageManager; 091 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 092 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 093 Semaphore globalQueueSemaphore; 094 Semaphore globalTopicSemaphore; 095 private boolean concurrentStoreAndDispatchQueues = true; 096 // when true, message order may be compromised when cache is exhausted if store is out 097 // or order w.r.t cache 098 private boolean concurrentStoreAndDispatchTopics = false; 099 private boolean concurrentStoreAndDispatchTransactions = false; 100 private int maxAsyncJobs = MAX_ASYNC_JOBS; 101 private final KahaDBTransactionStore transactionStore; 102 private TransactionIdTransformer transactionIdTransformer; 103 104 public KahaDBStore() { 105 this.transactionStore = new KahaDBTransactionStore(this); 106 this.transactionIdTransformer = new TransactionIdTransformer() { 107 @Override 108 public KahaTransactionInfo transform(TransactionId txid) { 109 return TransactionIdConversion.convert(txid); 110 } 111 }; 112 } 113 114 @Override 115 public String toString() { 116 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 117 } 118 119 public void setBrokerName(String brokerName) { 120 } 121 122 public void setUsageManager(SystemUsage usageManager) { 123 this.usageManager = usageManager; 124 } 125 126 public SystemUsage getUsageManager() { 127 return this.usageManager; 128 } 129 130 /** 131 * @return the concurrentStoreAndDispatch 132 */ 133 public boolean isConcurrentStoreAndDispatchQueues() { 134 return this.concurrentStoreAndDispatchQueues; 135 } 136 137 /** 138 * @param concurrentStoreAndDispatch 139 * the concurrentStoreAndDispatch to set 140 */ 141 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 142 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 143 } 144 145 /** 146 * @return the concurrentStoreAndDispatch 147 */ 148 public boolean isConcurrentStoreAndDispatchTopics() { 149 return this.concurrentStoreAndDispatchTopics; 150 } 151 152 /** 153 * @param concurrentStoreAndDispatch 154 * the concurrentStoreAndDispatch to set 155 */ 156 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 157 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 158 } 159 160 public boolean isConcurrentStoreAndDispatchTransactions() { 161 return this.concurrentStoreAndDispatchTransactions; 162 } 163 164 /** 165 * @return the maxAsyncJobs 166 */ 167 public int getMaxAsyncJobs() { 168 return this.maxAsyncJobs; 169 } 170 /** 171 * @param maxAsyncJobs 172 * the maxAsyncJobs to set 173 */ 174 public void setMaxAsyncJobs(int maxAsyncJobs) { 175 this.maxAsyncJobs = maxAsyncJobs; 176 } 177 178 @Override 179 public void doStart() throws Exception { 180 super.doStart(); 181 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 182 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 183 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 184 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 185 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 186 asyncQueueJobQueue, new ThreadFactory() { 187 public Thread newThread(Runnable runnable) { 188 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 189 thread.setDaemon(true); 190 return thread; 191 } 192 }); 193 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 194 asyncTopicJobQueue, new ThreadFactory() { 195 public Thread newThread(Runnable runnable) { 196 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 197 thread.setDaemon(true); 198 return thread; 199 } 200 }); 201 } 202 203 @Override 204 public void doStop(ServiceStopper stopper) throws Exception { 205 // drain down async jobs 206 LOG.info("Stopping async queue tasks"); 207 if (this.globalQueueSemaphore != null) { 208 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 209 } 210 synchronized (this.asyncQueueMaps) { 211 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 212 synchronized (m) { 213 for (StoreTask task : m.values()) { 214 task.cancel(); 215 } 216 } 217 } 218 this.asyncQueueMaps.clear(); 219 } 220 LOG.info("Stopping async topic tasks"); 221 if (this.globalTopicSemaphore != null) { 222 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 223 } 224 synchronized (this.asyncTopicMaps) { 225 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 226 synchronized (m) { 227 for (StoreTask task : m.values()) { 228 task.cancel(); 229 } 230 } 231 } 232 this.asyncTopicMaps.clear(); 233 } 234 if (this.globalQueueSemaphore != null) { 235 this.globalQueueSemaphore.drainPermits(); 236 } 237 if (this.globalTopicSemaphore != null) { 238 this.globalTopicSemaphore.drainPermits(); 239 } 240 if (this.queueExecutor != null) { 241 this.queueExecutor.shutdownNow(); 242 } 243 if (this.topicExecutor != null) { 244 this.topicExecutor.shutdownNow(); 245 } 246 LOG.info("Stopped KahaDB"); 247 super.doStop(stopper); 248 } 249 250 void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException { 251 Location location; 252 this.indexLock.writeLock().lock(); 253 try { 254 location = findMessageLocation(key, destination); 255 } finally { 256 this.indexLock.writeLock().unlock(); 257 } 258 259 if (location != null) { 260 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); 261 Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 262 263 message.incrementRedeliveryCounter(); 264 if (LOG.isTraceEnabled()) { 265 LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter()); 266 } 267 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 268 addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 269 270 final Location rewriteLocation = journal.write(toByteSequence(addMessage), true); 271 272 this.indexLock.writeLock().lock(); 273 try { 274 pageFile.tx().execute(new Transaction.Closure<IOException>() { 275 public void execute(Transaction tx) throws IOException { 276 StoredDestination sd = getStoredDestination(destination, tx); 277 Long sequence = sd.messageIdIndex.get(tx, key); 278 MessageKeys keys = sd.orderIndex.get(tx, sequence); 279 sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation)); 280 } 281 }); 282 } finally { 283 this.indexLock.writeLock().unlock(); 284 } 285 } 286 } 287 288 @Override 289 void rollbackStatsOnDuplicate(KahaDestination commandDestination) { 290 if (brokerService != null) { 291 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 292 if (regionBroker != null) { 293 Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination)); 294 for (Destination destination : destinationSet) { 295 destination.getDestinationStatistics().getMessages().decrement(); 296 destination.getDestinationStatistics().getEnqueues().decrement(); 297 } 298 } 299 } 300 } 301 302 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 303 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 304 public Location execute(Transaction tx) throws IOException { 305 StoredDestination sd = getStoredDestination(destination, tx); 306 Long sequence = sd.messageIdIndex.get(tx, key); 307 if (sequence == null) { 308 return null; 309 } 310 return sd.orderIndex.get(tx, sequence).location; 311 } 312 }); 313 } 314 315 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 316 StoreQueueTask task = null; 317 synchronized (store.asyncTaskMap) { 318 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 319 } 320 return task; 321 } 322 323 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 324 synchronized (store.asyncTaskMap) { 325 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 326 } 327 this.queueExecutor.execute(task); 328 } 329 330 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 331 StoreTopicTask task = null; 332 synchronized (store.asyncTaskMap) { 333 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 334 } 335 return task; 336 } 337 338 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 339 synchronized (store.asyncTaskMap) { 340 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 341 } 342 this.topicExecutor.execute(task); 343 } 344 345 public TransactionStore createTransactionStore() throws IOException { 346 return this.transactionStore; 347 } 348 349 public boolean getForceRecoverIndex() { 350 return this.forceRecoverIndex; 351 } 352 353 public void setForceRecoverIndex(boolean forceRecoverIndex) { 354 this.forceRecoverIndex = forceRecoverIndex; 355 } 356 357 public class KahaDBMessageStore extends AbstractMessageStore { 358 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 359 protected KahaDestination dest; 360 private final int maxAsyncJobs; 361 private final Semaphore localDestinationSemaphore; 362 363 double doneTasks, canceledTasks = 0; 364 365 public KahaDBMessageStore(ActiveMQDestination destination) { 366 super(destination); 367 this.dest = convert(destination); 368 this.maxAsyncJobs = getMaxAsyncJobs(); 369 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 370 } 371 372 @Override 373 public ActiveMQDestination getDestination() { 374 return destination; 375 } 376 377 @Override 378 public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 379 throws IOException { 380 if (isConcurrentStoreAndDispatchQueues()) { 381 StoreQueueTask result = new StoreQueueTask(this, context, message); 382 result.aquireLocks(); 383 addQueueTask(this, result); 384 return result.getFuture(); 385 } else { 386 return super.asyncAddQueueMessage(context, message); 387 } 388 } 389 390 @Override 391 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 392 if (isConcurrentStoreAndDispatchQueues()) { 393 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 394 StoreQueueTask task = null; 395 synchronized (asyncTaskMap) { 396 task = (StoreQueueTask) asyncTaskMap.get(key); 397 } 398 if (task != null) { 399 if (!task.cancel()) { 400 try { 401 402 task.future.get(); 403 } catch (InterruptedException e) { 404 throw new InterruptedIOException(e.toString()); 405 } catch (Exception ignored) { 406 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 407 } 408 removeMessage(context, ack); 409 } else { 410 synchronized (asyncTaskMap) { 411 asyncTaskMap.remove(key); 412 } 413 } 414 } else { 415 removeMessage(context, ack); 416 } 417 } else { 418 removeMessage(context, ack); 419 } 420 } 421 422 public void addMessage(ConnectionContext context, Message message) throws IOException { 423 KahaAddMessageCommand command = new KahaAddMessageCommand(); 424 command.setDestination(dest); 425 command.setMessageId(message.getMessageId().toString()); 426 command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId())); 427 command.setPriority(message.getPriority()); 428 command.setPrioritySupported(isPrioritizedMessages()); 429 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 430 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 431 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); 432 433 } 434 435 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 436 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 437 command.setDestination(dest); 438 command.setMessageId(ack.getLastMessageId().toString()); 439 command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId())); 440 441 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 442 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 443 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 444 } 445 446 public void removeAllMessages(ConnectionContext context) throws IOException { 447 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 448 command.setDestination(dest); 449 store(command, true, null, null); 450 } 451 452 public Message getMessage(MessageId identity) throws IOException { 453 final String key = identity.toString(); 454 455 // Hopefully one day the page file supports concurrent read 456 // operations... but for now we must 457 // externally synchronize... 458 Location location; 459 indexLock.writeLock().lock(); 460 try { 461 location = findMessageLocation(key, dest); 462 }finally { 463 indexLock.writeLock().unlock(); 464 } 465 if (location == null) { 466 return null; 467 } 468 469 return loadMessage(location); 470 } 471 472 public int getMessageCount() throws IOException { 473 try { 474 lockAsyncJobQueue(); 475 indexLock.writeLock().lock(); 476 try { 477 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 478 public Integer execute(Transaction tx) throws IOException { 479 // Iterate through all index entries to get a count 480 // of 481 // messages in the destination. 482 StoredDestination sd = getStoredDestination(dest, tx); 483 int rc = 0; 484 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator 485 .hasNext();) { 486 iterator.next(); 487 rc++; 488 } 489 return rc; 490 } 491 }); 492 }finally { 493 indexLock.writeLock().unlock(); 494 } 495 } finally { 496 unlockAsyncJobQueue(); 497 } 498 } 499 500 @Override 501 public boolean isEmpty() throws IOException { 502 indexLock.writeLock().lock(); 503 try { 504 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 505 public Boolean execute(Transaction tx) throws IOException { 506 // Iterate through all index entries to get a count of 507 // messages in the destination. 508 StoredDestination sd = getStoredDestination(dest, tx); 509 return sd.locationIndex.isEmpty(tx); 510 } 511 }); 512 }finally { 513 indexLock.writeLock().unlock(); 514 } 515 } 516 517 public void recover(final MessageRecoveryListener listener) throws Exception { 518 // recovery may involve expiry which will modify 519 indexLock.writeLock().lock(); 520 try { 521 pageFile.tx().execute(new Transaction.Closure<Exception>() { 522 public void execute(Transaction tx) throws Exception { 523 StoredDestination sd = getStoredDestination(dest, tx); 524 sd.orderIndex.resetCursorPosition(); 525 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 526 .hasNext(); ) { 527 Entry<Long, MessageKeys> entry = iterator.next(); 528 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 529 continue; 530 } 531 Message msg = loadMessage(entry.getValue().location); 532 listener.recoverMessage(msg); 533 } 534 } 535 }); 536 }finally { 537 indexLock.writeLock().unlock(); 538 } 539 } 540 541 542 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 543 indexLock.writeLock().lock(); 544 try { 545 pageFile.tx().execute(new Transaction.Closure<Exception>() { 546 public void execute(Transaction tx) throws Exception { 547 StoredDestination sd = getStoredDestination(dest, tx); 548 Entry<Long, MessageKeys> entry = null; 549 int counter = 0; 550 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); 551 listener.hasSpace() && iterator.hasNext(); ) { 552 entry = iterator.next(); 553 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 554 continue; 555 } 556 Message msg = loadMessage(entry.getValue().location); 557 listener.recoverMessage(msg); 558 counter++; 559 if (counter >= maxReturned) { 560 break; 561 } 562 } 563 sd.orderIndex.stoppedIterating(); 564 } 565 }); 566 }finally { 567 indexLock.writeLock().unlock(); 568 } 569 } 570 571 public void resetBatching() { 572 if (pageFile.isLoaded()) { 573 indexLock.writeLock().lock(); 574 try { 575 pageFile.tx().execute(new Transaction.Closure<Exception>() { 576 public void execute(Transaction tx) throws Exception { 577 StoredDestination sd = getExistingStoredDestination(dest, tx); 578 if (sd != null) { 579 sd.orderIndex.resetCursorPosition();} 580 } 581 }); 582 } catch (Exception e) { 583 LOG.error("Failed to reset batching",e); 584 }finally { 585 indexLock.writeLock().unlock(); 586 } 587 } 588 } 589 590 @Override 591 public void setBatch(MessageId identity) throws IOException { 592 try { 593 final String key = identity.toString(); 594 lockAsyncJobQueue(); 595 596 // Hopefully one day the page file supports concurrent read 597 // operations... but for now we must 598 // externally synchronize... 599 600 indexLock.writeLock().lock(); 601 try { 602 pageFile.tx().execute(new Transaction.Closure<IOException>() { 603 public void execute(Transaction tx) throws IOException { 604 StoredDestination sd = getStoredDestination(dest, tx); 605 Long location = sd.messageIdIndex.get(tx, key); 606 if (location != null) { 607 sd.orderIndex.setBatch(tx, location); 608 } 609 } 610 }); 611 } finally { 612 indexLock.writeLock().unlock(); 613 } 614 } finally { 615 unlockAsyncJobQueue(); 616 } 617 } 618 619 @Override 620 public void setMemoryUsage(MemoryUsage memoeyUSage) { 621 } 622 @Override 623 public void start() throws Exception { 624 super.start(); 625 } 626 @Override 627 public void stop() throws Exception { 628 super.stop(); 629 } 630 631 protected void lockAsyncJobQueue() { 632 try { 633 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 634 } catch (Exception e) { 635 LOG.error("Failed to lock async jobs for " + this.destination, e); 636 } 637 } 638 639 protected void unlockAsyncJobQueue() { 640 this.localDestinationSemaphore.release(this.maxAsyncJobs); 641 } 642 643 protected void acquireLocalAsyncLock() { 644 try { 645 this.localDestinationSemaphore.acquire(); 646 } catch (InterruptedException e) { 647 LOG.error("Failed to aquire async lock for " + this.destination, e); 648 } 649 } 650 651 protected void releaseLocalAsyncLock() { 652 this.localDestinationSemaphore.release(); 653 } 654 655 } 656 657 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 658 private final AtomicInteger subscriptionCount = new AtomicInteger(); 659 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 660 super(destination); 661 this.subscriptionCount.set(getAllSubscriptions().length); 662 asyncTopicMaps.add(asyncTaskMap); 663 } 664 665 @Override 666 public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 667 throws IOException { 668 if (isConcurrentStoreAndDispatchTopics()) { 669 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 670 result.aquireLocks(); 671 addTopicTask(this, result); 672 return result.getFuture(); 673 } else { 674 return super.asyncAddTopicMessage(context, message); 675 } 676 } 677 678 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 679 MessageId messageId, MessageAck ack) 680 throws IOException { 681 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 682 if (isConcurrentStoreAndDispatchTopics()) { 683 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 684 StoreTopicTask task = null; 685 synchronized (asyncTaskMap) { 686 task = (StoreTopicTask) asyncTaskMap.get(key); 687 } 688 if (task != null) { 689 if (task.addSubscriptionKey(subscriptionKey)) { 690 removeTopicTask(this, messageId); 691 if (task.cancel()) { 692 synchronized (asyncTaskMap) { 693 asyncTaskMap.remove(key); 694 } 695 } 696 } 697 } else { 698 doAcknowledge(context, subscriptionKey, messageId, ack); 699 } 700 } else { 701 doAcknowledge(context, subscriptionKey, messageId, ack); 702 } 703 } 704 705 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 706 throws IOException { 707 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 708 command.setDestination(dest); 709 command.setSubscriptionKey(subscriptionKey); 710 command.setMessageId(messageId.toString()); 711 command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId())); 712 if (ack != null && ack.isUnmatchedAck()) { 713 command.setAck(UNMATCHED); 714 } 715 store(command, false, null, null); 716 } 717 718 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 719 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 720 .getSubscriptionName()); 721 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 722 command.setDestination(dest); 723 command.setSubscriptionKey(subscriptionKey.toString()); 724 command.setRetroactive(retroactive); 725 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 726 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 727 store(command, isEnableJournalDiskSyncs() && true, null, null); 728 this.subscriptionCount.incrementAndGet(); 729 } 730 731 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 732 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 733 command.setDestination(dest); 734 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 735 store(command, isEnableJournalDiskSyncs() && true, null, null); 736 this.subscriptionCount.decrementAndGet(); 737 } 738 739 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 740 741 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 742 indexLock.writeLock().lock(); 743 try { 744 pageFile.tx().execute(new Transaction.Closure<IOException>() { 745 public void execute(Transaction tx) throws IOException { 746 StoredDestination sd = getStoredDestination(dest, tx); 747 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 748 .hasNext();) { 749 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 750 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 751 .getValue().getSubscriptionInfo().newInput())); 752 subscriptions.add(info); 753 754 } 755 } 756 }); 757 }finally { 758 indexLock.writeLock().unlock(); 759 } 760 761 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 762 subscriptions.toArray(rc); 763 return rc; 764 } 765 766 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 767 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 768 indexLock.writeLock().lock(); 769 try { 770 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 771 public SubscriptionInfo execute(Transaction tx) throws IOException { 772 StoredDestination sd = getStoredDestination(dest, tx); 773 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 774 if (command == null) { 775 return null; 776 } 777 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 778 .getSubscriptionInfo().newInput())); 779 } 780 }); 781 }finally { 782 indexLock.writeLock().unlock(); 783 } 784 } 785 786 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 787 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 788 indexLock.writeLock().lock(); 789 try { 790 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 791 public Integer execute(Transaction tx) throws IOException { 792 StoredDestination sd = getStoredDestination(dest, tx); 793 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 794 if (cursorPos == null) { 795 // The subscription might not exist. 796 return 0; 797 } 798 799 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 800 } 801 }); 802 }finally { 803 indexLock.writeLock().unlock(); 804 } 805 } 806 807 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 808 throws Exception { 809 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 810 @SuppressWarnings("unused") 811 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 812 indexLock.writeLock().lock(); 813 try { 814 pageFile.tx().execute(new Transaction.Closure<Exception>() { 815 public void execute(Transaction tx) throws Exception { 816 StoredDestination sd = getStoredDestination(dest, tx); 817 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 818 sd.orderIndex.setBatch(tx, cursorPos); 819 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 820 .hasNext();) { 821 Entry<Long, MessageKeys> entry = iterator.next(); 822 listener.recoverMessage(loadMessage(entry.getValue().location)); 823 } 824 sd.orderIndex.resetCursorPosition(); 825 } 826 }); 827 }finally { 828 indexLock.writeLock().unlock(); 829 } 830 } 831 832 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 833 final MessageRecoveryListener listener) throws Exception { 834 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 835 @SuppressWarnings("unused") 836 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 837 indexLock.writeLock().lock(); 838 try { 839 pageFile.tx().execute(new Transaction.Closure<Exception>() { 840 public void execute(Transaction tx) throws Exception { 841 StoredDestination sd = getStoredDestination(dest, tx); 842 sd.orderIndex.resetCursorPosition(); 843 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 844 if (moc == null) { 845 LastAck pos = getLastAck(tx, sd, subscriptionKey); 846 if (pos == null) { 847 // sub deleted 848 return; 849 } 850 sd.orderIndex.setBatch(tx, pos); 851 moc = sd.orderIndex.cursor; 852 } else { 853 sd.orderIndex.cursor.sync(moc); 854 } 855 856 Entry<Long, MessageKeys> entry = null; 857 int counter = 0; 858 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 859 .hasNext();) { 860 entry = iterator.next(); 861 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 862 counter++; 863 } 864 if (counter >= maxReturned || listener.hasSpace() == false) { 865 break; 866 } 867 } 868 sd.orderIndex.stoppedIterating(); 869 if (entry != null) { 870 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 871 sd.subscriptionCursors.put(subscriptionKey, copy); 872 } 873 } 874 }); 875 }finally { 876 indexLock.writeLock().unlock(); 877 } 878 } 879 880 public void resetBatching(String clientId, String subscriptionName) { 881 try { 882 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 883 indexLock.writeLock().lock(); 884 try { 885 pageFile.tx().execute(new Transaction.Closure<IOException>() { 886 public void execute(Transaction tx) throws IOException { 887 StoredDestination sd = getStoredDestination(dest, tx); 888 sd.subscriptionCursors.remove(subscriptionKey); 889 } 890 }); 891 }finally { 892 indexLock.writeLock().unlock(); 893 } 894 } catch (IOException e) { 895 throw new RuntimeException(e); 896 } 897 } 898 } 899 900 String subscriptionKey(String clientId, String subscriptionName) { 901 return clientId + ":" + subscriptionName; 902 } 903 904 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 905 return this.transactionStore.proxy(new KahaDBMessageStore(destination)); 906 } 907 908 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 909 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 910 } 911 912 /** 913 * Cleanup method to remove any state associated with the given destination. 914 * This method does not stop the message store (it might not be cached). 915 * 916 * @param destination 917 * Destination to forget 918 */ 919 public void removeQueueMessageStore(ActiveMQQueue destination) { 920 } 921 922 /** 923 * Cleanup method to remove any state associated with the given destination 924 * This method does not stop the message store (it might not be cached). 925 * 926 * @param destination 927 * Destination to forget 928 */ 929 public void removeTopicMessageStore(ActiveMQTopic destination) { 930 } 931 932 public void deleteAllMessages() throws IOException { 933 deleteAllMessages = true; 934 } 935 936 public Set<ActiveMQDestination> getDestinations() { 937 try { 938 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 939 indexLock.writeLock().lock(); 940 try { 941 pageFile.tx().execute(new Transaction.Closure<IOException>() { 942 public void execute(Transaction tx) throws IOException { 943 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 944 .hasNext();) { 945 Entry<String, StoredDestination> entry = iterator.next(); 946 if (!isEmptyTopic(entry, tx)) { 947 rc.add(convert(entry.getKey())); 948 } 949 } 950 } 951 952 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) 953 throws IOException { 954 boolean isEmptyTopic = false; 955 ActiveMQDestination dest = convert(entry.getKey()); 956 if (dest.isTopic()) { 957 StoredDestination loadedStore = getStoredDestination(convert(dest), tx); 958 if (loadedStore.subscriptionAcks.isEmpty(tx)) { 959 isEmptyTopic = true; 960 } 961 } 962 return isEmptyTopic; 963 } 964 }); 965 }finally { 966 indexLock.writeLock().unlock(); 967 } 968 return rc; 969 } catch (IOException e) { 970 throw new RuntimeException(e); 971 } 972 } 973 974 public long getLastMessageBrokerSequenceId() throws IOException { 975 return 0; 976 } 977 978 public long getLastProducerSequenceId(ProducerId id) { 979 indexLock.readLock().lock(); 980 try { 981 return metadata.producerSequenceIdTracker.getLastSeqId(id); 982 } finally { 983 indexLock.readLock().unlock(); 984 } 985 } 986 987 public long size() { 988 return storeSize.get(); 989 } 990 991 public void beginTransaction(ConnectionContext context) throws IOException { 992 throw new IOException("Not yet implemented."); 993 } 994 public void commitTransaction(ConnectionContext context) throws IOException { 995 throw new IOException("Not yet implemented."); 996 } 997 public void rollbackTransaction(ConnectionContext context) throws IOException { 998 throw new IOException("Not yet implemented."); 999 } 1000 1001 public void checkpoint(boolean sync) throws IOException { 1002 super.checkpointCleanup(sync); 1003 } 1004 1005 // ///////////////////////////////////////////////////////////////// 1006 // Internal helper methods. 1007 // ///////////////////////////////////////////////////////////////// 1008 1009 /** 1010 * @param location 1011 * @return 1012 * @throws IOException 1013 */ 1014 Message loadMessage(Location location) throws IOException { 1015 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); 1016 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1017 return msg; 1018 } 1019 1020 // ///////////////////////////////////////////////////////////////// 1021 // Internal conversion methods. 1022 // ///////////////////////////////////////////////////////////////// 1023 1024 KahaLocation convert(Location location) { 1025 KahaLocation rc = new KahaLocation(); 1026 rc.setLogId(location.getDataFileId()); 1027 rc.setOffset(location.getOffset()); 1028 return rc; 1029 } 1030 1031 KahaDestination convert(ActiveMQDestination dest) { 1032 KahaDestination rc = new KahaDestination(); 1033 rc.setName(dest.getPhysicalName()); 1034 switch (dest.getDestinationType()) { 1035 case ActiveMQDestination.QUEUE_TYPE: 1036 rc.setType(DestinationType.QUEUE); 1037 return rc; 1038 case ActiveMQDestination.TOPIC_TYPE: 1039 rc.setType(DestinationType.TOPIC); 1040 return rc; 1041 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1042 rc.setType(DestinationType.TEMP_QUEUE); 1043 return rc; 1044 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1045 rc.setType(DestinationType.TEMP_TOPIC); 1046 return rc; 1047 default: 1048 return null; 1049 } 1050 } 1051 1052 ActiveMQDestination convert(String dest) { 1053 int p = dest.indexOf(":"); 1054 if (p < 0) { 1055 throw new IllegalArgumentException("Not in the valid destination format"); 1056 } 1057 int type = Integer.parseInt(dest.substring(0, p)); 1058 String name = dest.substring(p + 1); 1059 return convert(type, name); 1060 } 1061 1062 private ActiveMQDestination convert(KahaDestination commandDestination) { 1063 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1064 } 1065 1066 private ActiveMQDestination convert(int type, String name) { 1067 switch (KahaDestination.DestinationType.valueOf(type)) { 1068 case QUEUE: 1069 return new ActiveMQQueue(name); 1070 case TOPIC: 1071 return new ActiveMQTopic(name); 1072 case TEMP_QUEUE: 1073 return new ActiveMQTempQueue(name); 1074 case TEMP_TOPIC: 1075 return new ActiveMQTempTopic(name); 1076 default: 1077 throw new IllegalArgumentException("Not in the valid destination format"); 1078 } 1079 } 1080 1081 public TransactionIdTransformer getTransactionIdTransformer() { 1082 return transactionIdTransformer; 1083 } 1084 1085 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1086 this.transactionIdTransformer = transactionIdTransformer; 1087 } 1088 1089 static class AsyncJobKey { 1090 MessageId id; 1091 ActiveMQDestination destination; 1092 1093 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1094 this.id = id; 1095 this.destination = destination; 1096 } 1097 1098 @Override 1099 public boolean equals(Object obj) { 1100 if (obj == this) { 1101 return true; 1102 } 1103 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1104 && destination.equals(((AsyncJobKey) obj).destination); 1105 } 1106 1107 @Override 1108 public int hashCode() { 1109 return id.hashCode() + destination.hashCode(); 1110 } 1111 1112 @Override 1113 public String toString() { 1114 return destination.getPhysicalName() + "-" + id; 1115 } 1116 } 1117 1118 public interface StoreTask { 1119 public boolean cancel(); 1120 1121 public void aquireLocks(); 1122 1123 public void releaseLocks(); 1124 } 1125 1126 class StoreQueueTask implements Runnable, StoreTask { 1127 protected final Message message; 1128 protected final ConnectionContext context; 1129 protected final KahaDBMessageStore store; 1130 protected final InnerFutureTask future; 1131 protected final AtomicBoolean done = new AtomicBoolean(); 1132 protected final AtomicBoolean locked = new AtomicBoolean(); 1133 1134 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1135 this.store = store; 1136 this.context = context; 1137 this.message = message; 1138 this.future = new InnerFutureTask(this); 1139 } 1140 1141 public Future<Object> getFuture() { 1142 return this.future; 1143 } 1144 1145 public boolean cancel() { 1146 if (this.done.compareAndSet(false, true)) { 1147 return this.future.cancel(false); 1148 } 1149 return false; 1150 } 1151 1152 public void aquireLocks() { 1153 if (this.locked.compareAndSet(false, true)) { 1154 try { 1155 globalQueueSemaphore.acquire(); 1156 store.acquireLocalAsyncLock(); 1157 message.incrementReferenceCount(); 1158 } catch (InterruptedException e) { 1159 LOG.warn("Failed to aquire lock", e); 1160 } 1161 } 1162 1163 } 1164 1165 public void releaseLocks() { 1166 if (this.locked.compareAndSet(true, false)) { 1167 store.releaseLocalAsyncLock(); 1168 globalQueueSemaphore.release(); 1169 message.decrementReferenceCount(); 1170 } 1171 } 1172 1173 public void run() { 1174 this.store.doneTasks++; 1175 try { 1176 if (this.done.compareAndSet(false, true)) { 1177 this.store.addMessage(context, message); 1178 removeQueueTask(this.store, this.message.getMessageId()); 1179 this.future.complete(); 1180 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1181 System.err.println(this.store.dest.getName() + " cancelled: " 1182 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1183 this.store.canceledTasks = this.store.doneTasks = 0; 1184 } 1185 } catch (Exception e) { 1186 this.future.setException(e); 1187 } 1188 } 1189 1190 protected Message getMessage() { 1191 return this.message; 1192 } 1193 1194 private class InnerFutureTask extends FutureTask<Object> { 1195 1196 public InnerFutureTask(Runnable runnable) { 1197 super(runnable, null); 1198 1199 } 1200 1201 public void setException(final Exception e) { 1202 super.setException(e); 1203 } 1204 1205 public void complete() { 1206 super.set(null); 1207 } 1208 } 1209 } 1210 1211 class StoreTopicTask extends StoreQueueTask { 1212 private final int subscriptionCount; 1213 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1214 private final KahaDBTopicMessageStore topicStore; 1215 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1216 int subscriptionCount) { 1217 super(store, context, message); 1218 this.topicStore = store; 1219 this.subscriptionCount = subscriptionCount; 1220 1221 } 1222 1223 @Override 1224 public void aquireLocks() { 1225 if (this.locked.compareAndSet(false, true)) { 1226 try { 1227 globalTopicSemaphore.acquire(); 1228 store.acquireLocalAsyncLock(); 1229 message.incrementReferenceCount(); 1230 } catch (InterruptedException e) { 1231 LOG.warn("Failed to aquire lock", e); 1232 } 1233 } 1234 1235 } 1236 1237 @Override 1238 public void releaseLocks() { 1239 if (this.locked.compareAndSet(true, false)) { 1240 message.decrementReferenceCount(); 1241 store.releaseLocalAsyncLock(); 1242 globalTopicSemaphore.release(); 1243 } 1244 } 1245 1246 /** 1247 * add a key 1248 * 1249 * @param key 1250 * @return true if all acknowledgements received 1251 */ 1252 public boolean addSubscriptionKey(String key) { 1253 synchronized (this.subscriptionKeys) { 1254 this.subscriptionKeys.add(key); 1255 } 1256 return this.subscriptionKeys.size() >= this.subscriptionCount; 1257 } 1258 1259 @Override 1260 public void run() { 1261 this.store.doneTasks++; 1262 try { 1263 if (this.done.compareAndSet(false, true)) { 1264 this.topicStore.addMessage(context, message); 1265 // apply any acks we have 1266 synchronized (this.subscriptionKeys) { 1267 for (String key : this.subscriptionKeys) { 1268 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1269 1270 } 1271 } 1272 removeTopicTask(this.topicStore, this.message.getMessageId()); 1273 this.future.complete(); 1274 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1275 System.err.println(this.store.dest.getName() + " cancelled: " 1276 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1277 this.store.canceledTasks = this.store.doneTasks = 0; 1278 } 1279 } catch (Exception e) { 1280 this.future.setException(e); 1281 } 1282 } 1283 } 1284 1285 public class StoreTaskExecutor extends ThreadPoolExecutor { 1286 1287 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1288 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1289 } 1290 1291 protected void afterExecute(Runnable runnable, Throwable throwable) { 1292 super.afterExecute(runnable, throwable); 1293 1294 if (runnable instanceof StoreTask) { 1295 ((StoreTask)runnable).releaseLocks(); 1296 } 1297 1298 } 1299 } 1300}