Frage

Gibt es eine Möglichkeit Executor zu schaffen, haben immer mindestens 5 Threads und maximal 20 Threads und unbeschränkte Warteschlange für Aufgaben (dh keine Aufgabe abgelehnt wird)

Ich habe versucht, neue ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) mit allen Möglichkeiten, die ich dachte, der für Warteschlange:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

und keiner arbeitete als wollte.

War es hilfreich?

Lösung

Vielleicht so etwas wie dies würde für Sie arbeiten? Ich peitschte es nur, so stecken Sie sie an. Im Grunde ist es einen Überlauf implementiert Thread-Pools, der verwendet wird, um die darunter liegenden ThreadPoolExecutor einzuzuspeisen

Es gibt zwei Haupt ziehen Rücken ich es sehe:

  • Das Fehlen einer zurück Zukunft Objekt auf submit(). Aber vielleicht ist das kein Problem für Sie.
  • Die sekundäre Warteschlange wird nur in den ThreadPoolExecutor leer, wenn Aufträge vorgelegt werden. Es hat bekam eine elegante Lösung sein, aber ich weiß es jetzt noch nicht sehen. Wenn Sie, dass es wissen eine Stelle Strom von Aufgaben in die StusMagicExecutor sein wird, dann kann dies kein Problem sein. ( „Kann“ das Schlüsselwort zu sein.) Eine Option könnte sein, Ihre eingereichten Aufgaben am StusMagicExecutor Sack zu haben, nachdem sie vollständig?

Stu Magic Zieher:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}

Andere Tipps

Die javadocs für ThreadPoolExecutor sind ziemlich klar, dass, sobald corePoolSize Threads erstellt wurden, werden neue Themen nur einmal erstellt werden die Warteschlange voll ist. Also, wenn Sie core bis 5 und max bis 20 gesetzt, werden Sie nie Ihr gewünschtes Verhalten bekommen.

Wenn Sie jedoch beide core und max auf 20 gesetzt ist, dann werden Aufgaben nur die Warteschlange hinzugefügt bekommen, wenn alle 20 Fäden beschäftigt sind. Natürlich ist dies Ihre „5 Threads Minimum“ Anforderung ein bisschen sinnlos macht, da alle 20 am Leben gehalten werden (bis sie im Leerlauf aus, sowieso).

Ich denke, das Problem ist ein Mangel der Klasse ist und sehr irreführend die Konstruktor Parameterkombinationen angegeben. Hier ist eine Lösung von Swingworker Innen ThreadPoolExecutor genommen, die ich in eine Top-Level-Klasse gemacht. Es ist nicht ein Minimum hat, aber nicht zumindest eine obere Schranke verwenden. Das einzige, was ich nicht weiß, ist, was Leistung schlagen Sie erhalten aus der Sperr auszuführen.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top