Domanda

Sto provando Spring-AMQP con supporto Spring-Integration, ho seguito la configurazione e il test:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
.

In questa configurazione è stato evidente che non appena il messaggio arriva a consumingChannel, sono stati riconosciuti e quindi rimossi dalla coda. Ho convalidato questo posizionando un sleep elevato dopo receive e controlla queue-size. Non ci sono ulteriori controlli su di esso.

Ora se ho impostato acknowledge-mode=MANUAL, non ci sono modi per eseguire il ack manuale tramite l'integrazione della molla.

La mia esigenza è di elaborare il messaggio e dopo l'elaborazione, eseguire un manual-ack, quindi fino a quando il messaggio ack rimane persiste in durableQ.

C'è un modo per gestire MANUAL Ack con spring-amqp-integration? Voglio evitare di passare ChannelAwareMessageListener a inbound-channel-adapter poiché voglio avere il controllo del receive del consumatore.

Aggiornamento:

Anche non sembra essere possibile quando si utilizza il proprio listener-container con inbound-channel-adapter:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
.

Sopra la configurazione Getta Errore come la proprietà messageListener non è consentita, vedere Commento in linea sul tag. Quindi lo scopo di utilizzare listner-container è stato sconfitto (per esporre channel tramite ChannelAwareMessageListener).

A me spring-integration non può essere utilizzato per manual-acknowledgement (lo so, questo è un detto difficile!), Qualcuno può aiutarmi a convalidare questo o c'è qualche approccio / configurazione specifico richiesto per questo che mi manca?

È stato utile?

Soluzione

Il problema è perché si utilizza ASYNC HANDOFF usando un QueueChannel.Generalmente è meglio controllare la concorrenza nel contenitore (concurrent-consumers="2") e non eseguire alcun handoff async nel flusso (utilizzare DirectChannels).In questo modo, Auto Ack funzionerà bene.Invece di ricevere dal PollableChannel, iscriviti un new MessageHandler() a un SubscribableChannel.

Aggiornamento:

Normalmente non è necessario affrontare i messaggi in un'applicazione SI, ma l'equivalente del tuo test con una DirectChannel sarebbe ...

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });
.

Altri suggerimenti

MANUAL ACK è consentito solo tramite Channel.basicAck().Quindi, dovresti avere un accesso al Channel, su cui è stato ricevuto il tuo messaggio.

Prova a giocare con advice-chain of <int-amqp:inbound-channel-adapter>:

    .
  1. Implementare alcuni Advice come MethodBeforeAdvice
  2. Il advice-chain sul contenitore è applicato per ContainerDelegate#invokeListener
  3. Il primo argomento di quel metodo è esattamente un Channel
  4. Supponiamo che sia possibile posizionare il MessageProperties.headers che Channel all'interno di quel Advice
  5. e configura <int-amqp:inbound-channel-adapter> con mapped-request-headers in quel Channel.
  6. e alla fine tenta di richiamare basicAck() su quell'intestazione Channel dal messaggio di integrazione della molla in qualsiasi luogo del flusso a valle.
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top