Integrazione AMQP di primavera - Riconoscimento manuale del consumatore
-
21-12-2019 - |
Domanda
Sto provando Spring-AMQP
con supporto Spring-Integration
, ho seguito la configurazione e il test:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
<int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>
<int-amqp:inbound-channel-adapter
channel="consumingChannel"
queue-names="durableQ"
connection-factory="connectionFactory"
concurrent-consumers="1"
acknowledge-mode="AUTO"
/>
public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
PollableChannel consumingChannel = context.getBean("consumingChannel",
PollableChannel.class);
int count = 0;
while (true) {
Message<?> msg = consumingChannel.receive(1000);
System.out.println((count++) + " \t -> " + msg);
try { //sleep to check number of messages in queue
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
.
In questa configurazione è stato evidente che non appena il messaggio arriva a consumingChannel
, sono stati riconosciuti e quindi rimossi dalla coda. Ho convalidato questo posizionando un sleep
elevato dopo receive
e controlla queue-size
. Non ci sono ulteriori controlli su di esso.
Ora se ho impostato acknowledge-mode=MANUAL
, non ci sono modi per eseguire il ack
manuale tramite l'integrazione della molla.
La mia esigenza è di elaborare il messaggio e dopo l'elaborazione, eseguire un manual-ack
, quindi fino a quando il messaggio ack
rimane persiste in durableQ
.
C'è un modo per gestire MANUAL
Ack con spring-amqp-integration
? Voglio evitare di passare ChannelAwareMessageListener
a inbound-channel-adapter
poiché voglio avere il controllo del receive
del consumatore.
Aggiornamento:
Anche non sembra essere possibile quando si utilizza il proprio listener-container
con inbound-channel-adapter
:
// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" />
<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="durableQ" />
<property name="acknowledgeMode" value="MANUAL" />
// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
<property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
.
Sopra la configurazione Getta Errore come la proprietà messageListener
non è consentita, vedere Commento in linea sul tag. Quindi lo scopo di utilizzare listner-container
è stato sconfitto (per esporre channel
tramite ChannelAwareMessageListener
).
A me spring-integration
non può essere utilizzato per manual-acknowledgement
(lo so, questo è un detto difficile!), Qualcuno può aiutarmi a convalidare questo o c'è qualche approccio / configurazione specifico richiesto per questo che mi manca?
Soluzione
Il problema è perché si utilizza ASYNC HANDOFF usando un QueueChannel
.Generalmente è meglio controllare la concorrenza nel contenitore (concurrent-consumers="2"
) e non eseguire alcun handoff async nel flusso (utilizzare DirectChannel
s).In questo modo, Auto Ack funzionerà bene.Invece di ricevere dal PollableChannel
, iscriviti un new MessageHandler()
a un SubscribableChannel
.
Aggiornamento:
Normalmente non è necessario affrontare i messaggi in un'applicazione SI, ma l'equivalente del tuo test con una DirectChannel sarebbe ...
SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);
channel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("Got " + message);
}
});
. Altri suggerimenti
MANUAL ACK è consentito solo tramite Channel.basicAck()
.Quindi, dovresti avere un accesso al Channel
, su cui è stato ricevuto il tuo messaggio.
Prova a giocare con advice-chain
of <int-amqp:inbound-channel-adapter>
:
- .
- Implementare alcuni
Advice
comeMethodBeforeAdvice
- Il
advice-chain
sul contenitore è applicato perContainerDelegate#invokeListener
- Il primo argomento di quel metodo è esattamente un
Channel
- Supponiamo che sia possibile posizionare il
MessageProperties.headers
cheChannel
all'interno di quelAdvice
- e configura
<int-amqp:inbound-channel-adapter>
conmapped-request-headers
in quelChannel
. - e alla fine tenta di richiamare
basicAck()
su quell'intestazioneChannel
dal messaggio di integrazione della molla in qualsiasi luogo del flusso a valle.