Pergunta

Estou construindo um pipeline de processamento com o NServiceBus, mas estou tendo problemas com a configuração dos distribuidores para tornar escalável cada etapa do processo. Aqui estão algumas informações:

  • O oleoduto terá um processo mestre que diz "OK, hora de começar" para um WorkItem, que iniciará um processo como um fluxograma.
  • Cada etapa do fluxograma pode ser computacionalmente caro, por isso quero a capacidade de ampliar cada etapa. Isso me diz que cada etapa precisa de um distribuidor.
  • Quero poder conectar atividades adicionais aos eventos mais tarde. Isso me diz que eu preciso publicar () mensagens quando for feito, não envia ().
  • Um processo pode precisar ramificar com base em uma condição. Isso me diz que um processo deve ser capaz de publicar mais de um tipo de mensagem.
  • Um processo pode precisar se juntar aos garfos. Eu imagino que devo usar sagas para isso.

Espero que essas suposições sejam boas, caso contrário, estou com mais problemas do que pensava.

Por uma questão de simplicidade, vamos esquecer de marcar ou ingressar e considerar um pipeline simples, com a etapa A seguida pela etapa B, e terminando com a etapa C. Cada etapa recebe seu próprio distribuidor e pode ter muitos nós de processamento de mensagens.

  • Os trabalhadores de nodea contêm um processador iHandleMessages e publicam Eventa
  • Os trabalhadores do NodeB contêm um processador iHandleMessages e publicam Evento B
  • Os trabalhadores do NODEC contêm um processador iHandleMessages e, em seguida, o oleoduto está completo.

Aqui estão as partes relevantes dos arquivos de configuração, onde # indica o número do trabalhador (ou seja, há entradas que as filas nodea.1 e nodea.2):

NodeA:
<MsmqTransportConfig InputQueue="NodeA.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeA.Distrib.Control" DistributorDataAddress="NodeA.Distrib.Data" >
    <MessageEndpointMappings>
    </MessageEndpointMappings>
</UnicastBusConfig>

NodeB:
<MsmqTransportConfig InputQueue="NodeB.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeB.Distrib.Control" DistributorDataAddress="NodeB.Distrib.Data" >
    <MessageEndpointMappings>
        <add Messages="Messages.EventA, Messages" Endpoint="NodeA.Distrib.Data" />
    </MessageEndpointMappings>
</UnicastBusConfig>

NodeC:
<MsmqTransportConfig InputQueue="NodeC.#" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="NodeC.Distrib.Control" DistributorDataAddress="NodeC.Distrib.Data" >
    <MessageEndpointMappings>
        <add Messages="Messages.EventB, Messages" Endpoint="NodeB.Distrib.Data" />
    </MessageEndpointMappings>
</UnicastBusConfig>

E aqui estão as partes relevantes das configurações do distribuidor:

Distributor A:
<add key="DataInputQueue" value="NodeA.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeA.Distrib.Control"/>
<add key="StorageQueue" value="NodeA.Distrib.Storage"/>

Distributor B:
<add key="DataInputQueue" value="NodeB.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeB.Distrib.Control"/>
<add key="StorageQueue" value="NodeB.Distrib.Storage"/>

Distributor C:
<add key="DataInputQueue" value="NodeC.Distrib.Data"/>
<add key="ControlInputQueue" value="NodeC.Distrib.Control"/>
<add key="StorageQueue" value="NodeC.Distrib.Storage"/>

Estou testando usando duas instâncias de cada nó, e o problema parece surgir no meio do nó B. Existem basicamente duas coisas que podem acontecer:

  1. Ambas as instâncias do nó B relatam que está assinando a Eventa, e também que Nodec.distrib.data@mycomputer está assinando o EventB que o Node B publica. Nesse caso, tudo funciona muito bem.
  2. Ambas as instâncias do nó B relatam que está assinando Eventa, No entanto, um trabalhador diz que Nodec.distrib.data@mycomputer está assinando duas vezes, enquanto o outro trabalhador não o menciona.

No segundo caso, que parece ser controlado apenas pela maneira como o distribuidor rotula as mensagens de assinatura, se o nó "Overachiever" processar um evento, tudo estiver bem. Se o "Underachiever" processar eventos, a publicação do EventB não terá assinantes e o fluxo de trabalho morrer.

Então, minhas perguntas:

  1. Esse tipo de configuração é possível?
  2. A configuração está correta? É difícil encontrar exemplos de configuração com distribuidores além de uma configuração simples de editor de um nível/2 trabalhadores.
  3. Faria mais sentido ter um processo de corretor central que faça todas as operações de COP de trânsito não computadas intensivas e envia apenas mensagens para processos por trás dos distribuidores quando a tarefa é de longa duração e deve ser equilibrada?
    • Em seguida, os nós balanceados de carga podem simplesmente responder de volta ao corretor central, o que parece mais fácil.
    • Por outro lado, isso parece em desacordo com a descentralização que é a força de NServicebus.
    • E se essa é a resposta, e o processo do processo de longa execução é uma resposta, como você mantém a publicação que permite extensibilidade posterior nos eventos publicados?
Foi útil?

Solução

O problema que você tem é que seus nós não vêem uma lista de assinantes um do outro. A razão pela qual você está tendo esse problema é que você está experimentando um cenário de produção (escala) no perfil NServiceBus padrão (LITE), que não suporta a expansão, mas torna o desenvolvimento de uma máquina única muito produtiva.

Para resolver o problema, execute o host NServiceBus usando o perfil de produção, conforme descrito nesta página:

http://docs.paricular.net/nservicebus/hosting/nservicebus-host/profiles

Isso permitirá que diferentes nós compartilhem a mesma lista de assinantes.

Fora isso, sua configuração está certa.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top