/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides a TransactionStore implementation that can create transaction aware
 * MessageStore objects from non transaction aware MessageStore objects.
 * <p>
 * Two code paths exist throughout this class. For XA transactions, or whenever
 * {@code theStore.isConcurrentStoreAndDispatchTransactions()} is false, every
 * operation is passed straight through to the underlying store. Otherwise the
 * operation is queued on an in-memory {@link Tx} keyed by transaction id and is
 * only executed against the store when {@link #commit} is called.
 */
public class KahaDBTransactionStore implements TransactionStore {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);

    /** Transactions whose operations are being held in memory until commit, keyed by transaction id. */
    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();

    /** Wire format used by {@link #recover} to unmarshal stored messages and acks. */
    private final WireFormat wireFormat = new OpenWireFormat();

    private final KahaDBStore theStore;

    /**
     * @param theStore the store that all operations are ultimately delegated to
     */
    public KahaDBTransactionStore(KahaDBStore theStore) {
        this.theStore = theStore;
    }

    /**
     * The set of add and remove commands queued for a single in-flight transaction.
     */
    public class Tx {
        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();

        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();

        /**
         * Queues a message add to be executed on commit.
         *
         * @param msg the add command to queue
         */
        public void add(AddMessageCommand msg) {
            messages.add(msg);
        }

        /**
         * Queues a message removal/acknowledgement to be executed on commit.
         *
         * @param ack the remove command to queue
         */
        public void add(RemoveMessageCommand ack) {
            acks.add(ack);
        }

        /**
         * @return the messages of all queued add commands, in the order they were queued
         */
        public Message[] getMessages() {
            Message rc[] = new Message[messages.size()];
            int count = 0;
            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
                AddMessageCommand cmd = iter.next();
                rc[count++] = cmd.getMessage();
            }
            return rc;
        }

        /**
         * @return the acks of all queued remove commands, in the order they were queued
         */
        public MessageAck[] getAcks() {
            MessageAck rc[] = new MessageAck[acks.size()];
            int count = 0;
            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
                RemoveMessageCommand cmd = iter.next();
                rc[count++] = cmd.getMessageAck();
            }
            return rc;
        }

        /**
         * Executes every queued add command, then every queued remove command,
         * each exactly once.
         *
         * @return the futures returned by the executed commands, adds first and
         *         then removes; empty if nothing was queued
         * @throws IOException if any command fails while running
         */
        public List<Future<Object>> commit() throws IOException {
            List<Future<Object>> results = new ArrayList<Future<Object>>();
            // Do all the message adds.
            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
                AddMessageCommand cmd = iter.next();
                results.add(cmd.run());
            }
            // And removes.. Each command must be run only once: running it a
            // second time would apply the same removal to the store twice.
            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
                RemoveMessageCommand cmd = iter.next();
                results.add(cmd.run());
            }

            return results;
        }
    }

    /**
     * A deferred message add that remembers the connection context it was
     * created with.
     */
    public abstract class AddMessageCommand {
        private final ConnectionContext ctx;

        AddMessageCommand(ConnectionContext ctx) {
            this.ctx = ctx;
        }

        /**
         * @return the message this command adds
         */
        abstract Message getMessage();

        /**
         * Runs the command with the context captured at construction time.
         */
        Future<Object> run() throws IOException {
            return run(this.ctx);
        }

        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
    }

    /**
     * A deferred message removal/acknowledgement that remembers the connection
     * context it was created with.
     */
    public abstract class RemoveMessageCommand {

        private final ConnectionContext ctx;

        RemoveMessageCommand(ConnectionContext ctx) {
            this.ctx = ctx;
        }

        /**
         * @return the ack this command applies
         */
        abstract MessageAck getMessageAck();

        /**
         * Runs the command with the context captured at construction time.
         */
        Future<Object> run() throws IOException {
            return run(this.ctx);
        }

        abstract Future<Object> run(ConnectionContext context) throws IOException;
    }

    /**
     * Wraps a message store so that its add and remove operations are routed
     * through this transaction store.
     *
     * @param messageStore the store to wrap
     * @return a proxy whose add/remove methods call back into this class
     */
    public MessageStore proxy(MessageStore messageStore) {
        return new ProxyMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
                // canOptimize is not forwarded; the two-argument path is used.
                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
            }

            @Override
            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
                // canOptimize is not forwarded; the two-argument path is used.
                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
            }
        };
    }

    /**
     * Wraps a topic message store so that its add, remove and acknowledge
     * operations are routed through this transaction store.
     *
     * @param messageStore the topic store to wrap
     * @return a proxy whose add/remove/acknowledge methods call back into this class
     */
    public TopicMessageStore proxy(TopicMessageStore messageStore) {
        return new ProxyTopicMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
                // canOptimize is not forwarded; the two-argument path is used.
                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
            }

            @Override
            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
                // canOptimize is not forwarded; the two-argument path is used.
                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
            }

            @Override
            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                            MessageId messageId, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
                        subscriptionName, messageId, ack);
            }

        };
    }

    /**
     * Stores a prepare command for the transaction. On the in-memory path the
     * transaction is first removed from {@link #inflightTransactions}, and the
     * prepare command is only stored if an entry was actually present.
     *
     * @param txid the transaction to prepare
     * @throws IOException
     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
     */
    public void prepare(TransactionId txid) throws IOException {
        KahaTransactionInfo info = getTransactionInfo(txid);
        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
        } else {
            Tx tx = inflightTransactions.remove(txid);
            if (tx != null) {
                theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
            }
        }
    }

    /**
     * Returns the in-flight transaction registered under the given id, creating
     * and registering a new one if none exists yet.
     * <p>
     * Registration uses {@code putIfAbsent} so that concurrent callers asking
     * for the same id always receive the same {@link Tx} instance; a plain
     * get-then-put could let one caller's instance silently replace another's
     * and lose the commands queued on it.
     *
     * @param txid the transaction id used as the map key
     * @return the existing or newly registered transaction, never {@code null}
     */
    public Tx getTx(Object txid) {
        Tx tx = inflightTransactions.get(txid);
        if (tx == null) {
            Tx created = new Tx();
            tx = inflightTransactions.putIfAbsent(txid, created);
            if (tx == null) {
                // No other thread registered one in the meantime; ours is the one in the map.
                tx = created;
            }
        }
        return tx;
    }

    /**
     * Commits the transaction.
     * <p>
     * On the in-memory path, {@code preCommit} is run, the queued commands are
     * executed and waited for, {@code postCommit} is run, and a commit command
     * is stored only if at least one resulting future was not cancelled. On the
     * pass-through path a commit command is stored directly, with the callbacks
     * handed to the store. A {@code null} transaction id is logged as an error
     * and otherwise ignored.
     *
     * @param txid the transaction to commit
     * @param wasPrepared not consulted by this implementation
     * @param preCommit callback to run before committing, may be {@code null}
     * @param postCommit callback to run after committing, may be {@code null}
     * @throws IOException
     */
    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
            throws IOException {
        if (txid != null) {
            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
                if (preCommit != null) {
                    preCommit.run();
                }
                Tx tx = inflightTransactions.remove(txid);
                if (tx != null) {
                    List<Future<Object>> results = tx.commit();
                    boolean doneSomething = false;
                    for (Future<Object> result : results) {
                        try {
                            result.get();
                        } catch (InterruptedException e) {
                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
                        } catch (ExecutionException e) {
                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
                        } catch (CancellationException e) {
                            // Deliberately ignored: cancellation is detected via isCancelled() below.
                        }
                        if (!result.isCancelled()) {
                            doneSomething = true;
                        }
                    }
                    if (postCommit != null) {
                        postCommit.run();
                    }
                    if (doneSomething) {
                        KahaTransactionInfo info = getTransactionInfo(txid);
                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
                    }
                } else {
                    //The Tx will be null for failed over clients - lets run their post commits
                    if (postCommit != null) {
                        postCommit.run();
                    }
                }

            } else {
                KahaTransactionInfo info = getTransactionInfo(txid);
                theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
                forgetRecoveredAcks(txid);
            }
        } else {
            LOG.error("Null transaction passed on commit");
        }
    }

    /**
     * Rolls the transaction back. On the pass-through path a rollback command
     * is stored and any recovered acks are forgotten; on the in-memory path the
     * queued commands are simply discarded.
     *
     * @param txid the transaction to roll back
     * @throws IOException
     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
     */
    public void rollback(TransactionId txid) throws IOException {
        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
            KahaTransactionInfo info = getTransactionInfo(txid);
            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
            forgetRecoveredAcks(txid);
        } else {
            inflightTransactions.remove(txid);
        }
    }

    /**
     * For an XA transaction, tells the store to forget the prepared acks
     * recorded on the transaction id. Does nothing for other transactions.
     *
     * @param txid the transaction that has been completed
     * @throws IOException
     */
    protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
        if (txid.isXATransaction()) {
            XATransactionId xaTid = ((XATransactionId) txid);
            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
        }
    }

    /** No-op. */
    public void start() throws Exception {
    }

    /** No-op. */
    public void stop() throws Exception {
    }

    /**
     * Replays the store's prepared transactions to the listener. For each
     * entry, the stored operations are unmarshalled into messages and acks,
     * the acks are recorded on the XA transaction id and registered with the
     * store, and the listener is invoked with the result.
     *
     * @param listener receives each prepared transaction with its messages and acks
     * @throws IOException if a stored message or ack cannot be unmarshalled
     */
    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
            XATransactionId xid = (XATransactionId) entry.getKey();
            ArrayList<Message> messageList = new ArrayList<Message>();
            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();

            for (Operation op : entry.getValue()) {
                if (op.getClass() == AddOpperation.class) {
                    AddOpperation addOp = (AddOpperation) op;
                    Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
                            .newInput()));
                    messageList.add(msg);
                } else {
                    // Anything that is not an add is treated as a remove.
                    RemoveOpperation rmOp = (RemoveOpperation) op;
                    Buffer ackb = rmOp.getCommand().getAck();
                    MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
                    ackList.add(ack);
                }
            }

            Message[] addedMessages = new Message[messageList.size()];
            MessageAck[] acks = new MessageAck[ackList.size()];
            messageList.toArray(addedMessages);
            ackList.toArray(acks);
            xid.setPreparedAcks(ackList);
            theStore.trackRecoveredAcks(ackList);
            listener.recover(xid, addedMessages, acks);
        }
    }

    /**
     * Adds a message to the destination, deferring the add until commit when
     * the message belongs to a transaction handled on the in-memory path.
     *
     * @param context the connection context of the caller
     * @param destination the store the message is ultimately added to
     * @param message the message to add
     * @throws IOException
     */
    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {

        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.addMessage(context, message);
            } else {
                Tx tx = getTx(message.getTransactionId());
                tx.add(new AddMessageCommand(context) {
                    @Override
                    public Message getMessage() {
                        return message;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.addMessage(ctx, message);
                        return AbstractMessageStore.FUTURE;
                    }

                });
            }
        } else {
            destination.addMessage(context, message);
        }
    }

    /**
     * Queue variant of {@link #addMessage}. Non-transactional messages are
     * added asynchronously; transactional ones are either added synchronously
     * (pass-through path) or deferred until commit (in-memory path).
     *
     * @param context the connection context of the caller
     * @param destination the store the message is ultimately added to
     * @param message the message to add
     * @return the destination's future for a non-transactional add, otherwise
     *         {@code AbstractMessageStore.FUTURE}
     * @throws IOException
     */
    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {

        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.addMessage(context, message);
                return AbstractMessageStore.FUTURE;
            } else {
                Tx tx = getTx(message.getTransactionId());
                tx.add(new AddMessageCommand(context) {
                    @Override
                    public Message getMessage() {
                        return message;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        return destination.asyncAddQueueMessage(ctx, message);
                    }

                });
                return AbstractMessageStore.FUTURE;
            }
        } else {
            return destination.asyncAddQueueMessage(context, message);
        }
    }

    /**
     * Topic variant of {@link #addMessage}. Non-transactional messages are
     * added asynchronously; transactional ones are either added synchronously
     * (pass-through path) or deferred until commit (in-memory path).
     *
     * @param context the connection context of the caller
     * @param destination the store the message is ultimately added to
     * @param message the message to add
     * @return the destination's future for a non-transactional add, otherwise
     *         {@code AbstractMessageStore.FUTURE}
     * @throws IOException
     */
    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {

        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.addMessage(context, message);
                return AbstractMessageStore.FUTURE;
            } else {
                Tx tx = getTx(message.getTransactionId());
                tx.add(new AddMessageCommand(context) {
                    @Override
                    public Message getMessage() {
                        return message;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        return destination.asyncAddTopicMessage(ctx, message);
                    }

                });
                return AbstractMessageStore.FUTURE;
            }
        } else {
            return destination.asyncAddTopicMessage(context, message);
        }
    }

    /**
     * Removes a message from the destination, deferring the removal until
     * commit when the ack belongs to a transaction handled on the in-memory
     * path.
     *
     * @param context the connection context of the caller
     * @param destination the store the message is ultimately removed from
     * @param ack the ack identifying the message to remove
     * @throws IOException
     */
    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {

        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.removeMessage(context, ack);
            } else {
                Tx tx = getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context) {
                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.removeMessage(ctx, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.removeMessage(context, ack);
        }
    }

    /**
     * Asynchronous variant of {@link #removeMessage}. Note that a removal
     * deferred on the in-memory path is executed at commit time through the
     * destination's {@code removeMessage}, not {@code removeAsyncMessage}.
     *
     * @param context the connection context of the caller
     * @param destination the store the message is ultimately removed from
     * @param ack the ack identifying the message to remove
     * @throws IOException
     */
    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {

        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.removeAsyncMessage(context, ack);
            } else {
                Tx tx = getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context) {
                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.removeMessage(ctx, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.removeAsyncMessage(context, ack);
        }
    }

    /**
     * Acknowledges a message for a topic subscription, deferring the
     * acknowledgement until commit when the ack belongs to a transaction
     * handled on the in-memory path.
     *
     * @param context the connection context of the caller
     * @param destination the topic store the acknowledgement is applied to
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     * @param messageId the id of the message being acknowledged
     * @param ack the acknowledgement
     * @throws IOException
     */
    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
                           final MessageId messageId, final MessageAck ack) throws IOException {

        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
            } else {
                Tx tx = getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context) {
                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
        }
    }


    /**
     * Converts a broker transaction id into the store's transaction info
     * using the store's configured transformer.
     */
    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
        return theStore.getTransactionIdTransformer().transform(txid);
    }
}