سؤال

هل هناك أي طريقة إنشاء المنفذ الذي سوف يكون دائما ما لا يقل عن 5 مواضيع و أقصى 20 المواضيع و غير محدود طابور المهام (بمعنى لا مهمة رفض)

حاولت جديدة ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) مع كل الاحتمالات التي فكرت لرتل:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

و لا شيء يعمل كما يريد.

هل كانت مفيدة؟

المحلول

ربما شيء من هذا القبيل سوف تعمل من أجلك ؟ أنا فقط جلد عنه لذا يرجى كزة في ذلك.أساسا ، فإنه يطبق تجاوز تجمع مؤشرات الترابط التي يتم استخدامها لتغذية الأساسية ThreadPoolExecutor

هناك نوعان من كبرى رسم ظهورهم أرى مع ذلك:

  • عدم إرجاع كائن في المستقبل submit().ولكن ربما هذه ليست مشكلة بالنسبة لك.
  • الثانوية الانتظار فقط فارغة إلى ThreadPoolExecutor عندما تقدم فرص عمل.يجب ان يكون هناك حل رائع لكن أنا لا أرى ذلك فقط حتى الآن.إذا كنت تعرف أنه سيكون هناك بدلا تيار من المهام في StusMagicExecutor ثم هذا قد لا يكون مشكلة.("قد" هي الكلمة المفتاح.) خيار قد يكون لديك الخاص بك المقدمة المهام كزة في StusMagicExecutor بعد إكمال ؟

ستو السحر المنفذ:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}

نصائح أخرى

ووjavadocs لThreadPoolExecutor واضحة جدا أنه بمجرد أن تم إنشاؤها المواضيع corePoolSize، ولن يتم إنشاؤها مواضيع جديدة مرة واحدة قائمة الانتظار ممتلئ. حتى إذا قمت بتعيين core إلى 5 وmax إلى 20، فلن تحصل السلوك المطلوب.

ولكن، إذا قمت بتعيين كلا core وmax إلى 20، ثم المهام سوف فقط الحصول على إضافة إلى قائمة الانتظار إذا كان كل المواضيع 20 مشغولة. وبطبيعة الحال، وهذا يجعل الخاص بك "5 المواضيع أدنى" شرط لا معنى لها قليلا، لأن كل 20 ستبقى على قيد الحياة (حتى الخمول، على أي حال).

وأعتقد أن هذه المشكلة هي أوجه القصور في الدرجة ومضللة للغاية نظرا لمجموعات المعلمة المنشئ. وهنا الحل مأخوذ من ThreadPoolExecutor SwingWorker والداخلية التي قمت بها إلى فئة أعلى مستوى. لم يكن لديك الحد الأدنى ولكن لا يقل استخدام الحد الأعلى. الشيء الوحيد الذي لا نعرفه هو ما أصاب الأداء الذي تحصل عليه من تأمين تنفيذ.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top