Pergunta

Estou experimentando algoritmos paralelos em Java. Comecei com a fusão e publiquei minha tentativa neste pergunta. Minha tentativa revisada está no código abaixo, onde agora tento paralelizar a classificação rápida.

Existem erros de novato na minha implementação ou abordagem multithread para esse problema? Caso contrário, não deveria esperar um aumento de velocidade de 32% entre um algoritmo sequencial e um paralelo em um núcleo de duelo (veja os horários na parte inferior)?

Aqui está o algoritmo multithreading:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Aqui está como eu começo:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

Eu testei isso contra o Arrays.Sort e um algoritmo de classificação rápida seqüencial semelhante. Aqui estão os resultados do tempo em um laptop Dell de núcleo de duelo Intel, em segundos:

Elementos: 500.000, sequencial: 0.068592, rosqueado: 0,046871, Arrays.sort: 0,079677

Elementos: 1.000.000, sequencial: 0.14416, rosqueado: 0,095492, Arrays.sort: 0.167155

Elementos: 2.000.000, sequencial: 0,301666, rosqueado: 0,205719, Arrays.sort: 0,350982

Elementos: 4.000.000, sequencial: 0,623291, rosqueado: 0,424119, Arrays.sort: 0,712698

Elementos: 8.000.000, sequencial: 1.279374, rosqueado: 0,859363, Arrays.sort: 1.487671

Cada número acima é o tempo médio de 100 testes, jogando fora os 3 casos mais baixos e 3 mais altos. Eu usei Random.NextInt (Integer.max_value) para gerar uma matriz para cada teste, que foi inicializado uma vez a cada 10 testes com a mesma semente. Cada teste consistiu em tempo do algoritmo fornecido com o System.Nanotime. Eu entrei em seis lugares decimais após a média. E obviamente, eu verifiquei se cada tipo trabalhado.

Como você pode ver, há um aumento de 32% na velocidade entre os casos seqüenciais e rosqueados em todos os conjuntos de testes. Como perguntei acima, não deveria esperar mais do que isso?

Foi útil?

Solução

Fazer o NumThreads estático pode causar problemas, é altamente provável que você termine com mais do que max_threads em algum momento.

Provavelmente, a razão pela qual você não obtém um duplo no desempenho é que seu tipo rápido não pode ser totalmente paralelo. Observe que a primeira chamada para o QuickSort fará uma passagem por toda a matriz no thread inicial antes de começar a ser executado em paralelo. Há também uma sobrecarga em paralelismo, um algoritmo na forma de transição de comutação e modo de contexto ao cultivar fios separados.

Dê uma olhada na estrutura do garfo/junção, esse problema provavelmente se encaixaria bem lá.

Alguns pontos na implementação. Implemente o Runnable em vez de estender o encadeamento. Estender um encadeamento deve ser usado apenas quando você criar uma nova versão da classe Thread. Quando você só deseja que algum trabalho seja executado em paralelo, é melhor com o Runnable. Ao implementar um executável, você também pode estender outra classe que oferece mais flexibilidade no design do OO. Use um pool de threads restrito ao número de threads que você possui disponível no sistema. Além disso, não use o NumThreads para tomar a decisão sobre o desvio de um novo thread ou não. Você pode calcular isso na frente. Use um tamanho mínimo de partição do tamanho da matriz total dividida pelo número de processadores disponíveis. Algo como:

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    private final int minParitionSize;

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        swap(array, storeIndex, end);

        if (len > minParitionSize) {

            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}

Você pode começar fazendo:

ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

Isso iniciará a classificação no mesmo thread, que evita um salto desnecessário de threads no início.

Advertência: Não tenho certeza se a implementação acima será mais rápida, pois não a comparei.

Outras dicas

Isso usa uma combinação de classificação rápida e classificação de mesclagem.

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSortMain {
    public static void main(String... args) throws InterruptedException {
        Random rand = new Random();
        final int[] values = new int[100*1024*1024];
        for (int i = 0; i < values.length; i++)
            values[i] = rand.nextInt();

        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        int blockSize = (values.length + threads - 1) / threads;
        for (int i = 0; i < values.length; i += blockSize) {
            final int min = i;
            final int max = Math.min(min + blockSize, values.length);
            es.submit(new Runnable() {
                @Override
                public void run() {
                    Arrays.sort(values, min, max);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.MINUTES);
        for (int blockSize2 = blockSize; blockSize2 < values.length / 2; blockSize2 *= 2) {
            for (int i = 0; i < values.length; i += blockSize2) {
                final int min = i;
                final int mid = Math.min(min + blockSize2, values.length);
                final int max = Math.min(min + blockSize2 * 2, values.length);
                mergeSort(values, min, mid, max);
            }
        }
    }

    private static boolean mergeSort(int[] values, int left, int mid, int end) {
        int[] results = new int[end - left];
        int l = left, r = mid, m = 0;
        for (; l < left && r < mid; m++) {
            int lv = values[l];
            int rv = values[r];
            if (lv < rv) {
                results[m] = lv;
                l++;
            } else {
                results[m] = rv;
                r++;
            }
        }
        while (l < mid)
            results[m++] = values[l++];
        while (r < end)
            results[m++] = values[r++];
        System.arraycopy(results, 0, values, left, results.length);
        return false;
    }
}

Alguns comentários se eu entender seu código certo:

  1. Não vejo uma trava ao redor do objeto Numthreads, mesmo que ele possa ser acessado através de vários threads. Talvez você deva torná -lo um atômico.

  2. Use um pool de threads e organize as tarefas, ou seja, uma única chamada para o QuickSort, para obter o vantagem de um pool de threads. Use futuros.

Seu método atual de dividir as coisas da maneira que você está fazendo pode deixar uma divisão menor com um tópico e uma divisão maior sem um thread. Ou seja, ele não prioriza segmentos maiores com seus próprios tópicos.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top