Domanda

Mi imbatto in molte "parallele imbarazzanti" progetti che vorrei parallelizzare con il modulo multiprocessing . Tuttavia, spesso comportano la lettura di file di grandi dimensioni (superiori a 2 GB), l'elaborazione di riga per riga, l'esecuzione di calcoli di base e la scrittura dei risultati. Qual è il modo migliore per dividere un file ed elaborarlo utilizzando il modulo multiprocessore di Python? Queue o JoinableQueue in multiprocessing devono essere usati? O il modulo Queue stesso? Oppure, dovrei mappare il file iterabile su un pool di processi usando multiprocessing ? Ho sperimentato questi approcci ma l'overhead è immenso nella distribuzione dei dati riga per riga. Ho optato per un design leggero di filtri per tubi usando file cat | process1 --out-file out1 --num-processi 2 | process2 --out-file out2 , che passa una certa percentuale dell'input del primo processo direttamente al secondo input (vedi questo post ), ma mi piacerebbe avere una soluzione interamente contenuta in Python.

Sorprendentemente, la documentazione di Python non suggerisce un modo canonico per farlo (nonostante una lunga sezione sulle linee guida di programmazione nella documentazione multiprocessing ).

Grazie, Vince

Informazioni aggiuntive: il tempo di elaborazione per riga varia. Alcuni problemi sono veloci e quasi non associati agli I / O, altri alla CPU. Le attività non dipendenti dalla CPU guadagneranno il posto dalla parallelizzazione, in modo tale che anche i modi inefficienti di assegnare i dati a una funzione di elaborazione sarebbero ancora utili in termini di tempo di clock.

Un primo esempio è uno script che estrae i campi dalle linee, controlla una varietà di flag bit a bit e scrive le linee con determinati flag in un nuovo file in un formato completamente nuovo. Sembra un problema legato all'I / O, ma quando l'ho eseguito con la mia versione simultanea economica con pipe, era circa il 20% più veloce. Quando lo eseguo con pool e mappa, o faccio la coda in multiprocessing è sempre più lento del 100%.

È stato utile?

Soluzione

Una delle migliori architetture fa già parte del sistema operativo Linux. Non sono necessarie librerie speciali.

Desideri un " fan-out " design.

  1. A "principale" il programma crea una serie di sottoprocessi collegati da pipe.

  2. Il programma principale legge il file, scrivendo le righe nelle pipe eseguendo il filtro minimo richiesto per distribuire le righe ai sottoprocessi appropriati.

Ogni sottoprocesso dovrebbe probabilmente essere una pipeline di processi distinti che leggono e scrivono dallo stdin.

Non hai bisogno di una struttura di dati in coda, questo è esattamente ciò che è una pipeline in memoria: una coda di byte tra due processi simultanei.

Altri suggerimenti

Una strategia consiste nell'assegnare un offset a ciascun lavoratore, quindi se hai otto processi di lavoro che assegni, dai numeri da 0 a 7. Il numero di lavoratore 0 legge i primi processi di registrazione, quindi salta 7 e continua a elaborare l'ottava registrazione ecc., il lavoratore numero 1 legge il secondo record, quindi salta 7 ed elabora il nono record .........

Esistono numerosi vantaggi in questo schema. Non importa quanto sia grande il file, il lavoro è sempre diviso in modo uniforme, i processi sulla stessa macchina verranno elaborati all'incirca alla stessa velocità e utilizzeranno le stesse aree di buffer in modo da non dover sostenere un sovraccarico I / O eccessivo. Finché il file non è stato aggiornato, è possibile rieseguire singoli thread per ripristinare errori.

Non menzionate come state elaborando le linee; forse l'informazione più importante.

Ogni linea è indipendente? Il calcolo dipende da una riga che precede la successiva? Devono essere elaborati in blocchi? Quanto tempo richiede l'elaborazione per ciascuna linea? Esiste una fase di elaborazione che deve incorporare " all " i dati alla fine? O è possibile eliminare i risultati intermedi e mantenere un totale parziale? Il file può essere inizialmente diviso dividendo la dimensione del file per numero di thread? O cresce mentre lo elabori?

Se le linee sono indipendenti e il file non cresce, l'unica coordinazione che ti serve è estrarre " indirizzi iniziali " e "lunghezze" a ciascuno dei lavoratori; possono aprirsi e cercare in modo indipendente nel file e quindi devi semplicemente coordinare i loro risultati; forse aspettando che i risultati N tornino in coda.

Se le linee non sono indipendenti, la risposta dipenderà fortemente dalla struttura del file.

So che hai chiesto specificamente di Python, ma ti incoraggio a guardare Hadoop ( http: // hadoop. apache.org/ ): implementa l'algoritmo Map and Reduce che è stato specificamente progettato per affrontare questo tipo di problema.

Buona fortuna

Dipende molto dal formato del tuo file.

Ha senso dividerlo ovunque? O hai bisogno di dividerlo su una nuova linea? Oppure devi assicurarti di dividerlo alla fine della definizione di un oggetto?

Invece di dividere il file, dovresti usare più lettori sullo stesso file, usando os.lseek per saltare alla parte appropriata del file.

Aggiornamento: Poster aggiunto che vuole dividere su nuove linee. Quindi propongo quanto segue:

Supponiamo che tu abbia 4 processi. Quindi la soluzione semplice è os.lseek allo 0%, 25%, 50% e 75% del file e leggere i byte fino a quando non si raggiunge la prima nuova riga. Questo è il tuo punto di partenza per ogni processo. Non è necessario dividere il file per fare questo, basta cercare nella posizione corretta nel file di grandi dimensioni in ogni processo e iniziare a leggere da lì.

Il di Fredrik Lundh Alcune note sul benchmark Wide Finder di Tim Bray è una lettura interessante, su un caso d'uso molto simile, con molti buoni consigli. Anche altri autori hanno implementato la stessa cosa, alcuni sono collegati dall'articolo, ma potresti voler provare a cercare su Google "python wide finder" o qualcosa per trovarne ancora. (c'era anche una soluzione da qualche parte basata sul modulo multiprocessing , ma non sembra più disponibile)

Se il tempo di esecuzione è lungo, invece di far leggere a ogni processo la riga successiva attraverso un Queue , i processi leggono i batch di righe. In questo modo l'overhead viene ammortizzato su più righe (ad es. Migliaia o più).

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top