Angabe ThreadPoolExecutor Problem
-
07-07-2019 - |
Frage
Gibt es eine Möglichkeit Executor zu schaffen, haben immer mindestens 5 Threads und maximal 20 Threads und unbeschränkte Warteschlange für Aufgaben (dh keine Aufgabe abgelehnt wird)
Ich habe versucht, neue ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue)
mit allen Möglichkeiten, die ich dachte, der für Warteschlange:
new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected
und keiner arbeitete als wollte.
Lösung
Vielleicht so etwas wie dies würde für Sie arbeiten? Ich peitschte es nur, so stecken Sie sie an. Im Grunde ist es einen Überlauf implementiert Thread-Pools, der verwendet wird, um die darunter liegenden ThreadPoolExecutor
einzuzuspeisen
Es gibt zwei Haupt ziehen Rücken ich es sehe:
- Das Fehlen einer zurück Zukunft Objekt auf
submit()
. Aber vielleicht ist das kein Problem für Sie. - Die sekundäre Warteschlange wird nur in den
ThreadPoolExecutor
leer, wenn Aufträge vorgelegt werden. Es hat bekam eine elegante Lösung sein, aber ich weiß es jetzt noch nicht sehen. Wenn Sie, dass es wissen eine Stelle Strom von Aufgaben in dieStusMagicExecutor
sein wird, dann kann dies kein Problem sein. ( „Kann“ das Schlüsselwort zu sein.) Eine Option könnte sein, Ihre eingereichten Aufgaben amStusMagicExecutor
Sack zu haben, nachdem sie vollständig?
Stu Magic Zieher:
public class StusMagicExecutor extends ThreadPoolExecutor {
private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE.
public StusMagicExecutor() {
super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());
}
public void queueRejectedTask(Runnable task) {
try {
secondaryQueue.put(task);
} catch (InterruptedException e) {
// do something
}
}
public Future submit(Runnable newTask) {
//drain secondary queue as rejection handler populates it
Collection<Runnable> tasks = new ArrayList<Runnable>();
secondaryQueue.drainTo(tasks);
tasks.add(newTask);
for (Runnable task : tasks)
super.submit(task);
return null; //does not return a future!
}
}
class RejectionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
((StusMagicExecutor)executor).queueRejectedTask(runnable);
}
}
Andere Tipps
Die javadocs für ThreadPoolExecutor
sind ziemlich klar, dass, sobald corePoolSize
Threads erstellt wurden, werden neue Themen nur einmal erstellt werden die Warteschlange voll ist. Also, wenn Sie core
bis 5 und max
bis 20 gesetzt, werden Sie nie Ihr gewünschtes Verhalten bekommen.
Wenn Sie jedoch beide core
und max
auf 20 gesetzt ist, dann werden Aufgaben nur die Warteschlange hinzugefügt bekommen, wenn alle 20 Fäden beschäftigt sind. Natürlich ist dies Ihre „5 Threads Minimum“ Anforderung ein bisschen sinnlos macht, da alle 20 am Leben gehalten werden (bis sie im Leerlauf aus, sowieso).
Ich denke, das Problem ist ein Mangel der Klasse ist und sehr irreführend die Konstruktor Parameterkombinationen angegeben. Hier ist eine Lösung von Swingworker Innen ThreadPoolExecutor genommen, die ich in eine Top-Level-Klasse gemacht. Es ist nicht ein Minimum hat, aber nicht zumindest eine obere Schranke verwenden. Das einzige, was ich nicht weiß, ist, was Leistung schlagen Sie erhalten aus der Sperr auszuführen.
public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = pauseLock.newCondition();
private boolean isPaused = false;
private final ReentrantLock executeLock = new ReentrantLock();
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
@Override
public void execute(Runnable command) {
executeLock.lock();
try {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
setCorePoolSize(getMaximumPoolSize());
super.execute(command);
setCorePoolSize(0);
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
} finally {
executeLock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
pauseLock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException ignore) {
} finally {
pauseLock.unlock();
}
}
}