Pergunta

Não consigo encontrar uma maneira de ouvir para novas conexões de produtor e consumidor (ou interrupções de conexão) no ActiveMQ (versão Java). Quero poder dizer aos consumidores (ou eles podem descobrir) que a conexão do produtor caiu. O contrário (o produtor descobriu que um certo consumidor desconectado) também é necessário.

Eu apreciaria alguma ajuda.

Foi útil?

Solução

Eu acho que você quer ouvir novos produtores e consumidores em um destino específico (uma fila ou tópico específico). Isso está certo?

Você pode instanciar o ConsumerEventsource e o ProductionEventsource e registrar seus próprios ouvintes, chamando o setConsumerListener e o SetProducerListener neles, respectivamente.

Então:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

Se você observar o código do ConsumerventSource ou ProductionEventsource, verá que eles são objetos simples que usam os métodos do AdvisorySupport para ouvir um tópico consultivo especial cujo negócio é transmitir notícias sobre produtores e consumidores. Você pode aprender mais lendo o código -fonte dessas classes.

Seu uso de "conexão" é potencialmente um problema; Em ActiveMQ Land (que é um subconjunto de terra JMS), uma "conexão" é um objeto de nível inferior que não está associado a um destino específico. Um cliente específico cria uma "sessão" a partir de uma conexão - ainda não específica para um destino - e, em seguida, cria um Queueueeusender, Queuereceiver específico de destino ou TopicSubscripter. Quando esses são criados, ou quando as sessões que os criaram morrem, esses são os eventos que você deseja ouvir e ouvirão se você usar o código acima.

Outras dicas

Todas as informações necessárias são publicadas nos tópicos consultivos do ActiveMQ, como "ActiveMq.advisory.Connection" ou simplesmente "ActiveMq.advisory ..>" para todos eles. Até os eventos que acontecem em uma conexão Stomp são publicados nos tópicos consultivos do ActiveMQ. O código a seguir fornece um exemplo disso (testado com um cliente flexionado por meio do Stomp):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top