平行的。Foreach产卵的方式太多线程
-
18-09-2019 - |
题
的问题
虽然代码对此我将在这里谈我写的,它根据的。净4框架内,不具体取决于任何特殊性的F#(至少看来是如此!).
我有一些数据对我的盘,我应该更新的网络,节约的最新版本的磁盘:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
问题是, loadAndSaveAndUpdate
我所有的数据,我会执行的功能 很多 时间:
{1 .. 5000} |> loadAndSaveAndUpdate
每一步会怎么做
- 一些盘IO,
- 一些数据交钱一手交货,
- 一些网络IO(与可能性的许多延迟的),
- 更多的数据处理,
- 和一些盘IO.
不是很高兴有这个并行完成的,在一定程度?不幸的是,没有我的阅读和分析的职能是"异步的工作流程-准备"。
第(不非常好)的解决方案,我来了
任务
第一件事我所做的是设立一个 Task[]
并启动它们所有的:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
然后我打CTRL+ESC只要看看有多少线,它被使用。15, 17, ..., 35, ..., 170, ...直到杀害。东西是错误的。
平行的
我做了几乎相同的事情,但使用 Parallel.ForEach(...)
结果是相同的:很多很多的螺纹。
一个解决方案...种
然后我决定开始只 n
螺纹, Task.WaitAll(of them)
, 然后其它的 n
, 直到有没有更多的任务。
这个工作的,但问题是,当它已经处理完,说, n-1
任务时,它将等待,等待,等待该死的最后一项任务,坚持阻止由于许多网络的延迟。这不是好的!
所以, 你会如何攻击这个问题?我会更好查不同的解决方案,涉及无论是异步的工作流程(在这种情况下如何适应我非异功能),并行扩展,奇怪的并行模式,等等。
谢谢。
解决方案
你确定你的个人任务的完成以及时的方式?我认为,这两个 Parallel.ForEach
和 Task
类已经使用。净线程池.任务通常应该短暂的工作项目,在这种情况下线程池只会产生一个小型的实际人数线,但是如果你的任务没有取得进展,并有其他任务排队然后数线使用将稳步增长达到最大(它是默认 250/处理器 中。网2.0SP1,但是不同的不同版本的框架)。它还值得注意的是(至少在。网2.0SP1)新的线建立受到限制于2个新的螺纹每第二,因此获得了数线你看到表明该任务不完成,在一个短时间内(所以这可能不是完全准确的脚怪 Parallel.ForEach
).
我认为,布莱恩的建议使用 async
工作流程是一个很好,特别是如果来源的长期任务是IO,因为 async
将回到你的纹的线程池直到IO完成。另一个选择是只是接受你的任务不完成快速和允许产生的许多线程(可在某种程度上控制通过使用 System.Threading.ThreadPool.SetMaxThreads
)-根据你的情况可能不是一个大问题,你使用了很多线程。
其他提示
ParallelOptions.MaxDegreeOfParallelism 限制的数目并行运营的平行的方法叫
使用异步,就将使你能够做的I/O-开工作,没有燃烧线,同时各种I/O的电话是'在海上的',所以这将是我的第一个建议。它应该是直截了当的转换码异步的,通常沿线的
- 包裹的每个功能的身体
async{...}
, 添加return
在必要 - 创建异步版本的任何I/O元,是不是已经在图书馆通过
Async.FromBeginEnd
- 交换电话的形式
let r = Foo()
要let! r = AsyncFoo()
- 使用
Async.Parallel
把5000异物进入一个单一的异步运行的并行
有各种不同的教程,这样做;一个这样的网络广播 在这里,.
你总是可以使用 ThreadPool
.
http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx
基本上:
- 创建一个线程池
- 设定最大数量的线
- 排队的所有任务使用
QueueUserWorkItem(WaitCallback)