プロデューサーの消費者キューは処分しません
-
28-09-2019 - |
質問
プロデューサーの消費者キューを構築しました。.NET4.0の同時レッツを作成しました。
public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
private bool _IsActive=true;
public int Count
{
get
{
return this._workerQueue.Count;
}
}
public bool IsActive
{
get { return _IsActive; }
set { _IsActive = value; }
}
public event Dequeued<T> OnDequeued = delegate { };
public event LoggedHandler OnLogged = delegate { };
private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();
private object _locker = new object();
Thread[] _workers;
#region IDisposable Members
int _workerCount=0;
ManualResetEventSlim _mres = new ManualResetEventSlim();
public void Dispose()
{
_IsActive = false;
_mres.Set();
LogWriter.Write("55555555555");
for (int i = 0; i < _workerCount; i++)
// Wait for the consumer's thread to finish.
{
_workers[i].Join();
}
LogWriter.Write("6666666666");
// Release any OS resources.
}
public ProducerConsumerQueue(int workerCount)
{
try
{
_workerCount = workerCount;
_workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(_workers[i] = new Thread(Work)).Start();
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
}
}
#endregion
#region IProducerConsumerQueue<T> Members
public void EnqueueTask(T task)
{
if (_IsActive)
{
_workerQueue.Enqueue(task);
//Monitor.Pulse(_locker);
_mres.Set();
}
}
public void Work()
{
while (_IsActive)
{
try
{
T item = Dequeue();
if (item != null)
OnDequeued(item);
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
}
}
}
#endregion
private T Dequeue()
{
try
{
T dequeueItem;
//if (_workerQueue.Count > 0)
//{
_workerQueue.TryDequeue(out dequeueItem);
if (dequeueItem != null)
return dequeueItem;
//}
if (_IsActive)
{
_mres.Wait();
_mres.Reset();
}
//_workerQueue.TryDequeue(out dequeueItem);
return dequeueItem;
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
T dequeueItem;
//if (_workerQueue.Count > 0)
//{
_workerQueue.TryDequeue(out dequeueItem);
return dequeueItem;
}
}
public void Clear()
{
_workerQueue = new ConcurrentQueue<T>();
}
}
}
処分すると、それが結合でブロックされることがあり(1つのスレッドが消費されます)、処分方法が詰まっています。リセットイベントの待ち時間で立ち往生していると思いますが、そのためには、廃棄のセットを呼び出します。助言がありますか?
解決
アップデート: :私は内部的にキューを必要とすることについてのあなたの主張を理解しています。使用するための私の提案 BlockingCollection<T>
コードには、ブロッキング動作を提供するための多くのロジックが含まれているという事実に基づいています。そのような論理を自分で書くことは非常にバグになりやすいです(私はこれを経験から知っています)。したがって、少なくとも実行するフレームワーク内に既存のクラスがある場合 いくつかの あなたのための仕事のうち、それに伴うことが一般的に望ましいです。
このクラスを使用してこのクラスを実装する方法の完全な例 BlockingCollection<T>
この答えに含めるには少し大きすぎるので、私は仕事を投稿しました Pastebin.comの例;気軽に見て、あなたの考えを見てください。
また、上記の例を示す例プログラムも書きました ここ.
私のコードは正しいですか?私はイエスとは言いません それも 多くの自信。結局のところ、私はユニットテストを書いたり、診断を実行したりしていません。 BlockingCollection<T>
それ以外の ConcurrentQueue<T>
(私の意見では)あなたの論理の多くをクリーンアップし、メインに集中しやすくします 目的 クラスの(キューからアイテムを消費し、購読者に通知する)の幾分難しい側面ではなく、 実装 (内部キューのブロッキング動作)。
コメントで提起された質問:
あなたが使用していない理由
BlockingCollection<T>
?
あなたの答え:
...]キューが必要でした。
から のデフォルトコンストラクターに関するMSDNドキュメント BlockingCollection<T>
クラス:
デフォルトの基礎となるコレクションはaです
ConcurrentQueue<T>
.
場合 それだけ 使用する代わりに自分のクラスを実装することを選択した理由 BlockingCollection<T>
FIFOキューが必要だということです。それから...あなたはあなたの決定を再考したいかもしれません。 a BlockingCollection<T>
デフォルトのパラメーターレスコンストラクターを使用してインスタンス化されました は FIFOキュー。
とはいえ、投稿したコードの包括的な分析を提供できるとは思わないが、少なくともいくつかのポインターを提供できる:
- 私は、このようなトリッキーなマルチスレッドの動作を扱うクラスのために、あなたがここにいるようにイベントを使用することを非常にためらいます。呼び出しコードは、必要なイベントハンドラーを添付できます。これらは、例外(キャッチしない)、長期間ブロック、またはおそらくコントロールの外側に完全にブロックすることさえできます。ブロッキングキューの場合。
- あなたには人種状態があります
Dequeue
とDispose
方法。
あなたのこれらの線を見てください Dequeue
方法:
if (_IsActive) // point A
{
_mres.Wait(); // point C
_mres.Reset(); // point D
}
そして今、これらの2行を見てください Dispose
:
_IsActive = false;
_mres.Set(); // point B
3つのスレッドがあるとしましょう、t1, 、t2, 、およびt3. 。 t1 そしてt2 どちらもポイントです a, 、各チェック _IsActive
そして見つけます true
. 。それで Dispose
呼ばれ、t3 セット _IsActive
に false
(しかしt1 そしてt2 すでにポイントを通過しています a)そしてポイントに到達します b, 、それが呼び出す場所 _mres.Set()
. 。次にt1 ポイントになります c, 、ポイントに移動します d, 、および電話 _mres.Reset()
. 。今t2 ポイントに到達します c それ以来、永遠に立ち往生します _mres.Set
再び呼び出されません(すべてのスレッドが実行されます Enqueue
見つけます _IsActive == false
すぐに戻り、スレッドが実行されます Dispose
すでにポイントを通過しています b).
私はこのレースの状態を解決するためにいくつかの助けを提供してくれてうれしいですが、私はそれを懐疑的です BlockingCollection<T>
実際、これに必要なクラスではありません。これが事実ではないことを私に納得させるためにいくつかの情報を提供できるなら、私はもう一度見てみることができます。
他のヒント
以来 _IsActive
としてマークされていません volatile
そして、いません lock
すべてのアクセスの周りでは、各コアにはこの値に対して個別のキャッシュがあり、そのキャッシュが更新されない可能性があります。だからマーキング _IsActive
偽りに Dispose
実際には、すべての実行スレッドに影響しません。
http://igoro.com/archive/volatile-keyword-in-cmemory-model-explained/
private volatile bool _IsActive=true;