Question

Le problème

Bien que le code dont je vais parler ici je l'ai écrit en F #, il est basé sur le framework .NET 4, ne dépendant pas spécifiquement sur une particularité de F # (au moins il semble donc!).

J'ai quelques morceaux de données sur mon disque que je mettre à jour à partir du réseau, sauvegarde de la dernière version sur le disque:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Le problème est que pour loadAndSaveAndUpdate toutes mes données, je dois exécuter la fonction nombre fois:

{1 .. 5000} |> loadAndSaveAndUpdate

Chaque étape ferait

  • un certain disque IO,
  • certains crissement de données,
  • un certain réseau IO (avec possibilité de beaucoup de temps de latence),
  • plus de données crissement,
  • et certains disque IO.

Ne serait-il pas agréable d'avoir ce fait en parallèle, dans une certaine mesure? Malheureusement, aucun de mes fonctions de lecture et d'analyse syntaxique sont « async-workflows prêts ».

Les premiers (pas très bon) solutions je suis venu avec

Tâches

La première chose que je l'ai fait était de mettre en place un Task[] et les recommencer:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Ensuite, j'appuyez sur CTRL + ESC juste pour voir le nombre de threads qu'il utilisait. 15, 17, ..., 35, ..., 170, ... jusqu'à ce que l'application tué! Quelque chose allait mal.

parallèle

Je l'ai fait presque la même chose, mais en utilisant Parallel.ForEach(...) et les résultats sont les mêmes: beaucoup. Et beaucoup, beaucoup de discussions

Une solution qui fonctionne ... sorte de

Alors j'ai décidé de commencer les discussions que de n, Task.WaitAll(of them), puis d'autres n, jusqu'à ce qu'il n'y avait plus de tâches disponibles.

Cela fonctionne, mais le problème est que quand il a terminé le traitement, disons, les tâches de n-1, il attendra, attendez, attendez que la fichue dernière tâche qui insistent sur le blocage en raison de beaucoup de latence du réseau. Ce n'est pas bon!

comment ce problème serait vous attaquer ? Je vous en serais reconnaissant de voir différentes solutions, impliquant soit Async Workflows (et dans ce cas comment adapter mes fonctions non async), extensions parallèles, des motifs parallèles étranges, etc.

Merci.

Était-ce utile?

La solution

Êtes-vous sûr que vos tâches individuelles sont Achever en temps opportun? Je crois que les deux Parallel.ForEach et la classe Task utilisent déjà la threadpool .NET. Les tâches doivent généralement être des éléments de travail de courte durée, auquel cas le threadpool ne frayer un petit nombre de threads réels, mais si vos tâches ne progressent pas et il y a d'autres tâches en attente, le nombre de fils utilisés augmentera progressivement jusqu'à le maximum (qui par défaut est 250 / processeur dans le SP1 .NET 2.0, mais est différent sous différentes versions du framework). Il est également intéressant de noter que (au moins dans .NET 2.0 SP1) nouvelle création de fil est étranglé à 2 nouveaux fils par seconde, obtenant ainsi au nombre de fils que vous voyez indique que les tâches ne terminent pas dans un court laps de le temps (il ne peut pas être tout à fait exact de rejeter la faute sur Parallel.ForEach).

Je pense que la suggestion de Brian à utiliser les flux de travail de async est bonne, surtout si la source des tâches de longue durée est IO, puisque async retournera vos fils à la threadpool jusqu'à ce que l'IO complète. Une autre option est d'accepter simplement que vos tâches ne terminent pas rapidement et permettre la reproduction de nombreux fils (qui peut être contrôlée dans une certaine mesure à l'aide System.Threading.ThreadPool.SetMaxThreads) - en fonction de votre situation, il ne peut pas être un gros problème que vous utilisez beaucoup de fils.

Autres conseils

ParallelOptions.MaxDegreeOfParallelism limites le nombre d'opérations simultanées géré par des appels de méthode parallèle

En utilisant donc ce serait ma première suggestion « async de vous permettre de faire le travail lié O I / sans fils brûler tandis que les différents appels d'E / S sont « en mer »,. Il devrait être facile de convertir le code à Async, généralement le long des lignes de

  • envelopper chaque corps de la fonction dans async{...}, ajoutez return si nécessaire
  • créer des versions Async de toutes les primitives E / S qui ne sont pas déjà dans la bibliothèque via Async.FromBeginEnd
  • appels de commutation de la forme let r = Foo() à let! r = AsyncFoo()
  • Utilisez Async.Parallel pour convertir les 5000 objets async en un seul Async qui fonctionne en parallèle

Il y a plusieurs tutoriels pour ce faire; une telle diffusion sur le Web est .

Vous pouvez toujours utiliser un ThreadPool.

http://msdn.microsoft.com/en -nous / bibliothèque / system.threading.threadpool.aspx

essentiellement:

  1. Créer un pool de threads
  2. Définir le nombre maximum de fils
  3. file d'attente toutes les tâches à l'aide QueueUserWorkItem(WaitCallback)
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top