質問

問題

ここで説明するコードは F# で作成したものですが、.NET 4 フレームワークに基づいており、特に F# の特殊性に依存しているわけではありません (少なくともそう思われます!)。

ディスク上にいくつかのデータがあり、ネットワークから更新して、最新バージョンをディスクに保存する必要があります。

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

問題は、 loadAndSaveAndUpdate すべてのデータを使用するには、関数を実行する必要があります 多くの 回:

{1 .. 5000} |> loadAndSaveAndUpdate

各ステップで行うこと

  • いくつかのディスク IO、
  • 一部のデータ処理、
  • いくつかのネットワーク IO (大幅な遅延が発生する可能性あり)、
  • さらなるデータ処理、
  • そしていくつかのディスクIO。

これをある程度並行してやればいいのではないか。残念ながら、私の読み取りおよび解析関数はどれも「非同期ワークフロー対応」ではありません。

私が思いついた最初の(あまり良くない)解決策

タスク

私が最初にしたことは、 Task[] そしてそれらをすべて開始します:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

次に、CTRL+ESC を押して、使用されているスレッドの数を確認しました。15、17、...、35、...、170、...アプリケーションを終了するまで!何かが間違っていました。

平行

私はほぼ同じことをしましたが、使用しました Parallel.ForEach(...) 結果は同じでした:たくさんのたくさんのスレッド。

効果的な解決策...種の

それから私はただ始めることにしました n スレッド、 Task.WaitAll(of them), 、それからその他 n, 利用可能なタスクがなくなるまで。

これは機能しますが、問題は、処理が完了したときに次のようになることです。 n-1 タスクの場合、ネットワーク遅延が大きいためにブロックを要求する最後のタスクを待ち、待ち、待ち続けます。これは良くない!

それで、 この問題をどのように攻撃しますか?非同期ワークフロー (そしてこの場合は非同期関数を適応させる方法)、並列拡張機能、奇妙な並列パターンなどを含む、さまざまなソリューションをご覧いただければ幸いです。

ありがとう。

役に立ちましたか?

解決

あなたはあなたの個々のタスクをタイムリーに完了していることを確認していますか?私はParallel.ForEachTaskクラスの両方がすでに.NETのスレッドプールを使用することを考えています。使用されるスレッドの数は着実にアップに増加しますその後、キューに入れられたタスクは、一般的にスレッドプールだけで、実際のスレッドの数が少ないを起動します。その場合には短命作業項目、でなければなりませんが、あなたのタスクは進展しておらず、他のタスクが存在する場合デフォルトで 250 /プロセッサで最大値( .NET 2.0 SP1が、フレームワークの異なるバージョン)の下で異なっています。これは、新しいスレッドの作成は、毎秒2つの新しいスレッドに絞られる(少なくとも、.NET 2.0 SP1で)ので、あなたが見ているスレッドの数に起床するタスクが短い量で完成されていないことを示していることも注目に値します時間(Parallel.ForEachに責任をピンに完全に正確ではないかもしれないので)。

私はasyncワークフローを使用するには、ブライアンの提案は、IOが完了するまでasyncは、スレッドプールにあなたのスレッドを返しますので、長命のタスクのソースは、IOである場合は特に、良いものだと思います。別のオプションは、単にあなたのタスクはすぐに完了していないことを受け入れると(System.Threading.ThreadPool.SetMaxThreadsを使用することによってある程度制御することができます)多くのスレッドの産卵を可能にすることです - あなたの状況に応じて、それはあなたが使用している大したことないかもしれません多数のスレッドます。

他のヒント

ParallelOptions.MaxDegreeOfParallelism制限するパラレルメソッド呼び出しによって実行同時操作の数

「非同期」を使用すると、さまざまな I/O 呼び出しが「海上」にある間に、スレッドを燃やすことなく I/O バウンドの作業を実行できるため、それが私の最初の提案になります。コードを非同期に変換するのは、通常は次のような簡単なはずです。

  • 各関数本体をラップする async{...}, 、 追加 return 必要に応じて
  • ライブラリにまだ存在しない I/O プリミティブの非同期バージョンを作成します。 Async.FromBeginEnd
  • フォームの呼び出しを切り替える let r = Foo()let! r = AsyncFoo()
  • 使用 Async.Parallel 5000 個の非同期オブジェクトを、並列実行される単一の非同期に変換します。

これを行うためのさまざまなチュートリアルがあります。そのようなウェブキャストの 1 つは、 ここ.

いつでも使用できます ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

基本的に:

  1. スレッドプールを作成する
  2. スレッドの最大数を設定する
  3. すべてのタスクをキューに入れる QueueUserWorkItem(WaitCallback)
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top