001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Properties; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.CopyOnWriteArrayList; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicReference; 038import java.util.concurrent.locks.ReentrantReadWriteLock; 039 040import javax.transaction.xa.XAResource; 041 042import org.apache.activemq.advisory.AdvisorySupport; 043import org.apache.activemq.broker.region.ConnectionStatistics; 044import org.apache.activemq.broker.region.DurableTopicSubscription; 045import org.apache.activemq.broker.region.RegionBroker; 046import 
org.apache.activemq.broker.region.TopicRegion; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.BrokerInfo; 049import org.apache.activemq.command.BrokerSubscriptionInfo; 050import org.apache.activemq.command.Command; 051import org.apache.activemq.command.CommandTypes; 052import org.apache.activemq.command.ConnectionControl; 053import org.apache.activemq.command.ConnectionError; 054import org.apache.activemq.command.ConnectionId; 055import org.apache.activemq.command.ConnectionInfo; 056import org.apache.activemq.command.ConsumerControl; 057import org.apache.activemq.command.ConsumerId; 058import org.apache.activemq.command.ConsumerInfo; 059import org.apache.activemq.command.ControlCommand; 060import org.apache.activemq.command.DataArrayResponse; 061import org.apache.activemq.command.DestinationInfo; 062import org.apache.activemq.command.ExceptionResponse; 063import org.apache.activemq.command.FlushCommand; 064import org.apache.activemq.command.IntegerResponse; 065import org.apache.activemq.command.KeepAliveInfo; 066import org.apache.activemq.command.Message; 067import org.apache.activemq.command.MessageAck; 068import org.apache.activemq.command.MessageDispatch; 069import org.apache.activemq.command.MessageDispatchNotification; 070import org.apache.activemq.command.MessagePull; 071import org.apache.activemq.command.ProducerAck; 072import org.apache.activemq.command.ProducerId; 073import org.apache.activemq.command.ProducerInfo; 074import org.apache.activemq.command.RemoveInfo; 075import org.apache.activemq.command.RemoveSubscriptionInfo; 076import org.apache.activemq.command.Response; 077import org.apache.activemq.command.SessionId; 078import org.apache.activemq.command.SessionInfo; 079import org.apache.activemq.command.ShutdownInfo; 080import org.apache.activemq.command.TransactionId; 081import org.apache.activemq.command.TransactionInfo; 082import org.apache.activemq.command.WireFormatInfo; 083import 
org.apache.activemq.network.DemandForwardingBridge; 084import org.apache.activemq.network.MBeanNetworkListener; 085import org.apache.activemq.network.NetworkBridgeConfiguration; 086import org.apache.activemq.network.NetworkBridgeFactory; 087import org.apache.activemq.network.NetworkConnector; 088import org.apache.activemq.security.MessageAuthorizationPolicy; 089import org.apache.activemq.state.CommandVisitor; 090import org.apache.activemq.state.ConnectionState; 091import org.apache.activemq.state.ConsumerState; 092import org.apache.activemq.state.ProducerState; 093import org.apache.activemq.state.SessionState; 094import org.apache.activemq.state.TransactionState; 095import org.apache.activemq.thread.Task; 096import org.apache.activemq.thread.TaskRunner; 097import org.apache.activemq.thread.TaskRunnerFactory; 098import org.apache.activemq.transaction.Transaction; 099import org.apache.activemq.transport.DefaultTransportListener; 100import org.apache.activemq.transport.ResponseCorrelator; 101import org.apache.activemq.transport.TransmitCallback; 102import org.apache.activemq.transport.Transport; 103import org.apache.activemq.transport.TransportDisposedIOException; 104import org.apache.activemq.util.IntrospectionSupport; 105import org.apache.activemq.util.MarshallingSupport; 106import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; 107import org.apache.activemq.util.SubscriptionKey; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110import org.slf4j.MDC; 111 112public class TransportConnection implements Connection, Task, CommandVisitor { 113 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 114 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 115 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 116 // Keeps track of the broker and connector that created this 
connection. 117 protected final Broker broker; 118 protected final BrokerService brokerService; 119 protected final TransportConnector connector; 120 // Keeps track of the state of the connections. 121 // protected final ConcurrentHashMap localConnectionStates=new 122 // ConcurrentHashMap(); 123 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 124 // The broker and wireformat info that was exchanged. 125 protected BrokerInfo brokerInfo; 126 protected final List<Command> dispatchQueue = new LinkedList<>(); 127 protected TaskRunner taskRunner; 128 protected final AtomicReference<Throwable> transportException = new AtomicReference<>(); 129 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 130 private final Transport transport; 131 private MessageAuthorizationPolicy messageAuthorizationPolicy; 132 private WireFormatInfo wireFormatInfo; 133 // Used to do async dispatch.. this should perhaps be pushed down into the 134 // transport layer.. 135 private boolean inServiceException; 136 private final ConnectionStatistics statistics = new ConnectionStatistics(); 137 private boolean manageable; 138 private boolean slow; 139 private boolean markedCandidate; 140 private boolean blockedCandidate; 141 private boolean blocked; 142 private boolean connected; 143 private boolean active; 144 private boolean starting; 145 private boolean pendingStop; 146 private long timeStamp; 147 private final AtomicBoolean stopping = new AtomicBoolean(false); 148 private final CountDownLatch stopped = new CountDownLatch(1); 149 private final AtomicBoolean asyncException = new AtomicBoolean(false); 150 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>(); 151 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>(); 152 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 153 private ConnectionContext context; 154 private boolean networkConnection; 155 private 
boolean faultTolerantConnection; 156 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 157 private DemandForwardingBridge duplexBridge; 158 private final TaskRunnerFactory taskRunnerFactory; 159 private final TaskRunnerFactory stopTaskRunnerFactory; 160 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 161 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 162 private String duplexNetworkConnectorId; 163 164 /** 165 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 166 * else commands are sent async. 167 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection. 168 */ 169 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 170 TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { 171 this.connector = connector; 172 this.broker = broker; 173 this.brokerService = broker.getBrokerService(); 174 175 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 176 brokerConnectionStates = rb.getConnectionStates(); 177 if (connector != null) { 178 this.statistics.setParent(connector.getStatistics()); 179 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 180 } 181 this.taskRunnerFactory = taskRunnerFactory; 182 this.stopTaskRunnerFactory = stopTaskRunnerFactory; 183 this.transport = transport; 184 if( this.transport instanceof BrokerServiceAware ) { 185 ((BrokerServiceAware)this.transport).setBrokerService(brokerService); 186 } 187 this.transport.setTransportListener(new DefaultTransportListener() { 188 @Override 189 public void onCommand(Object o) { 190 serviceLock.readLock().lock(); 191 try { 192 if (!(o instanceof Command)) { 193 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 194 } 195 Command command = (Command) o; 
196 if (!brokerService.isStopping()) { 197 Response response = service(command); 198 if (response != null && !brokerService.isStopping()) { 199 dispatchSync(response); 200 } 201 } else { 202 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 203 } 204 } finally { 205 serviceLock.readLock().unlock(); 206 } 207 } 208 209 @Override 210 public void onException(IOException exception) { 211 serviceLock.readLock().lock(); 212 try { 213 serviceTransportException(exception); 214 } finally { 215 serviceLock.readLock().unlock(); 216 } 217 } 218 }); 219 connected = true; 220 } 221 222 /** 223 * Returns the number of messages to be dispatched to this connection 224 * 225 * @return size of dispatch queue 226 */ 227 @Override 228 public int getDispatchQueueSize() { 229 synchronized (dispatchQueue) { 230 return dispatchQueue.size(); 231 } 232 } 233 234 public void serviceTransportException(IOException e) { 235 if (!stopping.get() && !pendingStop) { 236 transportException.set(e); 237 if (TRANSPORTLOG.isDebugEnabled()) { 238 TRANSPORTLOG.debug(this + " failed: " + e, e); 239 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 240 TRANSPORTLOG.warn(this + " failed: " + e); 241 } 242 stopAsync(e); 243 } 244 } 245 246 private boolean expected(IOException e) { 247 return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 248 } 249 250 private boolean isStomp() { 251 URI uri = connector.getUri(); 252 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 253 } 254 255 /** 256 * Calls the serviceException method in an async thread. Since handling a 257 * service exception closes a socket, we should not tie up broker threads 258 * since client sockets may hang or cause deadlocks. 
259 */ 260 @Override 261 public void serviceExceptionAsync(final IOException e) { 262 if (asyncException.compareAndSet(false, true)) { 263 new Thread("Async Exception Handler") { 264 @Override 265 public void run() { 266 serviceException(e); 267 } 268 }.start(); 269 } 270 } 271 272 /** 273 * Closes a clients connection due to a detected error. Errors are ignored 274 * if: the client is closing or broker is closing. Otherwise, the connection 275 * error transmitted to the client before stopping it's transport. 276 */ 277 @Override 278 public void serviceException(Throwable e) { 279 // are we a transport exception such as not being able to dispatch 280 // synchronously to a transport 281 if (e instanceof IOException) { 282 serviceTransportException((IOException) e); 283 } else if (e.getClass() == BrokerStoppedException.class) { 284 // Handle the case where the broker is stopped 285 // But the client is still connected. 286 if (!stopping.get()) { 287 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 288 ConnectionError ce = new ConnectionError(); 289 ce.setException(e); 290 dispatchSync(ce); 291 // Record the error that caused the transport to stop 292 transportException.set(e); 293 // Wait a little bit to try to get the output buffer to flush 294 // the exception notification to the client. 295 try { 296 Thread.sleep(500); 297 } catch (InterruptedException ie) { 298 Thread.currentThread().interrupt(); 299 } 300 // Worst case is we just kill the connection before the 301 // notification gets to him. 
302 stopAsync(); 303 } 304 } else if (!stopping.get() && !inServiceException) { 305 inServiceException = true; 306 try { 307 if (SERVICELOG.isDebugEnabled()) { 308 SERVICELOG.debug("Async error occurred: " + e, e); 309 } else { 310 SERVICELOG.warn("Async error occurred: " + e); 311 } 312 ConnectionError ce = new ConnectionError(); 313 ce.setException(e); 314 if (pendingStop) { 315 dispatchSync(ce); 316 } else { 317 dispatchAsync(ce); 318 } 319 } finally { 320 inServiceException = false; 321 } 322 } 323 } 324 325 @Override 326 public Response service(Command command) { 327 MDC.put("activemq.connector", connector.getUri().toString()); 328 Response response = null; 329 boolean responseRequired = command.isResponseRequired(); 330 int commandId = command.getCommandId(); 331 try { 332 if (!pendingStop) { 333 response = command.visit(this); 334 } else { 335 response = new ExceptionResponse(transportException.get()); 336 } 337 } catch (Throwable e) { 338 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 339 SERVICELOG.debug("Error occured while processing " + (responseRequired ? 
"sync" : "async") 340 + " command: " + command + ", exception: " + e, e); 341 } 342 343 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 344 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 345 responseRequired = false; 346 } 347 348 if (responseRequired) { 349 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 350 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 351 transport.getRemoteAddress(), e.getMessage()); 352 } 353 response = new ExceptionResponse(e); 354 } else { 355 forceRollbackOnlyOnFailedAsyncTransactionOp(e, command); 356 serviceException(e); 357 } 358 } 359 if (responseRequired) { 360 if (response == null) { 361 response = new Response(); 362 } 363 response.setCorrelationId(commandId); 364 } 365 // The context may have been flagged so that the response is not 366 // sent. 367 if (context != null) { 368 if (context.isDontSendReponse()) { 369 context.setDontSendReponse(false); 370 response = null; 371 } 372 context = null; 373 } 374 MDC.remove("activemq.connector"); 375 return response; 376 } 377 378 private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) { 379 if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) { 380 Transaction transaction = getActiveTransaction(command); 381 if (transaction != null && !transaction.isRollbackOnly()) { 382 LOG.debug("on async exception, force rollback of transaction for: " + command, e); 383 transaction.setRollbackOnly(e); 384 } 385 } 386 } 387 388 private Transaction getActiveTransaction(Command command) { 389 Transaction transaction = null; 390 try { 391 if (command instanceof Message) { 392 Message messageSend = (Message) command; 393 ProducerId producerId = messageSend.getProducerId(); 394 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 395 transaction = 
producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId()); 396 } else if (command instanceof MessageAck) { 397 MessageAck messageAck = (MessageAck) command; 398 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId()); 399 if (consumerExchange != null) { 400 transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId()); 401 } 402 } 403 } catch(Exception ignored){ 404 LOG.trace("failed to find active transaction for command: " + command, ignored); 405 } 406 return transaction; 407 } 408 409 private boolean isInTransaction(Command command) { 410 return command instanceof Message && ((Message)command).isInTransaction() 411 || command instanceof MessageAck && ((MessageAck)command).isInTransaction(); 412 } 413 414 @Override 415 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 416 return null; 417 } 418 419 @Override 420 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 421 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 422 return null; 423 } 424 425 @Override 426 public Response processWireFormat(WireFormatInfo info) throws Exception { 427 wireFormatInfo = info; 428 protocolVersion.set(info.getVersion()); 429 return null; 430 } 431 432 @Override 433 public Response processShutdown(ShutdownInfo info) throws Exception { 434 stopAsync(); 435 return null; 436 } 437 438 @Override 439 public Response processFlush(FlushCommand command) throws Exception { 440 return null; 441 } 442 443 @Override 444 public Response processBeginTransaction(TransactionInfo info) throws Exception { 445 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 446 context = null; 447 if (cs != null) { 448 context = cs.getContext(); 449 } 450 if (cs == null) { 451 throw new NullPointerException("Context is null"); 452 } 453 // Avoid replaying dup commands 
454 if (cs.getTransactionState(info.getTransactionId()) == null) { 455 cs.addTransactionState(info.getTransactionId()); 456 broker.beginTransaction(context, info.getTransactionId()); 457 } 458 return null; 459 } 460 461 @Override 462 public int getActiveTransactionCount() { 463 int rc = 0; 464 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 465 Collection<TransactionState> transactions = cs.getTransactionStates(); 466 for (TransactionState transaction : transactions) { 467 rc++; 468 } 469 } 470 return rc; 471 } 472 473 @Override 474 public Long getOldestActiveTransactionDuration() { 475 TransactionState oldestTX = null; 476 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 477 Collection<TransactionState> transactions = cs.getTransactionStates(); 478 for (TransactionState transaction : transactions) { 479 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 480 oldestTX = transaction; 481 } 482 } 483 } 484 if( oldestTX == null ) { 485 return null; 486 } 487 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 488 } 489 490 @Override 491 public Response processEndTransaction(TransactionInfo info) throws Exception { 492 // No need to do anything. This packet is just sent by the client 493 // make sure he is synced with the server as commit command could 494 // come from a different connection. 
495 return null; 496 } 497 498 @Override 499 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 500 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 501 context = null; 502 if (cs != null) { 503 context = cs.getContext(); 504 } 505 if (cs == null) { 506 throw new NullPointerException("Context is null"); 507 } 508 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 509 if (transactionState == null) { 510 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 511 + info.getTransactionId()); 512 } 513 // Avoid dups. 514 if (!transactionState.isPrepared()) { 515 transactionState.setPrepared(true); 516 int result = broker.prepareTransaction(context, info.getTransactionId()); 517 transactionState.setPreparedResult(result); 518 if (result == XAResource.XA_RDONLY) { 519 // we are done, no further rollback or commit from TM 520 cs.removeTransactionState(info.getTransactionId()); 521 } 522 IntegerResponse response = new IntegerResponse(result); 523 return response; 524 } else { 525 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 526 return response; 527 } 528 } 529 530 @Override 531 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 532 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 533 context = cs.getContext(); 534 cs.removeTransactionState(info.getTransactionId()); 535 broker.commitTransaction(context, info.getTransactionId(), true); 536 return null; 537 } 538 539 @Override 540 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 541 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 542 context = cs.getContext(); 543 cs.removeTransactionState(info.getTransactionId()); 544 broker.commitTransaction(context, info.getTransactionId(), false); 545 
return null; 546 } 547 548 @Override 549 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 550 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 551 context = cs.getContext(); 552 cs.removeTransactionState(info.getTransactionId()); 553 broker.rollbackTransaction(context, info.getTransactionId()); 554 return null; 555 } 556 557 @Override 558 public Response processForgetTransaction(TransactionInfo info) throws Exception { 559 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 560 context = cs.getContext(); 561 broker.forgetTransaction(context, info.getTransactionId()); 562 return null; 563 } 564 565 @Override 566 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 567 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 568 context = cs.getContext(); 569 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 570 return new DataArrayResponse(preparedTransactions); 571 } 572 573 @Override 574 public Response processMessage(Message messageSend) throws Exception { 575 ProducerId producerId = messageSend.getProducerId(); 576 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 577 if (producerExchange.canDispatch(messageSend)) { 578 broker.send(producerExchange, messageSend); 579 } 580 return null; 581 } 582 583 @Override 584 public Response processMessageAck(MessageAck ack) throws Exception { 585 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 586 if (consumerExchange != null) { 587 broker.acknowledge(consumerExchange, ack); 588 } else if (ack.isInTransaction()) { 589 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 590 } 591 return null; 592 } 593 594 @Override 595 public Response processMessagePull(MessagePull pull) throws Exception { 596 return 
broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 597 } 598 599 @Override 600 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 601 broker.processDispatchNotification(notification); 602 return null; 603 } 604 605 @Override 606 public Response processAddDestination(DestinationInfo info) throws Exception { 607 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 608 broker.addDestinationInfo(cs.getContext(), info); 609 if (info.getDestination().isTemporary()) { 610 cs.addTempDestination(info); 611 } 612 return null; 613 } 614 615 @Override 616 public Response processRemoveDestination(DestinationInfo info) throws Exception { 617 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 618 broker.removeDestinationInfo(cs.getContext(), info); 619 if (info.getDestination().isTemporary()) { 620 cs.removeTempDestination(info.getDestination()); 621 } 622 return null; 623 } 624 625 @Override 626 public Response processAddProducer(ProducerInfo info) throws Exception { 627 SessionId sessionId = info.getProducerId().getParentId(); 628 ConnectionId connectionId = sessionId.getParentId(); 629 TransportConnectionState cs = lookupConnectionState(connectionId); 630 if (cs == null) { 631 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 632 + connectionId); 633 } 634 SessionState ss = cs.getSessionState(sessionId); 635 if (ss == null) { 636 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 637 + sessionId); 638 } 639 // Avoid replaying dup commands 640 if (!ss.getProducerIds().contains(info.getProducerId())) { 641 ActiveMQDestination destination = info.getDestination(); 642 // Do not check for null here as it would cause the count of max producers to exclude 643 // anonymous producers. 
The isAdvisoryTopic method checks for null so it is safe to 644 // call it from here with a null Destination value. 645 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 646 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 647 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 648 } 649 } 650 broker.addProducer(cs.getContext(), info); 651 try { 652 ss.addProducer(info); 653 } catch (IllegalStateException e) { 654 broker.removeProducer(cs.getContext(), info); 655 } 656 657 } 658 return null; 659 } 660 661 @Override 662 public Response processRemoveProducer(ProducerId id) throws Exception { 663 SessionId sessionId = id.getParentId(); 664 ConnectionId connectionId = sessionId.getParentId(); 665 TransportConnectionState cs = lookupConnectionState(connectionId); 666 SessionState ss = cs.getSessionState(sessionId); 667 if (ss == null) { 668 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 669 + sessionId); 670 } 671 ProducerState ps = ss.removeProducer(id); 672 if (ps == null) { 673 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 674 } 675 removeProducerBrokerExchange(id); 676 broker.removeProducer(cs.getContext(), ps.getInfo()); 677 return null; 678 } 679 680 @Override 681 public Response processAddConsumer(ConsumerInfo info) throws Exception { 682 SessionId sessionId = info.getConsumerId().getParentId(); 683 ConnectionId connectionId = sessionId.getParentId(); 684 TransportConnectionState cs = lookupConnectionState(connectionId); 685 if (cs == null) { 686 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 687 + connectionId); 688 } 689 SessionState ss = cs.getSessionState(sessionId); 690 if (ss == null) { 691 throw new 
IllegalStateException(broker.getBrokerName() 692 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 693 } 694 // Avoid replaying dup commands 695 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 696 ActiveMQDestination destination = info.getDestination(); 697 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 698 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 699 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 700 } 701 } 702 703 broker.addConsumer(cs.getContext(), info); 704 try { 705 ss.addConsumer(info); 706 addConsumerBrokerExchange(cs, info.getConsumerId()); 707 } catch (IllegalStateException e) { 708 broker.removeConsumer(cs.getContext(), info); 709 } 710 711 } 712 return null; 713 } 714 715 @Override 716 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 717 SessionId sessionId = id.getParentId(); 718 ConnectionId connectionId = sessionId.getParentId(); 719 TransportConnectionState cs = lookupConnectionState(connectionId); 720 if (cs == null) { 721 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 722 + connectionId); 723 } 724 SessionState ss = cs.getSessionState(sessionId); 725 if (ss == null) { 726 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 727 + sessionId); 728 } 729 ConsumerState consumerState = ss.removeConsumer(id); 730 if (consumerState == null) { 731 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 732 } 733 ConsumerInfo info = consumerState.getInfo(); 734 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 735 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 736 
removeConsumerBrokerExchange(id); 737 return null; 738 } 739 740 @Override 741 public Response processAddSession(SessionInfo info) throws Exception { 742 ConnectionId connectionId = info.getSessionId().getParentId(); 743 TransportConnectionState cs = lookupConnectionState(connectionId); 744 // Avoid replaying dup commands 745 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 746 broker.addSession(cs.getContext(), info); 747 try { 748 cs.addSession(info); 749 } catch (IllegalStateException e) { 750 LOG.warn("Failed to add session: {}", info.getSessionId(), e); 751 broker.removeSession(cs.getContext(), info); 752 } 753 } 754 return null; 755 } 756 757 @Override 758 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 759 ConnectionId connectionId = id.getParentId(); 760 TransportConnectionState cs = lookupConnectionState(connectionId); 761 if (cs == null) { 762 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 763 } 764 SessionState session = cs.getSessionState(id); 765 if (session == null) { 766 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 767 } 768 // Don't let new consumers or producers get added while we are closing 769 // this down. 770 session.shutdown(); 771 // Cascade the connection stop to the consumers and producers. 
772 for (ConsumerId consumerId : session.getConsumerIds()) { 773 try { 774 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 775 } catch (Throwable e) { 776 LOG.warn("Failed to remove consumer: {}", consumerId, e); 777 } 778 } 779 for (ProducerId producerId : session.getProducerIds()) { 780 try { 781 processRemoveProducer(producerId); 782 } catch (Throwable e) { 783 LOG.warn("Failed to remove producer: {}", producerId, e); 784 } 785 } 786 cs.removeSession(id); 787 broker.removeSession(cs.getContext(), session.getInfo()); 788 return null; 789 } 790 791 @Override 792 public Response processAddConnection(ConnectionInfo info) throws Exception { 793 // Older clients should have been defaulting this field to true.. but 794 // they were not. 795 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 796 info.setClientMaster(true); 797 } 798 TransportConnectionState state; 799 // Make sure 2 concurrent connections by the same ID only generate 1 800 // TransportConnectionState object. 801 synchronized (brokerConnectionStates) { 802 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 803 if (state == null) { 804 state = new TransportConnectionState(info, this); 805 brokerConnectionStates.put(info.getConnectionId(), state); 806 } 807 state.incrementReference(); 808 } 809 // If there are 2 concurrent connections for the same connection id, 810 // then last one in wins, we need to sync here 811 // to figure out the winner. 
812 synchronized (state.getConnectionMutex()) { 813 if (state.getConnection() != this) { 814 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 815 state.getConnection().stop(); 816 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 817 state.setConnection(this); 818 state.reset(info); 819 } 820 } 821 registerConnectionState(info.getConnectionId(), state); 822 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 823 this.faultTolerantConnection = info.isFaultTolerant(); 824 // Setup the context. 825 String clientId = info.getClientId(); 826 context = new ConnectionContext(); 827 context.setBroker(broker); 828 context.setClientId(clientId); 829 context.setClientMaster(info.isClientMaster()); 830 context.setConnection(this); 831 context.setConnectionId(info.getConnectionId()); 832 context.setConnector(connector); 833 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 834 context.setNetworkConnection(networkConnection); 835 context.setFaultTolerant(faultTolerantConnection); 836 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 837 context.setUserName(info.getUserName()); 838 context.setWireFormatInfo(wireFormatInfo); 839 context.setReconnect(info.isFailoverReconnect()); 840 this.manageable = info.isManageable(); 841 context.setConnectionState(state); 842 state.setContext(context); 843 state.setConnection(this); 844 if (info.getClientIp() == null) { 845 info.setClientIp(getRemoteAddress()); 846 } 847 848 try { 849 broker.addConnection(context, info); 850 } catch (Exception e) { 851 synchronized (brokerConnectionStates) { 852 brokerConnectionStates.remove(info.getConnectionId()); 853 } 854 unregisterConnectionState(info.getConnectionId()); 855 LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e); 856 
if (e instanceof SecurityException) { 857 // close this down - in case the peer of this transport doesn't play nice 858 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 859 } 860 throw e; 861 } 862 if (info.isManageable()) { 863 // send ConnectionCommand 864 ConnectionControl command = this.connector.getConnectionControl(); 865 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 866 if (info.isFailoverReconnect()) { 867 command.setRebalanceConnection(false); 868 } 869 dispatchAsync(command); 870 } 871 return null; 872 } 873 874 @Override 875 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 876 throws InterruptedException { 877 LOG.debug("remove connection id: {}", id); 878 TransportConnectionState cs = lookupConnectionState(id); 879 if (cs != null) { 880 // Don't allow things to be added to the connection state while we 881 // are shutting down. 882 cs.shutdown(); 883 // Cascade the connection stop to the sessions. 884 for (SessionId sessionId : cs.getSessionIds()) { 885 try { 886 processRemoveSession(sessionId, lastDeliveredSequenceId); 887 } catch (Throwable e) { 888 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 889 } 890 } 891 // Cascade the connection stop to temp destinations. 
892 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 893 DestinationInfo di = iter.next(); 894 try { 895 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 896 } catch (Throwable e) { 897 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 898 } 899 iter.remove(); 900 } 901 try { 902 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 903 } catch (Throwable e) { 904 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 905 } 906 TransportConnectionState state = unregisterConnectionState(id); 907 if (state != null) { 908 synchronized (brokerConnectionStates) { 909 // If we are the last reference, we should remove the state 910 // from the broker. 911 if (state.decrementReference() == 0) { 912 brokerConnectionStates.remove(id); 913 } 914 } 915 } 916 } 917 return null; 918 } 919 920 @Override 921 public Response processProducerAck(ProducerAck ack) throws Exception { 922 // A broker should not get ProducerAck messages. 
return null;
    }

    /**
     * @return the connector this connection was given
     */
    @Override
    public Connector getConnector() {
        return connector;
    }

    /**
     * Dispatches a command on the calling thread. An {@link IOException} from
     * the dispatch is handed to {@code serviceExceptionAsync} instead of being
     * thrown.
     *
     * @param message the command to send
     */
    @Override
    public void dispatchSync(Command message) {
        try {
            processDispatch(message);
        } catch (IOException e) {
            serviceExceptionAsync(e);
        }
    }

    /**
     * Queues a command for the dispatcher task, or dispatches it directly
     * when there is no task runner. Once the connection is stopping the
     * command is not sent; a {@code MessageDispatch} is post-processed and
     * its transmit callback, if any, is notified of the failure.
     *
     * @param message the command to send
     */
    @Override
    public void dispatchAsync(Command message) {
        if (stopping.get()) {
            if (message.isMessageDispatch()) {
                final MessageDispatch dispatch = (MessageDispatch) message;
                final TransmitCallback callback = dispatch.getTransmitCallback();
                broker.postProcessDispatch(dispatch);
                if (callback != null) {
                    callback.onFailure();
                }
            }
            return;
        }
        if (taskRunner == null) {
            dispatchSync(message);
            return;
        }
        synchronized (dispatchQueue) {
            dispatchQueue.add(message);
        }
        try {
            taskRunner.wakeup();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a single command over the transport unless the connection is
     * stopping. For a {@code MessageDispatch} the broker's pre- and
     * post-dispatch hooks are run, and the transmit callback (if any) gets
     * {@code onFailure()} after an {@link IOException} or {@code onSuccess()}
     * otherwise.
     *
     * @param command the command to send
     * @throws IOException if sending a {@code MessageDispatch} fails, or its pre-dispatch hook
     *                     throws a {@link RuntimeException} (wrapped)
     */
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = command.isMessageDispatch() ? (MessageDispatch) command : null;
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    try {
                        broker.preProcessDispatch(messageDispatch);
                    } catch (RuntimeException convertToIO) {
                        throw new IOException(convertToIO);
                    }
                }
                dispatch(command);
            }
        } catch (IOException e) {
            // NOTE(review): the exception is only rethrown for a MessageDispatch; for any other
            // command an IOException from dispatch() ends here silently - confirm this is intended.
            if (messageDispatch != null) {
                final TransmitCallback callback = messageDispatch.getTransmitCallback();
                broker.postProcessDispatch(messageDispatch);
                if (callback != null) {
                    callback.onFailure();
                }
                // Cleared so that the finally block does not report success as well.
                messageDispatch = null;
                throw e;
            }
        } finally {
            if (messageDispatch != null) {
                final TransmitCallback callback = messageDispatch.getTransmitCallback();
                broker.postProcessDispatch(messageDispatch);
                if (callback != null) {
                    callback.onSuccess();
                }
            }
        }
    }

    /**
     * Runs one step of the dispatcher task: while the connection is running
     * it takes the first queued command and dispatches it; once a stop is
     * pending it marks dispatching as stopped, sends a {@code ShutdownInfo}
     * when no transport exception has been recorded, and releases
     * {@code dispatchStoppedLatch}.
     *
     * @return {@code true} if a command was dispatched, {@code false} otherwise
     */
    @Override
    public boolean iterate() {
        try {
            if (pendingStop || stopping.get()) {
                if (dispatchStopped.compareAndSet(false, true)) {
                    if (transportException.get() == null) {
                        try {
                            dispatch(new ShutdownInfo());
                        } catch (Throwable ignore) {
                            // failure to send the shutdown notice is deliberately ignored
                        }
                    }
                    dispatchStoppedLatch.countDown();
                }
                return false;
            }
            if (dispatchStopped.get()) {
                return false;
            }
            final Command next;
            synchronized (dispatchQueue) {
                if (dispatchQueue.isEmpty()) {
                    return false;
                }
                next = dispatchQueue.remove(0);
            }
            processDispatch(next);
            return true;
        } catch (IOException e) {
            if (dispatchStopped.compareAndSet(false, true)) {
                dispatchStoppedLatch.countDown();
            }
            serviceExceptionAsync(e);
            return false;
        }
    }

    /**
     * Returns the statistics for this connection
     */
    @Override
    public ConnectionStatistics getStatistics() {
        return statistics;
    }

    /**
     * @return the message authorization policy currently set on this connection
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return messageAuthorizationPolicy;
    }

    /**
     * Sets the message authorization policy held by this connection.
     */
    public void
setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1050 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1051 } 1052 1053 @Override 1054 public boolean isManageable() { 1055 return manageable; 1056 } 1057 1058 @Override 1059 public void start() throws Exception { 1060 try { 1061 synchronized (this) { 1062 starting = true; 1063 if (taskRunnerFactory != null) { 1064 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1065 + getRemoteAddress()); 1066 } else { 1067 taskRunner = null; 1068 } 1069 transport.start(); 1070 active = true; 1071 BrokerInfo info = connector.getBrokerInfo().copy(); 1072 if (connector.isUpdateClusterClients()) { 1073 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1074 } else { 1075 info.setPeerBrokerInfos(null); 1076 } 1077 dispatchAsync(info); 1078 1079 connector.onStarted(this); 1080 } 1081 } catch (Exception e) { 1082 // Force clean up on an error starting up. 1083 pendingStop = true; 1084 throw e; 1085 } finally { 1086 // stop() can be called from within the above block, 1087 // but we want to be sure start() completes before 1088 // stop() runs, so queue the stop until right now: 1089 setStarting(false); 1090 if (isPendingStop()) { 1091 LOG.debug("Calling the delayed stop() after start() {}", this); 1092 stop(); 1093 } 1094 } 1095 } 1096 1097 @Override 1098 public void stop() throws Exception { 1099 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1100 // as their lifecycle is handled elsewhere 1101 1102 stopAsync(); 1103 while (!stopped.await(5, TimeUnit.SECONDS)) { 1104 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1105 } 1106 } 1107 1108 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1109 if (waitTime > 0) { 1110 synchronized (this) { 1111 pendingStop = true; 1112 transportException.set(cause); 1113 } 1114 try { 
1115 stopTaskRunnerFactory.execute(new Runnable() { 1116 @Override 1117 public void run() { 1118 try { 1119 Thread.sleep(waitTime); 1120 stopAsync(); 1121 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1122 } catch (InterruptedException e) { 1123 } 1124 } 1125 }); 1126 } catch (Throwable t) { 1127 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1128 } 1129 } 1130 } 1131 1132 public void stopAsync(Throwable cause) { 1133 transportException.set(cause); 1134 stopAsync(); 1135 } 1136 1137 public void stopAsync() { 1138 // If we're in the middle of starting then go no further... for now. 1139 synchronized (this) { 1140 pendingStop = true; 1141 if (starting) { 1142 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1143 return; 1144 } 1145 } 1146 if (stopping.compareAndSet(false, true)) { 1147 // Let all the connection contexts know we are shutting down 1148 // so that in progress operations can notice and unblock. 1149 List<TransportConnectionState> connectionStates = listConnectionStates(); 1150 for (TransportConnectionState cs : connectionStates) { 1151 ConnectionContext connectionContext = cs.getContext(); 1152 if (connectionContext != null) { 1153 connectionContext.getStopping().set(true); 1154 } 1155 } 1156 try { 1157 stopTaskRunnerFactory.execute(new Runnable() { 1158 @Override 1159 public void run() { 1160 serviceLock.writeLock().lock(); 1161 try { 1162 doStop(); 1163 } catch (Throwable e) { 1164 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1165 } finally { 1166 stopped.countDown(); 1167 serviceLock.writeLock().unlock(); 1168 } 1169 } 1170 }); 1171 } catch (Throwable t) { 1172 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. 
Not waiting for stop to complete", t); 1173 stopped.countDown(); 1174 } 1175 } 1176 } 1177 1178 @Override 1179 public String toString() { 1180 return "Transport Connection to: " + transport.getRemoteAddress(); 1181 } 1182 1183 protected void doStop() throws Exception { 1184 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1185 connector.onStopped(this); 1186 try { 1187 synchronized (this) { 1188 if (duplexBridge != null) { 1189 duplexBridge.stop(); 1190 } 1191 } 1192 } catch (Exception ignore) { 1193 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1194 } 1195 try { 1196 transport.stop(); 1197 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1198 } catch (Exception e) { 1199 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1200 } 1201 if (taskRunner != null) { 1202 taskRunner.shutdown(1); 1203 taskRunner = null; 1204 } 1205 active = false; 1206 // Run the MessageDispatch callbacks so that message references get 1207 // cleaned up. 1208 synchronized (dispatchQueue) { 1209 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1210 Command command = iter.next(); 1211 if (command.isMessageDispatch()) { 1212 MessageDispatch md = (MessageDispatch) command; 1213 TransmitCallback sub = md.getTransmitCallback(); 1214 broker.postProcessDispatch(md); 1215 if (sub != null) { 1216 sub.onFailure(); 1217 } 1218 } 1219 } 1220 dispatchQueue.clear(); 1221 } 1222 // 1223 // Remove all logical connection associated with this connection 1224 // from the broker. 
1225 if (!broker.isStopped()) { 1226 List<TransportConnectionState> connectionStates = listConnectionStates(); 1227 connectionStates = listConnectionStates(); 1228 for (TransportConnectionState cs : connectionStates) { 1229 cs.getContext().getStopping().set(true); 1230 try { 1231 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1232 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1233 } catch (Throwable ignore) { 1234 LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore); 1235 } 1236 } 1237 } 1238 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1239 } 1240 1241 /** 1242 * @return Returns the blockedCandidate. 1243 */ 1244 public boolean isBlockedCandidate() { 1245 return blockedCandidate; 1246 } 1247 1248 /** 1249 * @param blockedCandidate The blockedCandidate to set. 1250 */ 1251 public void setBlockedCandidate(boolean blockedCandidate) { 1252 this.blockedCandidate = blockedCandidate; 1253 } 1254 1255 /** 1256 * @return Returns the markedCandidate. 1257 */ 1258 public boolean isMarkedCandidate() { 1259 return markedCandidate; 1260 } 1261 1262 /** 1263 * @param markedCandidate The markedCandidate to set. 1264 */ 1265 public void setMarkedCandidate(boolean markedCandidate) { 1266 this.markedCandidate = markedCandidate; 1267 if (!markedCandidate) { 1268 timeStamp = 0; 1269 blockedCandidate = false; 1270 } 1271 } 1272 1273 /** 1274 * @param slow The slow to set. 
1275 */ 1276 public void setSlow(boolean slow) { 1277 this.slow = slow; 1278 } 1279 1280 /** 1281 * @return true if the Connection is slow 1282 */ 1283 @Override 1284 public boolean isSlow() { 1285 return slow; 1286 } 1287 1288 /** 1289 * @return true if the Connection is potentially blocked 1290 */ 1291 public boolean isMarkedBlockedCandidate() { 1292 return markedCandidate; 1293 } 1294 1295 /** 1296 * Mark the Connection, so we can deem if it's collectable on the next sweep 1297 */ 1298 public void doMark() { 1299 if (timeStamp == 0) { 1300 timeStamp = System.currentTimeMillis(); 1301 } 1302 } 1303 1304 /** 1305 * @return if after being marked, the Connection is still writing 1306 */ 1307 @Override 1308 public boolean isBlocked() { 1309 return blocked; 1310 } 1311 1312 /** 1313 * @return true if the Connection is connected 1314 */ 1315 @Override 1316 public boolean isConnected() { 1317 return connected; 1318 } 1319 1320 /** 1321 * @param blocked The blocked to set. 1322 */ 1323 public void setBlocked(boolean blocked) { 1324 this.blocked = blocked; 1325 } 1326 1327 /** 1328 * @param connected The connected to set. 1329 */ 1330 public void setConnected(boolean connected) { 1331 this.connected = connected; 1332 } 1333 1334 /** 1335 * @return true if the Connection is active 1336 */ 1337 @Override 1338 public boolean isActive() { 1339 return active; 1340 } 1341 1342 /** 1343 * @param active The active to set. 
1344 */ 1345 public void setActive(boolean active) { 1346 this.active = active; 1347 } 1348 1349 /** 1350 * @return true if the Connection is starting 1351 */ 1352 public synchronized boolean isStarting() { 1353 return starting; 1354 } 1355 1356 @Override 1357 public synchronized boolean isNetworkConnection() { 1358 return networkConnection; 1359 } 1360 1361 @Override 1362 public boolean isFaultTolerantConnection() { 1363 return this.faultTolerantConnection; 1364 } 1365 1366 protected synchronized void setStarting(boolean starting) { 1367 this.starting = starting; 1368 } 1369 1370 /** 1371 * @return true if the Connection needs to stop 1372 */ 1373 public synchronized boolean isPendingStop() { 1374 return pendingStop; 1375 } 1376 1377 protected synchronized void setPendingStop(boolean pendingStop) { 1378 this.pendingStop = pendingStop; 1379 } 1380 1381 public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) { 1382 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 1383 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 1384 List<ConsumerInfo> subscriptionInfos = new ArrayList<>(); 1385 for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) { 1386 DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key); 1387 if (sub != null) { 1388 ConsumerInfo ci = sub.getConsumerInfo().copy(); 1389 ci.setClientId(key.getClientId()); 1390 subscriptionInfos.add(ci); 1391 } 1392 } 1393 BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName()); 1394 bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0])); 1395 return bsi; 1396 } 1397 1398 private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException { 1399 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1400 Map<String, String> props = createMap(properties); 1401 
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1402 IntrospectionSupport.setProperties(config, props, ""); 1403 return config; 1404 } 1405 1406 @Override 1407 public Response processBrokerInfo(BrokerInfo info) { 1408 if (info.isSlaveBroker()) { 1409 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1410 } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { 1411 try { 1412 NetworkBridgeConfiguration config = getNetworkConfiguration(info); 1413 if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 1414 LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); 1415 dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService())); 1416 } 1417 } catch (Exception e) { 1418 LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); 1419 return null; 1420 } 1421 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1422 // so this TransportConnection is the rear end of a network bridge 1423 // We have been requested to create a two way pipe ... 
1424 try { 1425 NetworkBridgeConfiguration config = getNetworkConfiguration(info); 1426 config.setBrokerName(broker.getBrokerName()); 1427 1428 if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) { 1429 LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); 1430 dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService())); 1431 } 1432 1433 // check for existing duplex connection hanging about 1434 1435 // We first look if existing network connection already exists for the same broker Id and network connector name 1436 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1437 // and the duplex network connector side wanting to open a new one 1438 // In this case, the old connection must be broken 1439 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1440 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1441 synchronized (connections) { 1442 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1443 TransportConnection c = iter.next(); 1444 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1445 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1446 c.stopAsync(); 1447 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1448 c.getStopped().await(1, TimeUnit.SECONDS); 1449 } 1450 } 1451 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1452 } 1453 Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); 1454 Transport remoteBridgeTransport = transport; 1455 if (! 
(remoteBridgeTransport instanceof ResponseCorrelator)) { 1456 // the vm transport case is already wrapped 1457 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1458 } 1459 String duplexName = localTransport.toString(); 1460 if (duplexName.contains("#")) { 1461 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1462 } 1463 MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName)); 1464 listener.setCreatedByDuplex(true); 1465 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1466 duplexBridge.setBrokerService(brokerService); 1467 //Need to set durableDestinations to properly restart subs when dynamicOnly=false 1468 duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations( 1469 broker.getDurableDestinations())); 1470 1471 // now turn duplex off this side 1472 info.setDuplexConnection(false); 1473 duplexBridge.setCreatedByDuplex(true); 1474 duplexBridge.duplexStart(this, brokerInfo, info); 1475 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1476 return null; 1477 } catch (TransportDisposedIOException e) { 1478 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1479 return null; 1480 } catch (Exception e) { 1481 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1482 return null; 1483 } 1484 } 1485 // We only expect to get one broker info command per connection 1486 if (this.brokerInfo != null) { 1487 LOG.warn("Unexpected extra broker info command received: {}", info); 1488 } 1489 this.brokerInfo = info; 1490 networkConnection = true; 1491 List<TransportConnectionState> connectionStates = listConnectionStates(); 1492 for (TransportConnectionState cs : connectionStates) { 1493 cs.getContext().setNetworkConnection(true); 1494 } 1495 return 
null; 1496 } 1497 1498 @SuppressWarnings({"unchecked", "rawtypes"}) 1499 private HashMap<String, String> createMap(Properties properties) { 1500 return new HashMap(properties); 1501 } 1502 1503 protected void dispatch(Command command) throws IOException { 1504 try { 1505 setMarkedCandidate(true); 1506 transport.oneway(command); 1507 } finally { 1508 setMarkedCandidate(false); 1509 } 1510 } 1511 1512 @Override 1513 public String getRemoteAddress() { 1514 return transport.getRemoteAddress(); 1515 } 1516 1517 public Transport getTransport() { 1518 return transport; 1519 } 1520 1521 @Override 1522 public String getConnectionId() { 1523 List<TransportConnectionState> connectionStates = listConnectionStates(); 1524 for (TransportConnectionState cs : connectionStates) { 1525 if (cs.getInfo().getClientId() != null) { 1526 return cs.getInfo().getClientId(); 1527 } 1528 return cs.getInfo().getConnectionId().toString(); 1529 } 1530 return null; 1531 } 1532 1533 @Override 1534 public void updateClient(ConnectionControl control) { 1535 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1536 && this.wireFormatInfo.getVersion() >= 6) { 1537 dispatchAsync(control); 1538 } 1539 } 1540 1541 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1542 ProducerBrokerExchange result = null; 1543 if (producerInfo != null && producerInfo.getProducerId() != null){ 1544 synchronized (producerExchanges){ 1545 result = producerExchanges.get(producerInfo.getProducerId()); 1546 } 1547 } 1548 return result; 1549 } 1550 1551 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1552 ProducerBrokerExchange result = producerExchanges.get(id); 1553 if (result == null) { 1554 synchronized (producerExchanges) { 1555 result = new ProducerBrokerExchange(); 1556 TransportConnectionState state = lookupConnectionState(id); 1557 context = state.getContext(); 1558 
result.setConnectionContext(context); 1559 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1560 result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id)); 1561 } 1562 SessionState ss = state.getSessionState(id.getParentId()); 1563 if (ss != null) { 1564 result.setProducerState(ss.getProducerState(id)); 1565 ProducerState producerState = ss.getProducerState(id); 1566 if (producerState != null && producerState.getInfo() != null) { 1567 ProducerInfo info = producerState.getInfo(); 1568 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1569 } 1570 } 1571 producerExchanges.put(id, result); 1572 } 1573 } else { 1574 context = result.getConnectionContext(); 1575 } 1576 return result; 1577 } 1578 1579 private void removeProducerBrokerExchange(ProducerId id) { 1580 synchronized (producerExchanges) { 1581 producerExchanges.remove(id); 1582 } 1583 } 1584 1585 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1586 ConsumerBrokerExchange result = consumerExchanges.get(id); 1587 return result; 1588 } 1589 1590 private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) { 1591 ConsumerBrokerExchange result = consumerExchanges.get(id); 1592 if (result == null) { 1593 synchronized (consumerExchanges) { 1594 result = new ConsumerBrokerExchange(); 1595 context = connectionState.getContext(); 1596 result.setConnectionContext(context); 1597 SessionState ss = connectionState.getSessionState(id.getParentId()); 1598 if (ss != null) { 1599 ConsumerState cs = ss.getConsumerState(id); 1600 if (cs != null) { 1601 ConsumerInfo info = cs.getInfo(); 1602 if (info != null) { 1603 if (info.getDestination() != null && info.getDestination().isPattern()) { 1604 result.setWildcard(true); 1605 } 1606 } 1607 } 1608 } 1609 consumerExchanges.put(id, result); 1610 } 1611 } 1612 return result; 
1613 } 1614 1615 private void removeConsumerBrokerExchange(ConsumerId id) { 1616 synchronized (consumerExchanges) { 1617 consumerExchanges.remove(id); 1618 } 1619 } 1620 1621 public int getProtocolVersion() { 1622 return protocolVersion.get(); 1623 } 1624 1625 @Override 1626 public Response processControlCommand(ControlCommand command) throws Exception { 1627 return null; 1628 } 1629 1630 @Override 1631 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1632 return null; 1633 } 1634 1635 @Override 1636 public Response processConnectionControl(ConnectionControl control) throws Exception { 1637 if (control != null) { 1638 faultTolerantConnection = control.isFaultTolerant(); 1639 } 1640 return null; 1641 } 1642 1643 @Override 1644 public Response processConnectionError(ConnectionError error) throws Exception { 1645 return null; 1646 } 1647 1648 @Override 1649 public Response processConsumerControl(ConsumerControl control) throws Exception { 1650 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1651 broker.processConsumerControl(consumerExchange, control); 1652 return null; 1653 } 1654 1655 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1656 TransportConnectionState state) { 1657 TransportConnectionState cs = null; 1658 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1659 // swap implementations 1660 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1661 newRegister.intialize(connectionStateRegister); 1662 connectionStateRegister = newRegister; 1663 } 1664 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1665 return cs; 1666 } 1667 1668 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1669 return connectionStateRegister.unregisterConnectionState(connectionId); 1670 
} 1671 1672 protected synchronized List<TransportConnectionState> listConnectionStates() { 1673 return connectionStateRegister.listConnectionStates(); 1674 } 1675 1676 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1677 return connectionStateRegister.lookupConnectionState(connectionId); 1678 } 1679 1680 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1681 return connectionStateRegister.lookupConnectionState(id); 1682 } 1683 1684 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1685 return connectionStateRegister.lookupConnectionState(id); 1686 } 1687 1688 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1689 return connectionStateRegister.lookupConnectionState(id); 1690 } 1691 1692 // public only for testing 1693 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1694 return connectionStateRegister.lookupConnectionState(connectionId); 1695 } 1696 1697 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1698 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1699 } 1700 1701 protected synchronized String getDuplexNetworkConnectorId() { 1702 return this.duplexNetworkConnectorId; 1703 } 1704 1705 public boolean isStopping() { 1706 return stopping.get(); 1707 } 1708 1709 protected CountDownLatch getStopped() { 1710 return stopped; 1711 } 1712 1713 private int getProducerCount(ConnectionId connectionId) { 1714 int result = 0; 1715 TransportConnectionState cs = lookupConnectionState(connectionId); 1716 if (cs != null) { 1717 for (SessionId sessionId : cs.getSessionIds()) { 1718 SessionState sessionState = cs.getSessionState(sessionId); 1719 if (sessionState != null) { 1720 result += sessionState.getProducerIds().size(); 1721 } 1722 } 1723 } 1724 return result; 1725 } 1726 1727 private int getConsumerCount(ConnectionId 
connectionId) { 1728 int result = 0; 1729 TransportConnectionState cs = lookupConnectionState(connectionId); 1730 if (cs != null) { 1731 for (SessionId sessionId : cs.getSessionIds()) { 1732 SessionState sessionState = cs.getSessionState(sessionId); 1733 if (sessionState != null) { 1734 result += sessionState.getConsumerIds().size(); 1735 } 1736 } 1737 } 1738 return result; 1739 } 1740 1741 public WireFormatInfo getRemoteWireFormatInfo() { 1742 return wireFormatInfo; 1743 } 1744 1745 /* (non-Javadoc) 1746 * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo) 1747 */ 1748 @Override 1749 public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception { 1750 return null; 1751 } 1752}