ディストリビューターとの NServiceBus パイプライン
-
26-09-2019 - |
質問
NServiceBus を使用して処理パイプラインを構築していますが、プロセスの各ステップをスケーラブルにするためのディストリビューターの構成で問題が発生しています。ここにいくつかの情報があります:
- パイプラインには、WorkItem に対して「OK、開始します」と通知するマスター プロセスがあり、フローチャートのようなプロセスが開始されます。
- フローチャートの各ステップは計算コストがかかる可能性があるため、各ステップをスケールアウトする機能が必要です。これは、各ステップにディストリビューターが必要であることを示しています。
- 後から追加のアクティビティをイベントにフックできるようにしたいと考えています。これは、完了時にメッセージを Send() するのではなく、Publish() する必要があることを示しています。
- プロセスは条件に基づいて分岐する必要がある場合があります。これは、プロセスが複数の種類のメッセージを発行できる必要があることを示しています。
- プロセスはフォークに参加する必要がある場合があります。これにはSagasを使用する必要があると思います。
これらの仮定が正しいといいのですが、そうでない場合は、思ったよりも問題が発生します。
わかりやすくするために、フォークや結合のことは忘れて、ステップ A の後にステップ B が続き、ステップ C で終わる単純なパイプラインを考えてみましょう。各ステップには独自のディストリビュータがあり、多くのノードでメッセージを処理できます。
- NodeA ワーカーには IHandleMessages プロセッサが含まれており、EventA を発行します
- NodeB ワーカーには IHandleMessages プロセッサが含まれており、イベント B を発行します
- NodeC ワーカーには IHandleMessages プロセッサが含まれており、パイプラインが完成します。
構成ファイルの関連部分は次のとおりです。# はワーカーの番号を示します (つまり、入力キュー NodeA.1 と NodeA.2 があります)。
NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
<MessageEndpointMappings>
</MessageEndpointMappings>
</UnicastBusConfig>
NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
<MessageEndpointMappings>
<add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
</MessageEndpointMappings>
</UnicastBusConfig>
ディストリビュータ設定の関連部分は次のとおりです。
Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>
Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>
Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>
各ノードの 2 つのインスタンスを使用してテストしていますが、ノード B の中間で問題が発生しているようです。基本的に次の 2 つのことが起こります。
- ノード B の両方のインスタンスは、ノード B が EventA をサブスクライブしていることと、NodeC.Distrib.Data@MYCOMPUTER がノード B が発行する EventB をサブスクライブしていることを報告します。この場合、すべてがうまく機能します。
- ノード B の両方のインスタンスは、EventA をサブスクライブしていることを報告します。 ただし、1 人のワーカーは NodeC.Distrib.Data@MYCOMPUTER が 2 回サブスクライブしていると述べていますが、もう 1 人のワーカーはそのことに言及していません。
2 番目のケースでは、ディストリビュータがサブスクリプション メッセージをルーティングする方法によってのみ制御されているように見えますが、「オーバーアチーバー」ノードが EventA を処理する場合は、すべて問題ありません。「成績不振者」が EventA を処理すると、EventB のパブリッシュにはサブスクライバーがなく、ワークフローが停止します。
それで、私の質問は次のとおりです。
- このような設定は可能でしょうか?
- 設定は正しいですか?単純な 1 レベルのパブリッシャー/2 ワーカーのセットアップを超えるディストリビューターの構成例を見つけるのは困難です。
- 計算集約的ではないトラフィック監視操作をすべて実行し、タスクが長時間実行され負荷分散する必要がある場合にのみ、ディストリビューターの背後にあるプロセスにメッセージを送信する 1 つの中央ブローカー プロセスを用意する方が合理的でしょうか?
- そうすれば、負荷分散されたノードは単に中央ブローカーに応答するだけで済み、そのほうが簡単に思えます。
- 一方で、それは NServiceBus の強みである分散化とは相容れないように思えます。
- そして、これが答えであり、長時間実行プロセスの完了イベントが応答である場合、公開されたイベントに対する後の拡張性を可能にする公開をどのように維持するのでしょうか?
解決
あなたが抱えている問題は、ノードがお互いのサブスクライバーのリストを認識できないことです。この問題が発生する理由は、スケールアウトをサポートしていないものの、単一マシンの開発が非常に生産的になるデフォルトの NServiceBus プロファイル (lite) で運用シナリオ (スケールアウト) を試行しているためです。
この問題を解決するには、このページで説明されているように、運用プロファイルを使用して NServiceBus ホストを実行します。
http://docs.particular.net/nservicebus/hosting/nservicebus-host/profiles
これにより、異なるノードが同じサブスクライバーのリストを共有できるようになります。
それ以外は、設定は正しく行われています。