Вопрос

Проблема

Хотя код, о котором я здесь расскажу, я написал на F #, он основан на платформе .NET 4 framework, не зависящей конкретно от какой-либо особенности F # (по крайней мере, так кажется!).

У меня есть некоторые фрагменты данных на моем диске, которые я должен обновить из сети, сохранив последнюю версию на диске:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Проблема в том, что для loadAndSaveAndUpdate все мои данные, я должен был бы выполнить функцию многие времена:

{1 .. 5000} |> loadAndSaveAndUpdate

Каждый шаг будет делать

  • какой-нибудь дисковый ввод-вывод,
  • какой-то хруст данных,
  • некоторый сетевой ввод-вывод (с возможностью большой задержки),
  • больше обработки данных,
  • и немного ввода-вывода с диска.

Разве не было бы неплохо, чтобы это делалось параллельно, в какой-то степени?К сожалению, ни одна из моих функций чтения и синтаксического анализа не является "готовой к асинхронным рабочим процессам".

Первые (не очень хорошие) решения, которые я придумал

Задачи

Первое, что я сделал, это создал Task[] и запустите их все:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Затем я нажал CTRL + ESC, просто чтобы посмотреть, сколько потоков он использовал.15, 17, ..., 35, ..., 170, ...пока не убило приложение!Что-то шло не так.

Параллельный

Я сделал почти то же самое, но используя Parallel.ForEach(...) и результаты были те же самые:много-много-много нитей.

Решение, которое работает...вроде как

Тогда я решил начать только n Ветки, Task.WaitAll(of them), затем другие n, пока доступных задач больше не осталось.

Это работает, но проблема в том, что когда он завершит обработку, скажем, n-1 задачи, он будет ждать, ждать, ждать проклятой последней задачи, которая настаивает на блокировке из-за большой задержки в сети.Это нехорошо!

Итак, как бы вы решали эту проблему?Я был бы признателен за просмотр различных решений, включающих либо асинхронные рабочие процессы (и в данном случае, как адаптировать мои неасинхронные функции), параллельные расширения, странные параллельные шаблоны и т.д.

Спасибо.

Это было полезно?

Решение

Уверены ли вы, что ваши индивидуальные задачи выполняются своевременно?Я считаю, что оба Parallel.ForEach и тот Task класс уже использует .СЕТЕВОЙ пул потоков.Задачи, как правило, должны быть недолговечными рабочими элементами, и в этом случае threadpool будет порождать только небольшое количество реальных потоков, но если ваши задачи не выполняются и есть другие задачи в очереди, то количество используемых потоков будет неуклонно увеличиваться до максимального значения (которое по умолчанию равно 250/процессор в .NET 2.0 SP1, но отличается в разных версиях фреймворка).Также стоит отметить, что (по крайней мере, в .NET 2.0 SP1) создание нового потока ограничено до 2 новых потоков в секунду, поэтому увеличение количества потоков, которое вы видите, указывает на то, что задачи не завершаются за короткий промежуток времени (так что, возможно, не совсем точно возлагать вину на Parallel.ForEach).

Я думаю, что предложение Брайана использовать async рабочие процессы - это хороший вариант, особенно если источником долговременных задач является ввод-вывод, поскольку async вернет ваши потоки в threadpool до завершения ввода-вывода.Другой вариант - просто признать, что ваши задачи выполняются медленно, и разрешить создание множества потоков (которые можно в некоторой степени контролировать с помощью System.Threading.ThreadPool.SetMaxThreads) - в зависимости от вашей ситуации может не иметь большого значения, что вы используете много потоков.

Другие советы

Варианты параллелизма.Максимальная степень параллелизма ограничивает количество одновременных операций, выполняемых параллельными вызовами методов

Использование 'async' позволит вам выполнять работу, связанную с вводом-выводом, без записи потоков, пока различные вызовы ввода-вывода находятся "в море", так что это было бы моим первым предложением.Преобразовать код в асинхронный должно быть просто, обычно следующим образом

  • оберните тело каждой функции в async{...}, добавить return там, где это необходимо
  • создайте асинхронные версии любых примитивов ввода-вывода, которых еще нет в библиотеке, с помощью Async.FromBeginEnd
  • Переключать вызовы формы let r = Foo() Для let! r = AsyncFoo()
  • Использование Async.Parallel чтобы преобразовать 5000 асинхронных объектов в один асинхронный, который выполняется параллельно

Существуют различные учебные пособия для этого;одной из таких веб-трансляций является здесь.

Вы всегда могли бы использовать ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

в основном:

  1. Создайте пул потоков
  2. Установите максимальное количество потоков
  3. Поставьте в очередь все задачи, используя QueueUserWorkItem(WaitCallback)
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top