Parallel.ForEach modo la deposizione delle uova troppi thread
-
18-09-2019 - |
Domanda
Il problema
Anche se il codice di cui parlerò qui ho scritto in F #, si basa sul framework .NET 4, non specificamente a seconda qualsiasi particolarità di F # (almeno sembra così!).
Ho alcuni pezzi di dati sul mio disco che devo aggiornare dalla rete, salvando la versione più recente per il disco:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
Il problema è che a loadAndSaveAndUpdate
tutti i miei dati, avrei dovuto eseguire la funzione molti volte:
{1 .. 5000} |> loadAndSaveAndUpdate
Ogni passo farebbe
- un po 'IO disco,
- qualche scricchiolio di dati,
- qualche rete IO (con possibilità di un sacco di latenza),
- più dati scricchiolio,
- e un po 'IO disco.
Non sarebbe bello avere questo fatto in parallelo, in una certa misura? Purtroppo, nessuno dei miei funzioni di lettura e di analisi sono "asincrone-workflow-ready".
I primi (non molto buono) le soluzioni sono arrivato fino a
Attività
La prima cosa che ho fatto è stato quello di impostare un Task[]
e li iniziare:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Poi mi ha colpito CTRL + ESC solo per vedere il numero di thread che stava usando. 15, 17, ..., 35, ..., 170, ... fino a quando ha ucciso l'applicazione! Qualcosa stava andando male.
Parallela
Ho fatto quasi la stessa cosa, ma utilizzando Parallel.ForEach(...)
ei risultati sono stati gli stessi:. Un sacco e un sacco di discussioni
Una soluzione che funziona ... tipo di
Poi ho deciso di iniziare solo discussioni n
, Task.WaitAll(of them)
, poi altri n
, fino a quando non c'erano più attività disponibili.
Questo funziona, ma il problema è che quando si ha terminato l'elaborazione, diciamo, compiti n-1
, si aspetta, aspetta, aspetta per il maledetto ultima attività che insistono sul blocco a causa di un sacco di latenza di rete. Questo non è buono!
Quindi, come è possibile attaccare questo problema ? Apprezzerei per visualizzare diverse soluzioni, coinvolgendo sia asincrone flussi di lavoro (e in questo caso come adattare le mie funzioni non asincrone), estensioni parallele, modelli paralleli strani, ecc.
Grazie.
Soluzione
Sei sicuro che le attività individuali stanno completando in modo tempestivo? Credo che sia Parallel.ForEach
e la classe Task
già utilizzano il ThreadPool .NET. Compiti dovrebbero generalmente essere gli elementi di lavoro di breve durata, nel qual caso il pool di thread sarà solo deporre le uova un piccolo numero di thread reali, ma se i vostri compiti non stanno facendo progressi e ci sono altri compiti in coda allora il numero di thread utilizzati aumenterà continuamente fino a il massimo (che di default è 250 / processore in .NET 2.0 SP1, ma è diverso in diverse versioni del quadro). E 'anche interessante notare che (almeno in .NET 2.0 SP1) nuova creazione filo viene strozzata a 2 nuove discussioni al secondo, in modo da ottenere fino al numero di thread che stai vedendo indica che i compiti non stanno completando in un breve lasso di tempo (in modo che non può essere completamente accurato per la colpa su Parallel.ForEach
).
Credo che il suggerimento di Brian di utilizzare flussi di lavoro async
è buona, soprattutto se la fonte dei compiti longevi è IO, dal momento che async
tornerà tuoi thread per il ThreadPool fino a quando il IO completa. Un'altra opzione è quella di accettare semplicemente che le attività non stanno completando rapidamente e consentire la deposizione delle uova di molti fili (che può essere controllato in una certa misura utilizzando System.Threading.ThreadPool.SetMaxThreads
) - a seconda della situazione che non può essere un grosso problema che si sta utilizzando un sacco di fili.
Altri suggerimenti
ParallelOptions.MaxDegreeOfParallelism limiti il numero di operazioni simultanee gestito da chiamate di metodo parallele
Usando 'asincrone di vi permetterà di fare l'I / O-bound lavoro senza fili bruciore durante le varie chiamate di I / O sono 'in mare', in modo che sarebbe il mio primo suggerimento. Dovrebbe essere semplice per convertire il codice per ASYNC, di solito sulla falsariga di
- avvolgere ogni corpo della funzione in
async{...}
, aggiungerereturn
se necessario - creare versioni asincrone di qualsiasi primitive di I / O che non sono già nella libreria tramite
Async.FromBeginEnd
- chiamate interruttore della forma
let r = Foo()
alet! r = AsyncFoo()
- Usa
Async.Parallel
per convertire gli oggetti asincroni 5000 in un unico asincrono che corre in parallelo
Ci sono vari tutorial per fare questo; Uno di questi webcast è qui .
Si può sempre utilizzare un ThreadPool
.
http://msdn.microsoft.com/en -us / library / system.threading.threadpool.aspx
in fondo:
- Crea un pool di thread
- Imposta il numero massimo di thread
- coda tutte le attività utilizzando
QueueUserWorkItem(WaitCallback)