001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.InterruptedIOException; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024 025import javax.jms.JMSException; 026import javax.jms.TransactionInProgressException; 027import javax.jms.TransactionRolledBackException; 028import javax.transaction.xa.XAException; 029import javax.transaction.xa.XAResource; 030import javax.transaction.xa.Xid; 031 032import org.apache.activemq.command.Command; 033import org.apache.activemq.command.ConnectionId; 034import org.apache.activemq.command.DataArrayResponse; 035import org.apache.activemq.command.DataStructure; 036import org.apache.activemq.command.IntegerResponse; 037import org.apache.activemq.command.LocalTransactionId; 038import org.apache.activemq.command.Response; 039import org.apache.activemq.command.TransactionId; 040import org.apache.activemq.command.TransactionInfo; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.transaction.Synchronization; 043import org.apache.activemq.util.JMSExceptionSupport; 044import org.apache.activemq.util.LongSequenceGenerator; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * A TransactionContext provides the means to control a JMS transaction. It 050 * provides a local transaction interface and also an XAResource interface. <p/> 051 * An application server controls the transactional assignment of an XASession 052 * by obtaining its XAResource. It uses the XAResource to assign the session to 053 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 054 * XAResource provides some fairly sophisticated facilities for interleaving 055 * work on multiple transactions, recovering a list of transactions in progress, 056 * and so on. A JTA aware JMS provider must fully implement this functionality. 057 * This could be done by using the services of a database that supports XA, or a 058 * JMS provider may choose to implement this functionality from scratch. <p/> 059 * 060 * 061 * @see javax.jms.Session 062 * @see javax.jms.QueueSession 063 * @see javax.jms.TopicSession 064 * @see javax.jms.XASession 065 */ 066public class TransactionContext implements XAResource { 067 068 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 069 070 // XATransactionId -> ArrayList of TransactionContext objects 071 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 072 new HashMap<TransactionId, List<TransactionContext>>(); 073 074 private final ActiveMQConnection connection; 075 private final LongSequenceGenerator localTransactionIdGenerator; 076 private final ConnectionId connectionId; 077 private List<Synchronization> synchronizations; 078 079 // To track XA transactions. 080 private Xid associatedXid; 081 private TransactionId transactionId; 082 private LocalTransactionEventListener localTransactionEventListener; 083 private int beforeEndIndex; 084 085 public TransactionContext(ActiveMQConnection connection) { 086 this.connection = connection; 087 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 088 this.connectionId = connection.getConnectionInfo().getConnectionId(); 089 } 090 091 public boolean isInXATransaction() { 092 if (transactionId != null && transactionId.isXATransaction()) { 093 return true; 094 } else { 095 if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { 096 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 097 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 098 if (transactions.contains(this)) { 099 return true; 100 } 101 } 102 } 103 } 104 } 105 106 return false; 107 } 108 109 public boolean isInLocalTransaction() { 110 return transactionId != null && transactionId.isLocalTransaction(); 111 } 112 113 public boolean isInTransaction() { 114 return transactionId != null; 115 } 116 117 /** 118 * @return Returns the localTransactionEventListener. 119 */ 120 public LocalTransactionEventListener getLocalTransactionEventListener() { 121 return localTransactionEventListener; 122 } 123 124 /** 125 * Used by the resource adapter to listen to transaction events. 126 * 127 * @param localTransactionEventListener The localTransactionEventListener to 128 * set. 129 */ 130 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 131 this.localTransactionEventListener = localTransactionEventListener; 132 } 133 134 // /////////////////////////////////////////////////////////// 135 // 136 // Methods that work with the Synchronization objects registered with 137 // the transaction. 138 // 139 // /////////////////////////////////////////////////////////// 140 141 public void addSynchronization(Synchronization s) { 142 if (synchronizations == null) { 143 synchronizations = new ArrayList<Synchronization>(10); 144 } 145 synchronizations.add(s); 146 } 147 148 private void afterRollback() throws JMSException { 149 if (synchronizations == null) { 150 return; 151 } 152 153 Throwable firstException = null; 154 int size = synchronizations.size(); 155 for (int i = 0; i < size; i++) { 156 try { 157 synchronizations.get(i).afterRollback(); 158 } catch (Throwable t) { 159 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); 160 if (firstException == null) { 161 firstException = t; 162 } 163 } 164 } 165 synchronizations = null; 166 if (firstException != null) { 167 throw JMSExceptionSupport.create(firstException); 168 } 169 } 170 171 private void afterCommit() throws JMSException { 172 if (synchronizations == null) { 173 return; 174 } 175 176 Throwable firstException = null; 177 int size = synchronizations.size(); 178 for (int i = 0; i < size; i++) { 179 try { 180 synchronizations.get(i).afterCommit(); 181 } catch (Throwable t) { 182 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); 183 if (firstException == null) { 184 firstException = t; 185 } 186 } 187 } 188 synchronizations = null; 189 if (firstException != null) { 190 throw JMSExceptionSupport.create(firstException); 191 } 192 } 193 194 private void beforeEnd() throws JMSException { 195 if (synchronizations == null) { 196 return; 197 } 198 199 int size = synchronizations.size(); 200 try { 201 for (;beforeEndIndex < size;) { 202 synchronizations.get(beforeEndIndex++).beforeEnd(); 203 } 204 } catch (JMSException e) { 205 throw e; 206 } catch (Throwable e) { 207 throw JMSExceptionSupport.create(e); 208 } 209 } 210 211 public TransactionId getTransactionId() { 212 return transactionId; 213 } 214 215 // /////////////////////////////////////////////////////////// 216 // 217 // Local transaction interface. 218 // 219 // /////////////////////////////////////////////////////////// 220 221 /** 222 * Start a local transaction. 223 * @throws javax.jms.JMSException on internal error 224 */ 225 public void begin() throws JMSException { 226 227 if (isInXATransaction()) { 228 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 229 } 230 231 if (transactionId == null) { 232 synchronizations = null; 233 beforeEndIndex = 0; 234 this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); 235 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 236 this.connection.ensureConnectionInfoSent(); 237 this.connection.asyncSendPacket(info); 238 239 // Notify the listener that the tx was started. 240 if (localTransactionEventListener != null) { 241 localTransactionEventListener.beginEvent(); 242 } 243 if (LOG.isDebugEnabled()) { 244 LOG.debug("Begin:" + transactionId); 245 } 246 } 247 248 } 249 250 /** 251 * Rolls back any work done in this transaction and releases any locks 252 * currently held. 253 * 254 * @throws JMSException if the JMS provider fails to roll back the 255 * transaction due to some internal error. 256 * @throws javax.jms.IllegalStateException if the method is not called by a 257 * transacted session. 258 */ 259 public void rollback() throws JMSException { 260 if (isInXATransaction()) { 261 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 262 } 263 264 try { 265 beforeEnd(); 266 } catch (TransactionRolledBackException canOcurrOnFailover) { 267 LOG.warn("rollback processing error", canOcurrOnFailover); 268 } 269 if (transactionId != null) { 270 if (LOG.isDebugEnabled()) { 271 LOG.debug("Rollback: " + transactionId 272 + " syncCount: " 273 + (synchronizations != null ? synchronizations.size() : 0)); 274 } 275 276 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 277 this.transactionId = null; 278 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 279 this.connection.syncSendPacket(info); 280 // Notify the listener that the tx was rolled back 281 if (localTransactionEventListener != null) { 282 localTransactionEventListener.rollbackEvent(); 283 } 284 } 285 286 afterRollback(); 287 } 288 289 /** 290 * Commits all work done in this transaction and releases any locks 291 * currently held. 292 * 293 * @throws JMSException if the JMS provider fails to commit the transaction 294 * due to some internal error. 295 * @throws javax.jms.IllegalStateException if the method is not called by a 296 * transacted session. 297 */ 298 public void commit() throws JMSException { 299 if (isInXATransaction()) { 300 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 301 } 302 303 try { 304 beforeEnd(); 305 } catch (JMSException e) { 306 rollback(); 307 throw e; 308 } 309 310 // Only send commit if the transaction was started. 311 if (transactionId != null) { 312 if (LOG.isDebugEnabled()) { 313 LOG.debug("Commit: " + transactionId 314 + " syncCount: " 315 + (synchronizations != null ? synchronizations.size() : 0)); 316 } 317 318 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 319 this.transactionId = null; 320 // Notify the listener that the tx was committed back 321 try { 322 syncSendPacketWithInterruptionHandling(info); 323 if (localTransactionEventListener != null) { 324 localTransactionEventListener.commitEvent(); 325 } 326 afterCommit(); 327 } catch (JMSException cause) { 328 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 329 if (localTransactionEventListener != null) { 330 localTransactionEventListener.rollbackEvent(); 331 } 332 afterRollback(); 333 throw cause; 334 } 335 336 } 337 } 338 339 // /////////////////////////////////////////////////////////// 340 // 341 // XAResource Implementation 342 // 343 // /////////////////////////////////////////////////////////// 344 /** 345 * Associates a transaction with the resource. 346 */ 347 public void start(Xid xid, int flags) throws XAException { 348 349 if (LOG.isDebugEnabled()) { 350 LOG.debug("Start: " + xid); 351 } 352 if (isInLocalTransaction()) { 353 throw new XAException(XAException.XAER_PROTO); 354 } 355 // Are we already associated? 356 if (associatedXid != null) { 357 throw new XAException(XAException.XAER_PROTO); 358 } 359 360 // if ((flags & TMJOIN) == TMJOIN) { 361 // TODO: verify that the server has seen the xid 362 // // } 363 // if ((flags & TMJOIN) == TMRESUME) { 364 // // TODO: verify that the xid was suspended. 365 // } 366 367 // associate 368 synchronizations = null; 369 beforeEndIndex = 0; 370 setXid(xid); 371 } 372 373 /** 374 * @return connectionId for connection 375 */ 376 private ConnectionId getConnectionId() { 377 return connection.getConnectionInfo().getConnectionId(); 378 } 379 380 public void end(Xid xid, int flags) throws XAException { 381 382 if (LOG.isDebugEnabled()) { 383 LOG.debug("End: " + xid); 384 } 385 386 if (isInLocalTransaction()) { 387 throw new XAException(XAException.XAER_PROTO); 388 } 389 390 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 391 // You can only suspend the associated xid. 392 if (!equals(associatedXid, xid)) { 393 throw new XAException(XAException.XAER_PROTO); 394 } 395 396 // TODO: we may want to put the xid in a suspended list. 397 try { 398 beforeEnd(); 399 } catch (JMSException e) { 400 throw toXAException(e); 401 } 402 setXid(null); 403 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 404 // set to null if this is the current xid. 405 // otherwise this could be an asynchronous success call 406 if (equals(associatedXid, xid)) { 407 try { 408 beforeEnd(); 409 } catch (JMSException e) { 410 throw toXAException(e); 411 } 412 setXid(null); 413 } 414 } else { 415 throw new XAException(XAException.XAER_INVAL); 416 } 417 } 418 419 private boolean equals(Xid xid1, Xid xid2) { 420 if (xid1 == xid2) { 421 return true; 422 } 423 if (xid1 == null ^ xid2 == null) { 424 return false; 425 } 426 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 427 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 428 } 429 430 public int prepare(Xid xid) throws XAException { 431 if (LOG.isDebugEnabled()) { 432 LOG.debug("Prepare: " + xid); 433 } 434 435 // We allow interleaving multiple transactions, so 436 // we don't limit prepare to the associated xid. 437 XATransactionId x; 438 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 439 // called first 440 if (xid == null || (equals(associatedXid, xid))) { 441 throw new XAException(XAException.XAER_PROTO); 442 } else { 443 // TODO: cache the known xids so we don't keep recreating this one?? 444 x = new XATransactionId(xid); 445 } 446 447 try { 448 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 449 450 // Find out if the server wants to commit or rollback. 451 IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); 452 if (XAResource.XA_RDONLY == response.getResult()) { 453 // transaction stops now, may be syncs that need a callback 454 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 455 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 456 if (l != null && !l.isEmpty()) { 457 if (LOG.isDebugEnabled()) { 458 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid); 459 } 460 for (TransactionContext ctx : l) { 461 ctx.afterCommit(); 462 } 463 } 464 } 465 } 466 return response.getResult(); 467 468 } catch (JMSException e) { 469 LOG.warn("prepare of: " + x + " failed with: " + e, e); 470 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 471 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 472 if (l != null && !l.isEmpty()) { 473 for (TransactionContext ctx : l) { 474 try { 475 ctx.afterRollback(); 476 } catch (Throwable ignored) { 477 if (LOG.isDebugEnabled()) { 478 LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + 479 x + ", context: " + ctx, ignored); 480 } 481 } 482 } 483 } 484 } 485 throw toXAException(e); 486 } 487 } 488 489 public void rollback(Xid xid) throws XAException { 490 491 if (LOG.isDebugEnabled()) { 492 LOG.debug("Rollback: " + xid); 493 } 494 495 // We allow interleaving multiple transactions, so 496 // we don't limit rollback to the associated xid. 497 XATransactionId x; 498 if (xid == null) { 499 throw new XAException(XAException.XAER_PROTO); 500 } 501 if (equals(associatedXid, xid)) { 502 // I think this can happen even without an end(xid) call. Need to 503 // check spec. 504 x = (XATransactionId)transactionId; 505 } else { 506 x = new XATransactionId(xid); 507 } 508 509 try { 510 this.connection.checkClosedOrFailed(); 511 this.connection.ensureConnectionInfoSent(); 512 513 // Let the server know that the tx is rollback. 514 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 515 syncSendPacketWithInterruptionHandling(info); 516 517 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 518 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 519 if (l != null && !l.isEmpty()) { 520 for (TransactionContext ctx : l) { 521 ctx.afterRollback(); 522 } 523 } 524 } 525 } catch (JMSException e) { 526 throw toXAException(e); 527 } 528 } 529 530 // XAResource interface 531 public void commit(Xid xid, boolean onePhase) throws XAException { 532 533 if (LOG.isDebugEnabled()) { 534 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); 535 } 536 537 // We allow interleaving multiple transactions, so 538 // we don't limit commit to the associated xid. 539 XATransactionId x; 540 if (xid == null || (equals(associatedXid, xid))) { 541 // should never happen, end(xid,TMSUCCESS) must have been previously 542 // called 543 throw new XAException(XAException.XAER_PROTO); 544 } else { 545 x = new XATransactionId(xid); 546 } 547 548 try { 549 this.connection.checkClosedOrFailed(); 550 this.connection.ensureConnectionInfoSent(); 551 552 // Notify the server that the tx was committed back 553 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 554 555 syncSendPacketWithInterruptionHandling(info); 556 557 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 558 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 559 if (l != null && !l.isEmpty()) { 560 for (TransactionContext ctx : l) { 561 try { 562 ctx.afterCommit(); 563 } catch (Exception ignored) { 564 LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored); 565 } 566 } 567 } 568 } 569 570 } catch (JMSException e) { 571 LOG.warn("commit of: " + x + " failed with: " + e, e); 572 if (onePhase) { 573 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 574 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 575 if (l != null && !l.isEmpty()) { 576 for (TransactionContext ctx : l) { 577 try { 578 ctx.afterRollback(); 579 } catch (Throwable ignored) { 580 if (LOG.isDebugEnabled()) { 581 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored); 582 } 583 } 584 } 585 } 586 } 587 } 588 throw toXAException(e); 589 } 590 591 } 592 593 public void forget(Xid xid) throws XAException { 594 if (LOG.isDebugEnabled()) { 595 LOG.debug("Forget: " + xid); 596 } 597 598 // We allow interleaving multiple transactions, so 599 // we don't limit forget to the associated xid. 600 XATransactionId x; 601 if (xid == null) { 602 throw new XAException(XAException.XAER_PROTO); 603 } 604 if (equals(associatedXid, xid)) { 605 // TODO determine if this can happen... I think not. 606 x = (XATransactionId)transactionId; 607 } else { 608 x = new XATransactionId(xid); 609 } 610 611 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 612 613 try { 614 // Tell the server to forget the transaction. 615 syncSendPacketWithInterruptionHandling(info); 616 } catch (JMSException e) { 617 throw toXAException(e); 618 } 619 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 620 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 621 } 622 } 623 624 public boolean isSameRM(XAResource xaResource) throws XAException { 625 if (xaResource == null) { 626 return false; 627 } 628 if (!(xaResource instanceof TransactionContext)) { 629 return false; 630 } 631 TransactionContext xar = (TransactionContext)xaResource; 632 try { 633 return getResourceManagerId().equals(xar.getResourceManagerId()); 634 } catch (Throwable e) { 635 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 636 } 637 } 638 639 public Xid[] recover(int flag) throws XAException { 640 if (LOG.isDebugEnabled()) { 641 LOG.debug("Recover: " + flag); 642 } 643 644 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 645 try { 646 this.connection.checkClosedOrFailed(); 647 this.connection.ensureConnectionInfoSent(); 648 649 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); 650 DataStructure[] data = receipt.getData(); 651 XATransactionId[] answer; 652 if (data instanceof XATransactionId[]) { 653 answer = (XATransactionId[])data; 654 } else { 655 answer = new XATransactionId[data.length]; 656 System.arraycopy(data, 0, answer, 0, data.length); 657 } 658 return answer; 659 } catch (JMSException e) { 660 throw toXAException(e); 661 } 662 } 663 664 public int getTransactionTimeout() throws XAException { 665 return 0; 666 } 667 668 public boolean setTransactionTimeout(int seconds) throws XAException { 669 return false; 670 } 671 672 // /////////////////////////////////////////////////////////// 673 // 674 // Helper methods. 675 // 676 // /////////////////////////////////////////////////////////// 677 private String getResourceManagerId() throws JMSException { 678 return this.connection.getResourceManagerId(); 679 } 680 681 private void setXid(Xid xid) throws XAException { 682 683 try { 684 this.connection.checkClosedOrFailed(); 685 this.connection.ensureConnectionInfoSent(); 686 } catch (JMSException e) { 687 throw toXAException(e); 688 } 689 690 if (xid != null) { 691 // associate 692 associatedXid = xid; 693 transactionId = new XATransactionId(xid); 694 695 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN); 696 try { 697 this.connection.asyncSendPacket(info); 698 if (LOG.isDebugEnabled()) { 699 LOG.debug("Started XA transaction: " + transactionId); 700 } 701 } catch (JMSException e) { 702 throw toXAException(e); 703 } 704 705 } else { 706 707 if (transactionId != null) { 708 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END); 709 try { 710 syncSendPacketWithInterruptionHandling(info); 711 if (LOG.isDebugEnabled()) { 712 LOG.debug("Ended XA transaction: " + transactionId); 713 } 714 } catch (JMSException e) { 715 throw toXAException(e); 716 } 717 718 // Add our self to the list of contexts that are interested in 719 // post commit/rollback events. 720 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 721 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 722 if (l == null) { 723 l = new ArrayList<TransactionContext>(3); 724 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 725 l.add(this); 726 } else if (!l.contains(this)) { 727 l.add(this); 728 } 729 } 730 } 731 732 // dis-associate 733 associatedXid = null; 734 transactionId = null; 735 } 736 } 737 738 /** 739 * Sends the given command. Also sends the command in case of interruption, 740 * so that important commands like rollback and commit are never interrupted. 741 * If interruption occurred, set the interruption state of the current 742 * after performing the action again. 743 * 744 * @return the response 745 */ 746 private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { 747 try { 748 return this.connection.syncSendPacket(command); 749 } catch (JMSException e) { 750 if (e.getLinkedException() instanceof InterruptedIOException) { 751 try { 752 Thread.interrupted(); 753 return this.connection.syncSendPacket(command); 754 } finally { 755 Thread.currentThread().interrupt(); 756 } 757 } 758 759 throw e; 760 } 761 } 762 763 /** 764 * Converts a JMSException from the server to an XAException. if the 765 * JMSException contained a linked XAException that is returned instead. 766 * 767 * @param e JMSException to convert 768 * @return XAException wrapping original exception or its message 769 */ 770 private XAException toXAException(JMSException e) { 771 if (e.getCause() != null && e.getCause() instanceof XAException) { 772 XAException original = (XAException)e.getCause(); 773 XAException xae = new XAException(original.getMessage()); 774 xae.errorCode = original.errorCode; 775 xae.initCause(original); 776 return xae; 777 } 778 779 XAException xae = new XAException(e.getMessage()); 780 xae.errorCode = XAException.XAER_RMFAIL; 781 xae.initCause(e); 782 return xae; 783 } 784 785 public ActiveMQConnection getConnection() { 786 return connection; 787 } 788 789 public void cleanup() { 790 associatedXid = null; 791 transactionId = null; 792 } 793 794 @Override 795 public String toString() { 796 return "TransactionContext{" + 797 "transactionId=" + transactionId + 798 '}'; 799 } 800}