Difference between revisions of "Spring RabbitMQ cheat sheet"

From DarkWiki
Jump to: navigation, search
(Listening for events)
(Listening for events)
 
(One intermediate revision by the same user not shown)
Line 60: Line 60:
 
         // ...
 
         // ...
 
     }
 
     }
 +
</source>
 +
 +
This pattern is fine for one-offs, but generally it is better to declare your queues elsewhere. This allows them to be properly configured at some point in the future.
 +
 +
===Different topic listener arrangements===
 +
 +
<source lang="java">
 +
import org.slf4j.Logger;
 +
import org.slf4j.LoggerFactory;
 +
import org.springframework.amqp.rabbit.annotation.Exchange;
 +
import org.springframework.amqp.rabbit.annotation.Queue;
 +
import org.springframework.amqp.rabbit.annotation.QueueBinding;
 +
import org.springframework.amqp.rabbit.annotation.RabbitListener;
 +
import org.springframework.stereotype.Service;
 +
 +
/**
 +
* All messages are sent via the TopicExchange named 'eventExchange'.
 +
*/
 +
@Service
 +
public class EventConsumer {
 +
 +
    private static Logger logger = LoggerFactory.getLogger(EventConsumer.class);
 +
 +
    /*
 +
        receive1 and receive2 both listen on the same queue. Therefore, only one endpoint will be called for each
 +
        message as Rabbit will effectively Round-Robin them.
 +
    */
 +
 +
    @RabbitListener(queues="inboundEventQueue")
 +
    public void receive1(EventMessage message) {
 +
        logger.info("receive1 Received message '{}'", message.getName());
 +
    }
 +
 +
    @RabbitListener(queues="inboundEventQueue")
 +
    public void receive2(EventMessage message) {
 +
        logger.info("receive2 Received message '{}'", message.getName());
 +
    }
 +
 +
    /*
 +
        selective1 and selective2 both get their own queues (generated by Spring with a random name). They are not
 +
        persistent and will auto-remove when this stops. selective1 will receive all messages sent with a routing
 +
        key of "event.", while selective2 will only receive those events that match "event.<anything>.two".
 +
 +
        event.test.one -> selective1
 +
        event.test.two -> selective1, selective2
 +
    */
 +
 +
    @RabbitListener(bindings = @QueueBinding(
 +
            value = @Queue,
 +
            exchange = @Exchange(value = "eventExchange", type = "topic"),
 +
            key = "event.#"))
 +
    public void selective1(EventMessage message) {
 +
        logger.info("selective1 Received message '{}'", message.getName());
 +
    }
 +
 +
    @RabbitListener(bindings = @QueueBinding(
 +
            value = @Queue,
 +
            exchange = @Exchange(value = "eventExchange", type = "topic"),
 +
            key = "event.#.two"))
 +
    public void selective2(EventMessage message) {
 +
        logger.info("selective2 Received message '{}'", message.getName());
 +
    }
 +
 +
    /*
 +
        selective3 and selective4 both share a named queue. This means that Rabbit will Round-Robin those whose route
 +
        key satisfies the pattern. So, event.something.four could be handled by either selective3 or selective4, but
 +
        event.something.three will only arrive at selective3.
 +
    */
 +
 +
    @RabbitListener(bindings = @QueueBinding(
 +
            value = @Queue("inboundSharedEventQueue"),
 +
            exchange = @Exchange(value = "eventExchange", type = "topic"),
 +
            key = "event.#"))
 +
    public void selective3(EventMessage message) {
 +
        logger.info("selective3 Received message '{}'", message.getName());
 +
    }
 +
 +
    @RabbitListener(bindings = @QueueBinding(
 +
            value = @Queue("inboundSharedEventQueue"),
 +
            exchange = @Exchange(value = "eventExchange", type = "topic"),
 +
            key = "event.#.four"))
 +
    public void selective4(EventMessage message) {
 +
        logger.info("selective4 Received message '{}'", message.getName());
 +
    }
 +
 +
    /*
 +
        This is final example that will declare it's own named queue (without any other configuration being
 +
        necessary). While convenient, it is prone to mistakes and can be tricky to reconfigure at a later date.
 +
    */
 +
 +
    @RabbitListener(bindings = @QueueBinding(
 +
            value = @Queue("annotationDefinedQueue"),
 +
            exchange = @Exchange(value = "eventExchange", type = "topic"),
 +
            key = "event.#.five"))
 +
    public void selective5(EventMessage message) {
 +
        logger.info("selective5 Received message '{}'", message.getName());
 +
    }
 +
 +
}
 
</source>
 
</source>

Latest revision as of 13:52, 3 April 2020

Sending to topics

On the Producer side (the one that raises the events) we create our TopicExchange, giving it a sensible name:

import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EventProducerConfig {

    @Bean("eventExchange")
    public TopicExchange eventExchange() {
        return new TopicExchange("eventExchange");
    }

}

It can then be used by our Producer to send event notifications to anyone who subscribes to the exchange:

@Component
public class EventProducer {
    private final RabbitTemplate rabbitTemplate;
    private final Exchange exchange;

    public EventProducer(RabbitTemplate rabbitTemplate,@Qualifier("eventExchange") Exchange exchange) {
        this.rabbitTemplate = rabbitTemplate;
        this.exchange = exchange;
    }

    public void signal(String eventId,EventMessage message) {
        rabbitTemplate.convertAndSend(exchange.getName(), eventId, message);
    }
}

You can then send EventMessage payloads with event ids such as "event.user.created" or "event.user.deleted" using the signal method.

Listening for events

This shows how to use two dynamically created queues (one for created and one for deleted events). They will not be persistent (won't survive restart).

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#.created"))
    public void consumeCreated(EventMessage message) {
        // ...
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#.deleted"))
    public void consumeDeleted(EventMessage message) {
        // ...
    }

This pattern is fine for one-offs, but generally it is better to declare your queues elsewhere. This allows them to be properly configured at some point in the future.

Different topic listener arrangements

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * All messages are sent via the TopicExchange named 'eventExchange'.
 */
@Service
public class EventConsumer {

    private static Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    /*
        receive1 and receive2 both listen on the same queue. Therefore, only one endpoint will be called for each
        message as Rabbit will effectively Round-Robin them.
     */

    @RabbitListener(queues="inboundEventQueue")
    public void receive1(EventMessage message) {
        logger.info("receive1 Received message '{}'", message.getName());
    }

    @RabbitListener(queues="inboundEventQueue")
    public void receive2(EventMessage message) {
        logger.info("receive2 Received message '{}'", message.getName());
    }

    /*
        selective1 and selective2 both get their own queues (generated by Spring with a random name). They are not
        persistent and will auto-remove when this stops. selective1 will receive all messages sent with a routing
        key of "event.", while selective2 will only receive those events that match "event.<anything>.two".

        event.test.one -> selective1
        event.test.two -> selective1, selective2
     */

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#"))
    public void selective1(EventMessage message) {
        logger.info("selective1 Received message '{}'", message.getName());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#.two"))
    public void selective2(EventMessage message) {
        logger.info("selective2 Received message '{}'", message.getName());
    }

    /*
        selective3 and selective4 both share a named queue. This means that Rabbit will Round-Robin those whose route
        key satisfies the pattern. So, event.something.four could be handled by either selective3 or selective4, but
        event.something.three will only arrive at selective3.
     */

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("inboundSharedEventQueue"),
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#"))
    public void selective3(EventMessage message) {
        logger.info("selective3 Received message '{}'", message.getName());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("inboundSharedEventQueue"),
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#.four"))
    public void selective4(EventMessage message) {
        logger.info("selective4 Received message '{}'", message.getName());
    }

    /*
        This is final example that will declare it's own named queue (without any other configuration being
        necessary). While convenient, it is prone to mistakes and can be tricky to reconfigure at a later date.
     */

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("annotationDefinedQueue"),
            exchange = @Exchange(value = "eventExchange", type = "topic"),
            key = "event.#.five"))
    public void selective5(EventMessage message) {
        logger.info("selective5 Received message '{}'", message.getName());
    }

}