001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.Serializable; 020 021import org.slf4j.Logger; 022import org.slf4j.LoggerFactory; 023 024/** 025 * Defines the prefetch message policies for different types of consumers 026 * 027 * @org.apache.xbean.XBean element="prefetchPolicy" 028 * 029 */ 030@SuppressWarnings("serial") 031public class ActiveMQPrefetchPolicy extends Object implements Serializable { 032 033 public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE; 034 public static final int DEFAULT_QUEUE_PREFETCH = 1000; 035 public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; 036 public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; 037 public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH = 1000; 038 public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE; 039 040 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class); 041 042 private int queuePrefetch; 043 private int queueBrowserPrefetch; 044 private int topicPrefetch; 045 private int durableTopicPrefetch; 046 private int optimizeDurableTopicPrefetch; 047 private int maximumPendingMessageLimit; 048 049 /** 050 * Initialize default prefetch policies 051 */ 052 public ActiveMQPrefetchPolicy() { 053 this.queuePrefetch = DEFAULT_QUEUE_PREFETCH; 054 this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH; 055 this.topicPrefetch = DEFAULT_TOPIC_PREFETCH; 056 this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH; 057 this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH; 058 } 059 060 /** 061 * @return Returns the durableTopicPrefetch. 062 */ 063 public int getDurableTopicPrefetch() { 064 return durableTopicPrefetch; 065 } 066 067 /** 068 * @param durableTopicPrefetch 069 * The durableTopicPrefetch to set. 070 */ 071 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 072 this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch); 073 } 074 075 /** 076 * @return Returns the queuePrefetch. 077 */ 078 public int getQueuePrefetch() { 079 return queuePrefetch; 080 } 081 082 /** 083 * @param queuePrefetch 084 * The queuePrefetch to set. 085 */ 086 public void setQueuePrefetch(int queuePrefetch) { 087 this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch); 088 } 089 090 /** 091 * @return Returns the queueBrowserPrefetch. 092 */ 093 public int getQueueBrowserPrefetch() { 094 return queueBrowserPrefetch; 095 } 096 097 /** 098 * @param queueBrowserPrefetch 099 * The queueBrowserPrefetch to set. 100 */ 101 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 102 this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch); 103 } 104 105 /** 106 * @return Returns the topicPrefetch. 107 */ 108 public int getTopicPrefetch() { 109 return topicPrefetch; 110 } 111 112 /** 113 * @param topicPrefetch 114 * The topicPrefetch to set. 115 */ 116 public void setTopicPrefetch(int topicPrefetch) { 117 this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); 118 } 119 120 /** 121 * @return Returns the optimizeDurableTopicPrefetch. 122 */ 123 public int getOptimizeDurableTopicPrefetch() { 124 return optimizeDurableTopicPrefetch; 125 } 126 127 /** 128 * @param optimizeAcknowledgePrefetch 129 * The optimizeDurableTopicPrefetch to set. 130 */ 131 public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) { 132 this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch; 133 } 134 135 public int getMaximumPendingMessageLimit() { 136 return maximumPendingMessageLimit; 137 } 138 139 /** 140 * Sets how many messages a broker will keep around, above the prefetch 141 * limit, for non-durable topics before starting to discard older messages. 142 */ 143 public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) { 144 this.maximumPendingMessageLimit = maximumPendingMessageLimit; 145 } 146 147 private int getMaxPrefetchLimit(int value) { 148 int result = Math.min(value, MAX_PREFETCH_SIZE); 149 if (result < value) { 150 LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE); 151 } 152 return result; 153 } 154 155 public void setAll(int i) { 156 this.durableTopicPrefetch = getMaxPrefetchLimit(i); 157 this.queueBrowserPrefetch = getMaxPrefetchLimit(i); 158 this.queuePrefetch = getMaxPrefetchLimit(i); 159 this.topicPrefetch = getMaxPrefetchLimit(i); 160 this.optimizeDurableTopicPrefetch = getMaxPrefetchLimit(i); 161 } 162 163 @Override 164 public boolean equals(Object object) { 165 if (object instanceof ActiveMQPrefetchPolicy) { 166 ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object; 167 return this.queuePrefetch == other.queuePrefetch && 168 this.queueBrowserPrefetch == other.queueBrowserPrefetch && 169 this.topicPrefetch == other.topicPrefetch && 170 this.durableTopicPrefetch == other.durableTopicPrefetch && 171 this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch; 172 } 173 return false; 174 } 175}