001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.IOException; 020import java.nio.channels.spi.AbstractSelectableChannel; 021import java.util.LinkedList; 022import java.util.concurrent.Executor; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.ThreadFactory; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028 029/** 030 * The SelectorManager will manage one Selector and the thread that checks the 031 * selector. 032 * 033 * We may need to consider running more than one thread to check the selector if 034 * servicing the selector takes too long. 035 */ 036public final class SelectorManager { 037 038 public static final SelectorManager SINGLETON = new SelectorManager(); 039 040 private Executor selectorExecutor = createDefaultExecutor(); 041 private Executor channelExecutor = selectorExecutor; 042 private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); 043 private int maxChannelsPerWorker = 1024; 044 045 protected ExecutorService createDefaultExecutor() { 046 ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 047 new ThreadFactory() { 048 049 private long i = 0; 050 051 @Override 052 public Thread newThread(Runnable runnable) { 053 Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + (i++)); 054 t.setDaemon(true); 055 return t; 056 } 057 }, new ThreadPoolExecutor.CallerRunsPolicy()); 058 059 return rc; 060 } 061 062 private static int getDefaultCorePoolSize() { 063 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10); 064 } 065 066 private static int getDefaultMaximumPoolSize() { 067 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024); 068 } 069 070 private static int getDefaultKeepAliveTime() { 071 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30); 072 } 073 074 public static SelectorManager getInstance() { 075 return SINGLETON; 076 } 077 078 public interface Listener { 079 void onSelect(SelectorSelection selector); 080 081 void onError(SelectorSelection selection, Throwable error); 082 } 083 084 public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException { 085 SelectorSelection selection = null; 086 while (selection == null) { 087 if (freeWorkers.size() > 0) { 088 SelectorWorker worker = freeWorkers.getFirst(); 089 if (worker.isReleased()) { 090 freeWorkers.remove(worker); 091 } else { 092 worker.retain(); 093 selection = new SelectorSelection(worker, selectableChannel, listener); 094 } 095 } else { 096 // Worker starts /w retain count of 1 097 SelectorWorker worker = new SelectorWorker(this); 098 freeWorkers.addFirst(worker); 099 selection = new SelectorSelection(worker, selectableChannel, listener); 100 } 101 } 102 103 return selection; 104 } 105 106 synchronized void onWorkerFullEvent(SelectorWorker worker) { 107 freeWorkers.remove(worker); 108 } 109 110 public synchronized void onWorkerEmptyEvent(SelectorWorker worker) { 111 freeWorkers.remove(worker); 112 } 113 114 public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { 115 freeWorkers.addFirst(worker); 116 } 117 118 public Executor getChannelExecutor() { 119 return channelExecutor; 120 } 121 122 public void setChannelExecutor(Executor channelExecutor) { 123 this.channelExecutor = channelExecutor; 124 } 125 126 public int getMaxChannelsPerWorker() { 127 return maxChannelsPerWorker; 128 } 129 130 public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { 131 this.maxChannelsPerWorker = maxChannelsPerWorker; 132 } 133 134 public Executor getSelectorExecutor() { 135 return selectorExecutor; 136 } 137 138 public void setSelectorExecutor(Executor selectorExecutor) { 139 this.selectorExecutor = selectorExecutor; 140 } 141}