001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.thread;
018
019import java.util.concurrent.Executor;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.SynchronousQueue;
022import java.util.concurrent.ThreadFactory;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicLong;
027
/**
 * Manages the thread pool for long running tasks. Long running tasks are not
 * always active but when they are active, they may need a few iterations of
 * processing for them to become idle. The manager ensures that each task is
 * processed but that no one task overtakes the system. This is similar to
 * cooperative multitasking.
 *
 * @org.apache.xbean.XBean
 */
037public class TaskRunnerFactory implements Executor {
038
039    private ExecutorService executor;
040    private int maxIterationsPerRun;
041    private String name;
042    private int priority;
043    private boolean daemon;
044    private AtomicLong id = new AtomicLong(0);
045    private boolean dedicatedTaskRunner;
046    private AtomicBoolean initDone = new AtomicBoolean(false);
047
048    public TaskRunnerFactory() {
049        this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
050    }
051
052    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
053        this(name,priority,daemon,maxIterationsPerRun,false);
054    }
055
056    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
057        this.name = name;
058        this.priority = priority;
059        this.daemon = daemon;
060        this.maxIterationsPerRun = maxIterationsPerRun;
061        this.dedicatedTaskRunner = dedicatedTaskRunner;
062    }
063
064    public void init() {
065        if (initDone.compareAndSet(false, true)) {
066            // If your OS/JVM combination has a good thread model, you may want to
067            // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
068            if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
069                executor = null;
070            } else if (executor == null) {
071                executor = createDefaultExecutor();
072            }
073        }
074    }
075
076    public void shutdown() {
077        if (executor != null) {
078            executor.shutdownNow();
079        }
080        initDone.set(false);
081    }
082
083    public TaskRunner createTaskRunner(Task task, String name) {
084        init();
085        if (executor != null) {
086            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
087        } else {
088            return new DedicatedTaskRunner(task, name, priority, daemon);
089        }
090    }
091
092    public void execute(Runnable runnable) {
093        execute(runnable, "ActiveMQ Task");
094    }
095
096    public void execute(Runnable runnable, String name) {
097        init();
098        if (executor != null) {
099            executor.execute(runnable);
100        } else {
101            new Thread(runnable, name + "-" + id.incrementAndGet()).start();
102        }
103    }
104
105    protected ExecutorService createDefaultExecutor() {
106        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
107            public Thread newThread(Runnable runnable) {
108                Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
109                thread.setDaemon(daemon);
110                thread.setPriority(priority);
111                return thread;
112            }
113        });
114        return rc;
115    }
116
117    public ExecutorService getExecutor() {
118        return executor;
119    }
120
121    public void setExecutor(ExecutorService executor) {
122        this.executor = executor;
123    }
124
125    public int getMaxIterationsPerRun() {
126        return maxIterationsPerRun;
127    }
128
129    public void setMaxIterationsPerRun(int maxIterationsPerRun) {
130        this.maxIterationsPerRun = maxIterationsPerRun;
131    }
132
133    public String getName() {
134        return name;
135    }
136
137    public void setName(String name) {
138        this.name = name;
139    }
140
141    public int getPriority() {
142        return priority;
143    }
144
145    public void setPriority(int priority) {
146        this.priority = priority;
147    }
148
149    public boolean isDaemon() {
150        return daemon;
151    }
152
153    public void setDaemon(boolean daemon) {
154        this.daemon = daemon;
155    }
156
157    public boolean isDedicatedTaskRunner() {
158        return dedicatedTaskRunner;
159    }
160
161    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
162        this.dedicatedTaskRunner = dedicatedTaskRunner;
163    }
164}