001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.activemq.broker.ConnectionContext; 044import org.apache.activemq.broker.region.BaseDestination; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTempQueue; 049import org.apache.activemq.command.ActiveMQTempTopic; 050import org.apache.activemq.command.ActiveMQTopic; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.MessageId; 054import org.apache.activemq.command.ProducerId; 055import org.apache.activemq.command.SubscriptionInfo; 056import org.apache.activemq.command.TransactionId; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.store.AbstractMessageStore; 060import org.apache.activemq.store.IndexListener; 061import org.apache.activemq.store.ListenableFuture; 062import org.apache.activemq.store.MessageRecoveryListener; 063import org.apache.activemq.store.MessageStore; 064import org.apache.activemq.store.MessageStoreStatistics; 065import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 066import org.apache.activemq.store.PersistenceAdapter; 067import org.apache.activemq.store.TopicMessageStore; 068import org.apache.activemq.store.TransactionIdTransformer; 069import org.apache.activemq.store.TransactionStore; 070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 071import org.apache.activemq.store.kahadb.data.KahaDestination; 072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 073import org.apache.activemq.store.kahadb.data.KahaLocation; 074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 078import org.apache.activemq.store.kahadb.disk.journal.Location; 079import org.apache.activemq.store.kahadb.disk.page.Transaction; 080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 081import org.apache.activemq.usage.MemoryUsage; 082import org.apache.activemq.usage.SystemUsage; 083import org.apache.activemq.util.ServiceStopper; 084import org.apache.activemq.util.ThreadPoolUtils; 085import org.apache.activemq.wireformat.WireFormat; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 090 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 091 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 092 093 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 094 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 095 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 096 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 097 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 098 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 099 100 protected ExecutorService queueExecutor; 101 protected ExecutorService topicExecutor; 102 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 103 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 104 final WireFormat wireFormat = new OpenWireFormat(); 105 private SystemUsage usageManager; 106 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 107 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 108 Semaphore globalQueueSemaphore; 109 Semaphore globalTopicSemaphore; 110 private boolean concurrentStoreAndDispatchQueues = true; 111 // when true, message order may be compromised when cache is exhausted if store is out 112 // or order w.r.t cache 113 private boolean concurrentStoreAndDispatchTopics = false; 114 private final boolean concurrentStoreAndDispatchTransactions = false; 115 private int maxAsyncJobs = MAX_ASYNC_JOBS; 116 private final KahaDBTransactionStore transactionStore; 117 private TransactionIdTransformer transactionIdTransformer; 118 119 public KahaDBStore() { 120 this.transactionStore = new KahaDBTransactionStore(this); 121 this.transactionIdTransformer = new TransactionIdTransformer() { 122 @Override 123 public TransactionId transform(TransactionId txid) { 124 return txid; 125 } 126 }; 127 } 128 129 @Override 130 public String toString() { 131 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 132 } 133 134 @Override 135 public void setBrokerName(String brokerName) { 136 } 137 138 @Override 139 public void setUsageManager(SystemUsage usageManager) { 140 this.usageManager = usageManager; 141 } 142 143 public SystemUsage getUsageManager() { 144 return this.usageManager; 145 } 146 147 /** 148 * @return the concurrentStoreAndDispatch 149 */ 150 public boolean isConcurrentStoreAndDispatchQueues() { 151 return this.concurrentStoreAndDispatchQueues; 152 } 153 154 /** 155 * @param concurrentStoreAndDispatch 156 * the concurrentStoreAndDispatch to set 157 */ 158 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 159 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 160 } 161 162 /** 163 * @return the concurrentStoreAndDispatch 164 */ 165 public boolean isConcurrentStoreAndDispatchTopics() { 166 return this.concurrentStoreAndDispatchTopics; 167 } 168 169 /** 170 * @param concurrentStoreAndDispatch 171 * the concurrentStoreAndDispatch to set 172 */ 173 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 174 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 175 } 176 177 public boolean isConcurrentStoreAndDispatchTransactions() { 178 return this.concurrentStoreAndDispatchTransactions; 179 } 180 181 /** 182 * @return the maxAsyncJobs 183 */ 184 public int getMaxAsyncJobs() { 185 return this.maxAsyncJobs; 186 } 187 188 /** 189 * @param maxAsyncJobs 190 * the maxAsyncJobs to set 191 */ 192 public void setMaxAsyncJobs(int maxAsyncJobs) { 193 this.maxAsyncJobs = maxAsyncJobs; 194 } 195 196 197 @Override 198 protected void configureMetadata() { 199 if (brokerService != null) { 200 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 201 wireFormat.setVersion(metadata.openwireVersion); 202 203 if (LOG.isDebugEnabled()) { 204 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 205 } 206 207 } 208 } 209 210 @Override 211 public void doStart() throws Exception { 212 //configure the metadata before start, right now 213 //this is just the open wire version 214 configureMetadata(); 215 216 super.doStart(); 217 218 if (brokerService != null) { 219 // In case the recovered store used a different OpenWire version log a warning 220 // to assist in determining why journal reads fail. 221 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 222 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 223 "than the version configured[{}] reverting to the version " + 224 "used by this store, some newer broker features may not work" + 225 "as expected.", 226 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 227 228 // Update the broker service instance to the actual version in use. 229 wireFormat.setVersion(metadata.openwireVersion); 230 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 231 } 232 } 233 234 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 235 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 236 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 237 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 238 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 239 asyncQueueJobQueue, new ThreadFactory() { 240 @Override 241 public Thread newThread(Runnable runnable) { 242 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 243 thread.setDaemon(true); 244 return thread; 245 } 246 }); 247 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 248 asyncTopicJobQueue, new ThreadFactory() { 249 @Override 250 public Thread newThread(Runnable runnable) { 251 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 252 thread.setDaemon(true); 253 return thread; 254 } 255 }); 256 } 257 258 @Override 259 public void doStop(ServiceStopper stopper) throws Exception { 260 // drain down async jobs 261 LOG.info("Stopping async queue tasks"); 262 if (this.globalQueueSemaphore != null) { 263 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 264 } 265 synchronized (this.asyncQueueMaps) { 266 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 267 synchronized (m) { 268 for (StoreTask task : m.values()) { 269 task.cancel(); 270 } 271 } 272 } 273 this.asyncQueueMaps.clear(); 274 } 275 LOG.info("Stopping async topic tasks"); 276 if (this.globalTopicSemaphore != null) { 277 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 278 } 279 synchronized (this.asyncTopicMaps) { 280 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 281 synchronized (m) { 282 for (StoreTask task : m.values()) { 283 task.cancel(); 284 } 285 } 286 } 287 this.asyncTopicMaps.clear(); 288 } 289 if (this.globalQueueSemaphore != null) { 290 this.globalQueueSemaphore.drainPermits(); 291 } 292 if (this.globalTopicSemaphore != null) { 293 this.globalTopicSemaphore.drainPermits(); 294 } 295 if (this.queueExecutor != null) { 296 ThreadPoolUtils.shutdownNow(queueExecutor); 297 queueExecutor = null; 298 } 299 if (this.topicExecutor != null) { 300 ThreadPoolUtils.shutdownNow(topicExecutor); 301 topicExecutor = null; 302 } 303 LOG.info("Stopped KahaDB"); 304 super.doStop(stopper); 305 } 306 307 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 308 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 309 @Override 310 public Location execute(Transaction tx) throws IOException { 311 StoredDestination sd = getStoredDestination(destination, tx); 312 Long sequence = sd.messageIdIndex.get(tx, key); 313 if (sequence == null) { 314 return null; 315 } 316 return sd.orderIndex.get(tx, sequence).location; 317 } 318 }); 319 } 320 321 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 322 StoreQueueTask task = null; 323 synchronized (store.asyncTaskMap) { 324 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 325 } 326 return task; 327 } 328 329 // with asyncTaskMap locked 330 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 331 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 332 this.queueExecutor.execute(task); 333 } 334 335 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 336 StoreTopicTask task = null; 337 synchronized (store.asyncTaskMap) { 338 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 339 } 340 return task; 341 } 342 343 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 344 synchronized (store.asyncTaskMap) { 345 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 346 } 347 this.topicExecutor.execute(task); 348 } 349 350 @Override 351 public TransactionStore createTransactionStore() throws IOException { 352 return this.transactionStore; 353 } 354 355 public boolean getForceRecoverIndex() { 356 return this.forceRecoverIndex; 357 } 358 359 public void setForceRecoverIndex(boolean forceRecoverIndex) { 360 this.forceRecoverIndex = forceRecoverIndex; 361 } 362 363 public class KahaDBMessageStore extends AbstractMessageStore { 364 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 365 protected KahaDestination dest; 366 private final int maxAsyncJobs; 367 private final Semaphore localDestinationSemaphore; 368 369 double doneTasks, canceledTasks = 0; 370 371 public KahaDBMessageStore(ActiveMQDestination destination) { 372 super(destination); 373 this.dest = convert(destination); 374 this.maxAsyncJobs = getMaxAsyncJobs(); 375 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 376 } 377 378 @Override 379 public ActiveMQDestination getDestination() { 380 return destination; 381 } 382 383 @Override 384 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 385 throws IOException { 386 if (isConcurrentStoreAndDispatchQueues()) { 387 message.beforeMarshall(wireFormat); 388 StoreQueueTask result = new StoreQueueTask(this, context, message); 389 ListenableFuture<Object> future = result.getFuture(); 390 message.getMessageId().setFutureOrSequenceLong(future); 391 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 392 result.aquireLocks(); 393 synchronized (asyncTaskMap) { 394 addQueueTask(this, result); 395 if (indexListener != null) { 396 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 397 } 398 } 399 return future; 400 } else { 401 return super.asyncAddQueueMessage(context, message); 402 } 403 } 404 405 @Override 406 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 407 if (isConcurrentStoreAndDispatchQueues()) { 408 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 409 StoreQueueTask task = null; 410 synchronized (asyncTaskMap) { 411 task = (StoreQueueTask) asyncTaskMap.get(key); 412 } 413 if (task != null) { 414 if (ack.isInTransaction() || !task.cancel()) { 415 try { 416 task.future.get(); 417 } catch (InterruptedException e) { 418 throw new InterruptedIOException(e.toString()); 419 } catch (Exception ignored) { 420 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 421 } 422 removeMessage(context, ack); 423 } else { 424 synchronized (asyncTaskMap) { 425 asyncTaskMap.remove(key); 426 } 427 } 428 } else { 429 removeMessage(context, ack); 430 } 431 } else { 432 removeMessage(context, ack); 433 } 434 } 435 436 @Override 437 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 438 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 439 command.setDestination(dest); 440 command.setMessageId(message.getMessageId().toProducerKey()); 441 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 442 command.setPriority(message.getPriority()); 443 command.setPrioritySupported(isPrioritizedMessages()); 444 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 445 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 446 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 447 // sync add? (for async, future present from getFutureOrSequenceLong) 448 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 449 450 @Override 451 public void sequenceAssignedWithIndexLocked(final long sequence) { 452 message.getMessageId().setFutureOrSequenceLong(sequence); 453 if (indexListener != null) { 454 if (possibleFuture == null) { 455 trackPendingAdd(dest, sequence); 456 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 457 @Override 458 public void run() { 459 trackPendingAddComplete(dest, sequence); 460 } 461 })); 462 } 463 } 464 } 465 }, null); 466 } 467 468 @Override 469 public void updateMessage(Message message) throws IOException { 470 if (LOG.isTraceEnabled()) { 471 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 472 } 473 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 474 KahaAddMessageCommand command = new KahaAddMessageCommand(); 475 command.setDestination(dest); 476 command.setMessageId(message.getMessageId().toProducerKey()); 477 command.setPriority(message.getPriority()); 478 command.setPrioritySupported(prioritizedMessages); 479 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 480 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 481 updateMessageCommand.setMessage(command); 482 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 483 } 484 485 @Override 486 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 487 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 488 command.setDestination(dest); 489 command.setMessageId(ack.getLastMessageId().toProducerKey()); 490 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 491 492 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 493 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 494 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 495 } 496 497 @Override 498 public void removeAllMessages(ConnectionContext context) throws IOException { 499 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 500 command.setDestination(dest); 501 store(command, true, null, null); 502 } 503 504 @Override 505 public Message getMessage(MessageId identity) throws IOException { 506 final String key = identity.toProducerKey(); 507 508 // Hopefully one day the page file supports concurrent read 509 // operations... but for now we must 510 // externally synchronize... 511 Location location; 512 indexLock.writeLock().lock(); 513 try { 514 location = findMessageLocation(key, dest); 515 } finally { 516 indexLock.writeLock().unlock(); 517 } 518 if (location == null) { 519 return null; 520 } 521 522 return loadMessage(location); 523 } 524 525 @Override 526 public boolean isEmpty() throws IOException { 527 indexLock.writeLock().lock(); 528 try { 529 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 530 @Override 531 public Boolean execute(Transaction tx) throws IOException { 532 // Iterate through all index entries to get a count of 533 // messages in the destination. 534 StoredDestination sd = getStoredDestination(dest, tx); 535 return sd.locationIndex.isEmpty(tx); 536 } 537 }); 538 } finally { 539 indexLock.writeLock().unlock(); 540 } 541 } 542 543 @Override 544 public void recover(final MessageRecoveryListener listener) throws Exception { 545 // recovery may involve expiry which will modify 546 indexLock.writeLock().lock(); 547 try { 548 pageFile.tx().execute(new Transaction.Closure<Exception>() { 549 @Override 550 public void execute(Transaction tx) throws Exception { 551 StoredDestination sd = getStoredDestination(dest, tx); 552 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 553 sd.orderIndex.resetCursorPosition(); 554 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 555 .hasNext(); ) { 556 Entry<Long, MessageKeys> entry = iterator.next(); 557 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 558 continue; 559 } 560 Message msg = loadMessage(entry.getValue().location); 561 listener.recoverMessage(msg); 562 } 563 } 564 }); 565 } finally { 566 indexLock.writeLock().unlock(); 567 } 568 } 569 570 @Override 571 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 572 indexLock.writeLock().lock(); 573 try { 574 pageFile.tx().execute(new Transaction.Closure<Exception>() { 575 @Override 576 public void execute(Transaction tx) throws Exception { 577 StoredDestination sd = getStoredDestination(dest, tx); 578 Entry<Long, MessageKeys> entry = null; 579 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 580 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 581 entry = iterator.next(); 582 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 583 continue; 584 } 585 Message msg = loadMessage(entry.getValue().location); 586 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 587 listener.recoverMessage(msg); 588 counter++; 589 if (counter >= maxReturned) { 590 break; 591 } 592 } 593 sd.orderIndex.stoppedIterating(); 594 } 595 }); 596 } finally { 597 indexLock.writeLock().unlock(); 598 } 599 } 600 601 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 602 int counter = 0; 603 String id; 604 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 605 id = iterator.next(); 606 iterator.remove(); 607 Long sequence = sd.messageIdIndex.get(tx, id); 608 if (sequence != null) { 609 if (sd.orderIndex.alreadyDispatched(sequence)) { 610 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 611 counter++; 612 if (counter >= maxReturned) { 613 break; 614 } 615 } else { 616 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 617 } 618 } else { 619 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 620 } 621 } 622 return counter; 623 } 624 625 626 @Override 627 public void resetBatching() { 628 if (pageFile.isLoaded()) { 629 indexLock.writeLock().lock(); 630 try { 631 pageFile.tx().execute(new Transaction.Closure<Exception>() { 632 @Override 633 public void execute(Transaction tx) throws Exception { 634 StoredDestination sd = getExistingStoredDestination(dest, tx); 635 if (sd != null) { 636 sd.orderIndex.resetCursorPosition();} 637 } 638 }); 639 } catch (Exception e) { 640 LOG.error("Failed to reset batching",e); 641 } finally { 642 indexLock.writeLock().unlock(); 643 } 644 } 645 } 646 647 @Override 648 public void setBatch(final MessageId identity) throws IOException { 649 indexLock.writeLock().lock(); 650 try { 651 pageFile.tx().execute(new Transaction.Closure<IOException>() { 652 @Override 653 public void execute(Transaction tx) throws IOException { 654 StoredDestination sd = getStoredDestination(dest, tx); 655 Long location = (Long) identity.getFutureOrSequenceLong(); 656 Long pending = sd.orderIndex.minPendingAdd(); 657 if (pending != null) { 658 location = Math.min(location, pending-1); 659 } 660 sd.orderIndex.setBatch(tx, location); 661 } 662 }); 663 } finally { 664 indexLock.writeLock().unlock(); 665 } 666 } 667 668 @Override 669 public void setMemoryUsage(MemoryUsage memoryUsage) { 670 } 671 @Override 672 public void start() throws Exception { 673 super.start(); 674 } 675 @Override 676 public void stop() throws Exception { 677 super.stop(); 678 } 679 680 protected void lockAsyncJobQueue() { 681 try { 682 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 683 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 684 } 685 } catch (Exception e) { 686 LOG.error("Failed to lock async jobs for " + this.destination, e); 687 } 688 } 689 690 protected void unlockAsyncJobQueue() { 691 this.localDestinationSemaphore.release(this.maxAsyncJobs); 692 } 693 694 protected void acquireLocalAsyncLock() { 695 try { 696 this.localDestinationSemaphore.acquire(); 697 } catch (InterruptedException e) { 698 LOG.error("Failed to aquire async lock for " + this.destination, e); 699 } 700 } 701 702 protected void releaseLocalAsyncLock() { 703 this.localDestinationSemaphore.release(); 704 } 705 706 @Override 707 public String toString(){ 708 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 709 } 710 711 @Override 712 protected void recoverMessageStoreStatistics() throws IOException { 713 try { 714 MessageStoreStatistics recoveredStatistics; 715 lockAsyncJobQueue(); 716 indexLock.writeLock().lock(); 717 try { 718 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 719 @Override 720 public MessageStoreStatistics execute(Transaction tx) throws IOException { 721 MessageStoreStatistics statistics = new MessageStoreStatistics(); 722 723 // Iterate through all index entries to get the size of each message 724 StoredDestination sd = getStoredDestination(dest, tx); 725 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 726 int locationSize = iterator.next().getKey().getSize(); 727 statistics.getMessageCount().increment(); 728 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 729 } 730 return statistics; 731 } 732 }); 733 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 734 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 735 } finally { 736 indexLock.writeLock().unlock(); 737 } 738 } finally { 739 unlockAsyncJobQueue(); 740 } 741 } 742 } 743 744 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 745 private final AtomicInteger subscriptionCount = new AtomicInteger(); 746 protected final MessageStoreSubscriptionStatistics messageStoreSubStats = 747 new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); 748 749 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 750 super(destination); 751 this.subscriptionCount.set(getAllSubscriptions().length); 752 if (isConcurrentStoreAndDispatchTopics()) { 753 asyncTopicMaps.add(asyncTaskMap); 754 } 755 } 756 757 @Override 758 protected void recoverMessageStoreStatistics() throws IOException { 759 super.recoverMessageStoreStatistics(); 760 this.recoverMessageStoreSubMetrics(); 761 } 762 763 @Override 764 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 765 throws IOException { 766 if (isConcurrentStoreAndDispatchTopics()) { 767 message.beforeMarshall(wireFormat); 768 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 769 result.aquireLocks(); 770 addTopicTask(this, result); 771 return result.getFuture(); 772 } else { 773 return super.asyncAddTopicMessage(context, message); 774 } 775 } 776 777 @Override 778 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 779 MessageId messageId, MessageAck ack) throws IOException { 780 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 781 if (isConcurrentStoreAndDispatchTopics()) { 782 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 783 StoreTopicTask task = null; 784 synchronized (asyncTaskMap) { 785 task = (StoreTopicTask) asyncTaskMap.get(key); 786 } 787 if (task != null) { 788 if (task.addSubscriptionKey(subscriptionKey)) { 789 removeTopicTask(this, messageId); 790 if (task.cancel()) { 791 synchronized (asyncTaskMap) { 792 asyncTaskMap.remove(key); 793 } 794 } 795 } 796 } else { 797 doAcknowledge(context, subscriptionKey, messageId, ack); 798 } 799 } else { 800 doAcknowledge(context, subscriptionKey, messageId, ack); 801 } 802 } 803 804 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 805 throws IOException { 806 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 807 command.setDestination(dest); 808 command.setSubscriptionKey(subscriptionKey); 809 command.setMessageId(messageId.toProducerKey()); 810 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 811 if (ack != null && ack.isUnmatchedAck()) { 812 command.setAck(UNMATCHED); 813 } else { 814 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 815 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 816 } 817 store(command, false, null, null); 818 } 819 820 @Override 821 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 822 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 823 .getSubscriptionName()); 824 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 825 command.setDestination(dest); 826 command.setSubscriptionKey(subscriptionKey.toString()); 827 command.setRetroactive(retroactive); 828 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 829 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 830 store(command, isEnableJournalDiskSyncs() && true, null, null); 831 this.subscriptionCount.incrementAndGet(); 832 } 833 834 @Override 835 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 836 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 837 command.setDestination(dest); 838 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 839 store(command, isEnableJournalDiskSyncs() && true, null, null); 840 this.subscriptionCount.decrementAndGet(); 841 } 842 843 @Override 844 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 845 846 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 847 indexLock.writeLock().lock(); 848 try { 849 pageFile.tx().execute(new Transaction.Closure<IOException>() { 850 @Override 851 public void execute(Transaction tx) throws IOException { 852 StoredDestination sd = getStoredDestination(dest, tx); 853 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 854 .hasNext();) { 855 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 856 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 857 .getValue().getSubscriptionInfo().newInput())); 858 subscriptions.add(info); 859 860 } 861 } 862 }); 863 } finally { 864 indexLock.writeLock().unlock(); 865 } 866 867 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 868 subscriptions.toArray(rc); 869 return rc; 870 } 871 872 @Override 873 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 874 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 875 indexLock.writeLock().lock(); 876 try { 877 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 878 @Override 879 public SubscriptionInfo execute(Transaction tx) throws IOException { 880 StoredDestination sd = getStoredDestination(dest, tx); 881 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 882 if (command == null) { 883 return null; 884 } 885 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 886 .getSubscriptionInfo().newInput())); 887 } 888 }); 889 } finally { 890 indexLock.writeLock().unlock(); 891 } 892 } 893 894 @Override 895 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 896 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 897 898 if (isEnableSubscriptionStatistics()) { 899 return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); 900 } else { 901 902 indexLock.writeLock().lock(); 903 try { 904 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 905 @Override 906 public Integer execute(Transaction tx) throws IOException { 907 StoredDestination sd = getStoredDestination(dest, tx); 908 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 909 if (cursorPos == null) { 910 // The subscription might not exist. 911 return 0; 912 } 913 914 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 915 } 916 }); 917 } finally { 918 indexLock.writeLock().unlock(); 919 } 920 } 921 } 922 923 924 @Override 925 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 926 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 927 if (isEnableSubscriptionStatistics()) { 928 return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); 929 } else { 930 indexLock.writeLock().lock(); 931 try { 932 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 933 @Override 934 public Integer execute(Transaction tx) throws IOException { 935 StoredDestination sd = getStoredDestination(dest, tx); 936 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 937 if (cursorPos == null) { 938 // The subscription might not exist. 939 return 0; 940 } 941 942 return (int) getStoredMessageSize(tx, sd, subscriptionKey); 943 } 944 }); 945 } finally { 946 indexLock.writeLock().unlock(); 947 } 948 } 949 } 950 951 952 protected void recoverMessageStoreSubMetrics() throws IOException { 953 if (isEnableSubscriptionStatistics()) { 954 955 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); 956 indexLock.writeLock().lock(); 957 try { 958 pageFile.tx().execute(new Transaction.Closure<IOException>() { 959 @Override 960 public void execute(Transaction tx) throws IOException { 961 StoredDestination sd = getStoredDestination(dest, tx); 962 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions 963 .iterator(tx); iterator.hasNext();) { 964 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 965 966 String subscriptionKey = entry.getKey(); 967 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 968 if (cursorPos != null) { 969 long size = getStoredMessageSize(tx, sd, subscriptionKey); 970 statistics.getMessageCount(subscriptionKey) 971 .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); 972 statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); 973 } 974 } 975 } 976 }); 977 } finally { 978 indexLock.writeLock().unlock(); 979 } 980 } 981 } 982 983 @Override 984 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 985 throws Exception { 986 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 987 @SuppressWarnings("unused") 988 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 989 indexLock.writeLock().lock(); 990 try { 991 pageFile.tx().execute(new Transaction.Closure<Exception>() { 992 @Override 993 public void execute(Transaction tx) throws Exception { 994 StoredDestination sd = getStoredDestination(dest, tx); 995 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 996 sd.orderIndex.setBatch(tx, cursorPos); 997 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 998 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 999 .hasNext();) { 1000 Entry<Long, MessageKeys> entry = iterator.next(); 1001 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1002 continue; 1003 } 1004 listener.recoverMessage(loadMessage(entry.getValue().location)); 1005 } 1006 sd.orderIndex.resetCursorPosition(); 1007 } 1008 }); 1009 } finally { 1010 indexLock.writeLock().unlock(); 1011 } 1012 } 1013 1014 @Override 1015 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1016 final MessageRecoveryListener listener) throws Exception { 1017 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1018 @SuppressWarnings("unused") 1019 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1020 indexLock.writeLock().lock(); 1021 try { 1022 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1023 @Override 1024 public void execute(Transaction tx) throws Exception { 1025 StoredDestination sd = getStoredDestination(dest, tx); 1026 sd.orderIndex.resetCursorPosition(); 1027 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1028 if (moc == null) { 1029 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1030 if (pos == null) { 1031 // sub deleted 1032 return; 1033 } 1034 sd.orderIndex.setBatch(tx, pos); 1035 moc = sd.orderIndex.cursor; 1036 } else { 1037 sd.orderIndex.cursor.sync(moc); 1038 } 1039 1040 Entry<Long, MessageKeys> entry = null; 1041 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1042 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 1043 .hasNext();) { 1044 entry = iterator.next(); 1045 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1046 continue; 1047 } 1048 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1049 counter++; 1050 } 1051 if (counter >= maxReturned || listener.hasSpace() == false) { 1052 break; 1053 } 1054 } 1055 sd.orderIndex.stoppedIterating(); 1056 if (entry != null) { 1057 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1058 sd.subscriptionCursors.put(subscriptionKey, copy); 1059 } 1060 } 1061 }); 1062 } finally { 1063 indexLock.writeLock().unlock(); 1064 } 1065 } 1066 1067 @Override 1068 public void resetBatching(String clientId, String subscriptionName) { 1069 try { 1070 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1071 indexLock.writeLock().lock(); 1072 try { 1073 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1074 @Override 1075 public void execute(Transaction tx) throws IOException { 1076 StoredDestination sd = getStoredDestination(dest, tx); 1077 sd.subscriptionCursors.remove(subscriptionKey); 1078 } 1079 }); 1080 }finally { 1081 indexLock.writeLock().unlock(); 1082 } 1083 } catch (IOException e) { 1084 throw new RuntimeException(e); 1085 } 1086 } 1087 1088 @Override 1089 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 1090 return messageStoreSubStats; 1091 } 1092 } 1093 1094 String subscriptionKey(String clientId, String subscriptionName) { 1095 return clientId + ":" + subscriptionName; 1096 } 1097 1098 @Override 1099 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1100 String key = key(convert(destination)); 1101 MessageStore store = storeCache.get(key(convert(destination))); 1102 if (store == null) { 1103 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1104 store = storeCache.putIfAbsent(key, queueStore); 1105 if (store == null) { 1106 store = queueStore; 1107 } 1108 } 1109 1110 return store; 1111 } 1112 1113 @Override 1114 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1115 String key = key(convert(destination)); 1116 MessageStore store = storeCache.get(key(convert(destination))); 1117 if (store == null) { 1118 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1119 store = storeCache.putIfAbsent(key, topicStore); 1120 if (store == null) { 1121 store = topicStore; 1122 } 1123 } 1124 1125 return (TopicMessageStore) store; 1126 } 1127 1128 /** 1129 * Cleanup method to remove any state associated with the given destination. 1130 * This method does not stop the message store (it might not be cached). 1131 * 1132 * @param destination 1133 * Destination to forget 1134 */ 1135 @Override 1136 public void removeQueueMessageStore(ActiveMQQueue destination) { 1137 } 1138 1139 /** 1140 * Cleanup method to remove any state associated with the given destination 1141 * This method does not stop the message store (it might not be cached). 1142 * 1143 * @param destination 1144 * Destination to forget 1145 */ 1146 @Override 1147 public void removeTopicMessageStore(ActiveMQTopic destination) { 1148 } 1149 1150 @Override 1151 public void deleteAllMessages() throws IOException { 1152 deleteAllMessages = true; 1153 } 1154 1155 @Override 1156 public Set<ActiveMQDestination> getDestinations() { 1157 try { 1158 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1159 indexLock.writeLock().lock(); 1160 try { 1161 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1162 @Override 1163 public void execute(Transaction tx) throws IOException { 1164 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1165 .hasNext();) { 1166 Entry<String, StoredDestination> entry = iterator.next(); 1167 //Removing isEmpty topic check - see AMQ-5875 1168 rc.add(convert(entry.getKey())); 1169 } 1170 } 1171 }); 1172 }finally { 1173 indexLock.writeLock().unlock(); 1174 } 1175 return rc; 1176 } catch (IOException e) { 1177 throw new RuntimeException(e); 1178 } 1179 } 1180 1181 @Override 1182 public long getLastMessageBrokerSequenceId() throws IOException { 1183 return 0; 1184 } 1185 1186 @Override 1187 public long getLastProducerSequenceId(ProducerId id) { 1188 indexLock.writeLock().lock(); 1189 try { 1190 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1191 } finally { 1192 indexLock.writeLock().unlock(); 1193 } 1194 } 1195 1196 @Override 1197 public long size() { 1198 try { 1199 return journalSize.get() + getPageFile().getDiskSize(); 1200 } catch (IOException e) { 1201 throw new RuntimeException(e); 1202 } 1203 } 1204 1205 @Override 1206 public void beginTransaction(ConnectionContext context) throws IOException { 1207 throw new IOException("Not yet implemented."); 1208 } 1209 @Override 1210 public void commitTransaction(ConnectionContext context) throws IOException { 1211 throw new IOException("Not yet implemented."); 1212 } 1213 @Override 1214 public void rollbackTransaction(ConnectionContext context) throws IOException { 1215 throw new IOException("Not yet implemented."); 1216 } 1217 1218 @Override 1219 public void checkpoint(boolean sync) throws IOException { 1220 super.checkpointCleanup(sync); 1221 } 1222 1223 // ///////////////////////////////////////////////////////////////// 1224 // Internal helper methods. 1225 // ///////////////////////////////////////////////////////////////// 1226 1227 /** 1228 * @param location 1229 * @return 1230 * @throws IOException 1231 */ 1232 Message loadMessage(Location location) throws IOException { 1233 try { 1234 JournalCommand<?> command = load(location); 1235 KahaAddMessageCommand addMessage = null; 1236 switch (command.type()) { 1237 case KAHA_UPDATE_MESSAGE_COMMAND: 1238 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1239 break; 1240 default: 1241 addMessage = (KahaAddMessageCommand) command; 1242 } 1243 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1244 return msg; 1245 } catch (IOException ioe) { 1246 LOG.error("Failed to load message at: {}", location , ioe); 1247 brokerService.handleIOException(ioe); 1248 throw ioe; 1249 } 1250 } 1251 1252 // ///////////////////////////////////////////////////////////////// 1253 // Internal conversion methods. 1254 // ///////////////////////////////////////////////////////////////// 1255 1256 KahaLocation convert(Location location) { 1257 KahaLocation rc = new KahaLocation(); 1258 rc.setLogId(location.getDataFileId()); 1259 rc.setOffset(location.getOffset()); 1260 return rc; 1261 } 1262 1263 KahaDestination convert(ActiveMQDestination dest) { 1264 KahaDestination rc = new KahaDestination(); 1265 rc.setName(dest.getPhysicalName()); 1266 switch (dest.getDestinationType()) { 1267 case ActiveMQDestination.QUEUE_TYPE: 1268 rc.setType(DestinationType.QUEUE); 1269 return rc; 1270 case ActiveMQDestination.TOPIC_TYPE: 1271 rc.setType(DestinationType.TOPIC); 1272 return rc; 1273 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1274 rc.setType(DestinationType.TEMP_QUEUE); 1275 return rc; 1276 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1277 rc.setType(DestinationType.TEMP_TOPIC); 1278 return rc; 1279 default: 1280 return null; 1281 } 1282 } 1283 1284 ActiveMQDestination convert(String dest) { 1285 int p = dest.indexOf(":"); 1286 if (p < 0) { 1287 throw new IllegalArgumentException("Not in the valid destination format"); 1288 } 1289 int type = Integer.parseInt(dest.substring(0, p)); 1290 String name = dest.substring(p + 1); 1291 return convert(type, name); 1292 } 1293 1294 private ActiveMQDestination convert(KahaDestination commandDestination) { 1295 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1296 } 1297 1298 private ActiveMQDestination convert(int type, String name) { 1299 switch (KahaDestination.DestinationType.valueOf(type)) { 1300 case QUEUE: 1301 return new ActiveMQQueue(name); 1302 case TOPIC: 1303 return new ActiveMQTopic(name); 1304 case TEMP_QUEUE: 1305 return new ActiveMQTempQueue(name); 1306 case TEMP_TOPIC: 1307 return new ActiveMQTempTopic(name); 1308 default: 1309 throw new IllegalArgumentException("Not in the valid destination format"); 1310 } 1311 } 1312 1313 public TransactionIdTransformer getTransactionIdTransformer() { 1314 return transactionIdTransformer; 1315 } 1316 1317 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1318 this.transactionIdTransformer = transactionIdTransformer; 1319 } 1320 1321 static class AsyncJobKey { 1322 MessageId id; 1323 ActiveMQDestination destination; 1324 1325 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1326 this.id = id; 1327 this.destination = destination; 1328 } 1329 1330 @Override 1331 public boolean equals(Object obj) { 1332 if (obj == this) { 1333 return true; 1334 } 1335 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1336 && destination.equals(((AsyncJobKey) obj).destination); 1337 } 1338 1339 @Override 1340 public int hashCode() { 1341 return id.hashCode() + destination.hashCode(); 1342 } 1343 1344 @Override 1345 public String toString() { 1346 return destination.getPhysicalName() + "-" + id; 1347 } 1348 } 1349 1350 public interface StoreTask { 1351 public boolean cancel(); 1352 1353 public void aquireLocks(); 1354 1355 public void releaseLocks(); 1356 } 1357 1358 class StoreQueueTask implements Runnable, StoreTask { 1359 protected final Message message; 1360 protected final ConnectionContext context; 1361 protected final KahaDBMessageStore store; 1362 protected final InnerFutureTask future; 1363 protected final AtomicBoolean done = new AtomicBoolean(); 1364 protected final AtomicBoolean locked = new AtomicBoolean(); 1365 1366 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1367 this.store = store; 1368 this.context = context; 1369 this.message = message; 1370 this.future = new InnerFutureTask(this); 1371 } 1372 1373 public ListenableFuture<Object> getFuture() { 1374 return this.future; 1375 } 1376 1377 @Override 1378 public boolean cancel() { 1379 if (this.done.compareAndSet(false, true)) { 1380 return this.future.cancel(false); 1381 } 1382 return false; 1383 } 1384 1385 @Override 1386 public void aquireLocks() { 1387 if (this.locked.compareAndSet(false, true)) { 1388 try { 1389 globalQueueSemaphore.acquire(); 1390 store.acquireLocalAsyncLock(); 1391 message.incrementReferenceCount(); 1392 } catch (InterruptedException e) { 1393 LOG.warn("Failed to aquire lock", e); 1394 } 1395 } 1396 1397 } 1398 1399 @Override 1400 public void releaseLocks() { 1401 if (this.locked.compareAndSet(true, false)) { 1402 store.releaseLocalAsyncLock(); 1403 globalQueueSemaphore.release(); 1404 message.decrementReferenceCount(); 1405 } 1406 } 1407 1408 @Override 1409 public void run() { 1410 this.store.doneTasks++; 1411 try { 1412 if (this.done.compareAndSet(false, true)) { 1413 this.store.addMessage(context, message); 1414 removeQueueTask(this.store, this.message.getMessageId()); 1415 this.future.complete(); 1416 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1417 System.err.println(this.store.dest.getName() + " cancelled: " 1418 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1419 this.store.canceledTasks = this.store.doneTasks = 0; 1420 } 1421 } catch (Exception e) { 1422 this.future.setException(e); 1423 } 1424 } 1425 1426 protected Message getMessage() { 1427 return this.message; 1428 } 1429 1430 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1431 1432 private Runnable listener; 1433 public InnerFutureTask(Runnable runnable) { 1434 super(runnable, null); 1435 1436 } 1437 1438 public void setException(final Exception e) { 1439 super.setException(e); 1440 } 1441 1442 public void complete() { 1443 super.set(null); 1444 } 1445 1446 @Override 1447 public void done() { 1448 fireListener(); 1449 } 1450 1451 @Override 1452 public void addListener(Runnable listener) { 1453 this.listener = listener; 1454 if (isDone()) { 1455 fireListener(); 1456 } 1457 } 1458 1459 private void fireListener() { 1460 if (listener != null) { 1461 try { 1462 listener.run(); 1463 } catch (Exception ignored) { 1464 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1465 } 1466 } 1467 } 1468 } 1469 } 1470 1471 class StoreTopicTask extends StoreQueueTask { 1472 private final int subscriptionCount; 1473 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1474 private final KahaDBTopicMessageStore topicStore; 1475 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1476 int subscriptionCount) { 1477 super(store, context, message); 1478 this.topicStore = store; 1479 this.subscriptionCount = subscriptionCount; 1480 1481 } 1482 1483 @Override 1484 public void aquireLocks() { 1485 if (this.locked.compareAndSet(false, true)) { 1486 try { 1487 globalTopicSemaphore.acquire(); 1488 store.acquireLocalAsyncLock(); 1489 message.incrementReferenceCount(); 1490 } catch (InterruptedException e) { 1491 LOG.warn("Failed to aquire lock", e); 1492 } 1493 } 1494 } 1495 1496 @Override 1497 public void releaseLocks() { 1498 if (this.locked.compareAndSet(true, false)) { 1499 message.decrementReferenceCount(); 1500 store.releaseLocalAsyncLock(); 1501 globalTopicSemaphore.release(); 1502 } 1503 } 1504 1505 /** 1506 * add a key 1507 * 1508 * @param key 1509 * @return true if all acknowledgements received 1510 */ 1511 public boolean addSubscriptionKey(String key) { 1512 synchronized (this.subscriptionKeys) { 1513 this.subscriptionKeys.add(key); 1514 } 1515 return this.subscriptionKeys.size() >= this.subscriptionCount; 1516 } 1517 1518 @Override 1519 public void run() { 1520 this.store.doneTasks++; 1521 try { 1522 if (this.done.compareAndSet(false, true)) { 1523 this.topicStore.addMessage(context, message); 1524 // apply any acks we have 1525 synchronized (this.subscriptionKeys) { 1526 for (String key : this.subscriptionKeys) { 1527 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1528 1529 } 1530 } 1531 removeTopicTask(this.topicStore, this.message.getMessageId()); 1532 this.future.complete(); 1533 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1534 System.err.println(this.store.dest.getName() + " cancelled: " 1535 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1536 this.store.canceledTasks = this.store.doneTasks = 0; 1537 } 1538 } catch (Exception e) { 1539 this.future.setException(e); 1540 } 1541 } 1542 } 1543 1544 public class StoreTaskExecutor extends ThreadPoolExecutor { 1545 1546 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1547 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1548 } 1549 1550 @Override 1551 protected void afterExecute(Runnable runnable, Throwable throwable) { 1552 super.afterExecute(runnable, throwable); 1553 1554 if (runnable instanceof StoreTask) { 1555 ((StoreTask)runnable).releaseLocks(); 1556 } 1557 } 1558 } 1559 1560 @Override 1561 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1562 return new JobSchedulerStoreImpl(); 1563 } 1564}