我正在使用 NServiceBus 构建处理管道,但为了使流程中的每个步骤都可扩展,我在配置分发器时遇到了麻烦。这是一些信息:

  • 管道将有一个主进程,它对 WorkItem 说“好的,该开始了”,然后它将启动一个类似于流程图的进程。
  • 流程图中的每个步骤的计算成本可能很高,因此我希望能够横向扩展每个步骤。这告诉我每一步都需要一个Distributor。
  • 我希望稍后能够将其他活动挂接到事件上。这告诉我,完成后我需要 Publish() 消息,而不是 Send() 消息。
  • 流程可能需要根据条件进行分支。这告诉我一个进程必须能够发布多种类型的消息。
  • 一个进程可能需要加入分叉。我想我应该为此使用 Sagas。

希望这些假设是好的,否则我的麻烦比我想象的要多。

为了简单起见,让我们忘记分叉或加入,并考虑一个简单的管道,其中步骤 A 后面是步骤 B,最后是步骤 C。每个步骤都有自己的分配器,并且可以有许多节点处理消息。

  • NodeA的workers包含一个IHandleMessages处理器,并发布EventA
  • NodeB的workers包含一个IHandleMessages处理器,并发布Event B
  • NodeC的workers包含一个IHandleMessages处理器,然后管道就完成了。

以下是配置文件的相关部分,其中#表示工作人员的数量(即有输入队列 NodeA.1 和 NodeA.2):

NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
    <MessageEndpointMappings>
    </MessageEndpointMappings>
</UnicastBusConfig>

NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
    <MessageEndpointMappings>
        <add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
    </MessageEndpointMappings>
</UnicastBusConfig>

NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
    <MessageEndpointMappings>
        <add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
    </MessageEndpointMappings>
</UnicastBusConfig>

以下是分发器配置的相关部分:

Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>

Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>

Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>

我正在使用每个节点的 2 个实例进行测试,问题似乎出现在节点 B 的中间位置。基本上有两种情况可能发生:

  1. 节点 B 的两个实例都报告它正在订阅 EventA,并且 NodeC.Distrib.Data@MYCOMPUTER 正在订阅节点 B 发布的 EventB。在这种情况下,一切都很好。
  2. 节点 B 的两个实例都报告它正在订阅事件 A, 然而,一名工作人员说 NodeC.Distrib.Data@MYCOMPUTER 订阅了两次,而另一名工作人员则没有提及。

在第二种情况下,这似乎仅由分发者路由订阅消息的方式控制,如果“优秀”节点处理 EventA,则一切都很好。如果“成绩不佳者”处理 EventA,则 EventB 的发布没有订阅者,工作流程就会终止。

所以,我的问题是:

  1. 这种设置可以吗?
  2. 配置是否正确?除了简单的一级发布者/2 个工作人员设置之外,很难找到任何分销商配置示例。
  3. 使用一个中央代理进程来执行所有非计算密集型流量警察操作,并且仅在任务长时间运行且必须进行负载平衡时才将消息发送到分配器后面的进程是否更有意义?
    • 然后负载平衡的节点可以简单地回复中央代理,这看起来更容易。
    • 另一方面,这似乎与 NServiceBus 的去中心化优势相矛盾。
    • 如果这就是答案,并且长时间运行的流程的完成事件是回复,那么如何保留发布,以实现发布事件的以后可扩展性?
有帮助吗?

解决方案

您遇到的问题是您的节点看不到彼此的订阅者列表。您遇到该问题的原因是您在默认 NServiceBus 配置文件(lite)下尝试生产场景(横向扩展),该配置文件不支持横向扩展,但使单机开发非常高效。

要解决该问题,请使用生产配置文件运行 NServiceBus 主机,如下页所述:

http://docs.pspecial.net/nservicebus/hosting/nservicebus-host/profiles

这将使不同的节点共享相同的订阅者列表。

除此之外,你的配置是正确的。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top