ThreadPoolExecutor問題の特定
-
07-07-2019 - |
質問
常に少なくとも5スレッド、最大20スレッド、およびタスクの無制限キュー(タスクが拒否されないことを意味する)を持つExecutorを作成する方法はありますか
新しい ThreadPoolExecutor(5、20、60L、TimeUnit.SECONDS、queue)を試しました
キューについて考えたすべての可能性:
new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected
そして期待どおりに機能するものはありませんでした。
解決
たぶん、このような何かがあなたのために働くでしょうか?ホイップしただけなので、突いてください。基本的に、基になる ThreadPoolExecutor
2つの大きな欠点があります:
-
submit()
に返されるFutureオブジェクトの欠如。しかし、それはあなたにとって問題ではないかもしれません。 - ジョブが送信されると、セカンダリキューは
ThreadPoolExecutor
にのみ空になります。エレガントな解決策が必要ですが、まだわかりません。StusMagicExecutor
に安定したタスクストリームがあることがわかっている場合、これは問題にならない可能性があります。 (" 5月"がキーワードです。)オプションは、送信されたタスクが完了した後にStusMagicExecutor
で突くようにすることでしょうか?
Stu's Magic Executor:
public class StusMagicExecutor extends ThreadPoolExecutor {
private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE.
public StusMagicExecutor() {
super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());
}
public void queueRejectedTask(Runnable task) {
try {
secondaryQueue.put(task);
} catch (InterruptedException e) {
// do something
}
}
public Future submit(Runnable newTask) {
//drain secondary queue as rejection handler populates it
Collection<Runnable> tasks = new ArrayList<Runnable>();
secondaryQueue.drainTo(tasks);
tasks.add(newTask);
for (Runnable task : tasks)
super.submit(task);
return null; //does not return a future!
}
}
class RejectionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
((StusMagicExecutor)executor).queueRejectedTask(runnable);
}
}
他のヒント
ThreadPoolExecutor
のjavadocは、 corePoolSize
スレッドが作成されると、キューがいっぱいになって初めて新しいスレッドが作成されることを明確にしています。したがって、 core
を5に、 max
を20に設定すると、希望する動作が得られなくなります。
ただし、 core
と max
の両方を20に設定すると、20個のスレッドすべてがビジーの場合にのみタスクがキューに追加されます。もちろん、これにより、「5スレッド最小」がレンダリングされます。 20個すべてが(とにかくアイドル状態になるまで)存続するため、要件は無意味です。
この問題はクラスの欠点であり、コンストラクターのパラメーターの組み合わせを考えると非常に誤解を招くと思います。これは、SwingWorkerの内部ThreadPoolExecutorからトップレベルクラスにしたソリューションです。最小値はありませんが、少なくとも上限を使用します。私が知らない唯一のことは、ロックの実行によってパフォーマンスが低下することです。
public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = pauseLock.newCondition();
private boolean isPaused = false;
private final ReentrantLock executeLock = new ReentrantLock();
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
@Override
public void execute(Runnable command) {
executeLock.lock();
try {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
setCorePoolSize(getMaximumPoolSize());
super.execute(command);
setCorePoolSize(0);
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
} finally {
executeLock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
pauseLock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException ignore) {
} finally {
pauseLock.unlock();
}
}
}