Ohhnews

分类导航

$ cd ..
Baeldung原文

Spring Boot 中 JmsClient 使用指南

#spring boot#jms#消息队列#消息处理#activemq

1. 概述

Spring Framework 7.0 引入了 JmsClient,这是一个用于与 JMS 目的地进行交互的全新流式 API。它是 JmsTemplate 的现代替代方案,提供了一种更简洁的消息发送方式。

在本教程中,我们将构建一个简单的应用程序,通过 JmsClient 发送消息,并使用 @JmsListener 进行消费。此外,我们还将设置一个自定义的 MessageConverter 来处理 JSON 序列化,并讨论相关的测试策略。

2. 发送消息

在本文的代码示例中,我们假设正在构建一个类似于 Baeldung 的博客网站后端。我们使用 ActiveMQ Artemis 作为 JMS 代理。

首先,创建一个简单的 docker-compose.yml 以在本地启动它:

$ config
services:
  activemq:
    image: apache/activemq-artemis:2.37.0
    container_name: activemq-artemis
    ports:
      - "61616:61616"   # JMS
      - "8161:8161"     # Web Console
    environment:
      - ARTEMIS_USER=admin
      - ARTEMIS_PASSWORD=admin

然后,在 pom.xml 中添加 spring-boot-starter-activemq 依赖:

$ xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

添加此依赖后,Spring Boot 会自动实例化一个 JmsClient Bean 并将其添加到应用上下文中

接下来,我们将此客户端注入到组件中,并使用它向 articles-queue 发送消息:

$ java
@Component
class ArticlePublisher {
    private final JmsClient jmsClient;
    // 构造函数、日志记录器
    public void publish(String title, String author) {
        var article = new Article(title, author);
        log.info("发布文章: {}", article);
        jmsClient.destination("articles-queue")
          .send(article);
    }
    record Article(String title, String author) {
    }
}

此外,我们在 application.yml 中配置与 ActiveMQ Artemis 代理的连接:

$ config
spring:
  artemis:
    mode: native
    broker-url: tcp://localhost:61616
    user: admin
    password: admin

这样,配置就基本完成了!最后需要配置的是消息序列化方式。

3. 消息转换器 (Message Converters)

默认情况下,Spring JMS 使用 SimpleMessageConverter,它仅处理 Stringbyte[]Map。为了以 JSON 格式发送 POJO(例如 Article 记录),我们需要注册一个自定义的 MessageConverter

MessageConverter 接口包含两个方法:一个用于将 POJO 转换为 JMS Message,另一个用于逆向转换。在此实现中,我们将两者都委托给 Jackson 的 JsonMapper

$ java
@Component
class JsonMessageConverter implements MessageConverter {
    private final JsonMapper jsonMapper = JsonMapper.builder().build();
    @Override
    @SneakyThrows
    public Message toMessage(Object object, Session session) 
      throws JMSException, MessageConversionException {
        String json = jsonMapper.writeValueAsString(object);
        TextMessage msg = session.createTextMessage(json);
        msg.setStringProperty("_type", object.getClass().getName());
        return msg;
    }
    @Override
    @SneakyThrows
    public Object fromMessage(Message message) 
      throws JMSException, MessageConversionException {
        if (message instanceof TextMessage msg) {
            var clazz = Class.forName(msg.getStringProperty("_type"));
            return jsonMapper.readValue(msg.getText(), clazz);
        }
        throw new MessageConversionException("消息类型不是 TextMessage");
    }
}

当我们将对象序列化为 JSON TextMessage 时,会将完整的类名存储在 _type 属性中。在反序列化期间,我们读取该属性以重构正确的类型。

就是这样!Spring Boot 会自动检测 MessageConverter Bean,并将其自动装配到 JmsTemplateJmsClient 以及 @JmsListener 容器中

4. 消费消息

此时,Spring 会自动执行以下几项操作

  1. 检测 @JmsListener 注解
  2. 创建连接到代理的消息监听器容器
  3. 轮询队列
  4. 为每条传入的消息调用方法,并将反序列化后的 POJO 直接作为方法参数传递

让我们使用此注解创建一个 ArticleListener 组件,并从 articles-queue 消费消息:

$ java
@Component
class ArticleListener {
    private List<Article> receivedArticles = new CopyOnWriteArrayList<>();
    // getter、日志记录器
    @JmsListener(destination = "articles-queue")
    void onArticleReceived(ArticlePublisher.Article article) {
        log.info("收到文章: {}", article);
        receivedArticles.add(article);
    }
}

最后,让我们通过集成测试将所有内容串联起来。具体来说,我们使用 Testcontainers 启动一个真实的 ActiveMQ Artemis 代理,并端到端地验证整个流程:

$ java
@Testcontainers
@SpringBootTest(classes = SampleApplication.class)
class ArticleListenerLiveTest {
    @Container
    static ArtemisContainer activeMq = new ArtemisContainer(
      DockerImageName.parse("apache/activemq-artemis:2.37.0"))
        .withUser("admin")
        .withPassword("admin");
    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.artemis.broker-url", activeMq::getBrokerUrl);
    }
    @Autowired
    ArticlePublisher articlePublisher;
    @Autowired
    ArticleListener articleListener;
    @Test
    void shouldReceivePublishedArticle() {
        articlePublisher.publish("Foo", "John Doe");
        articlePublisher.publish("Bar", "Jane Doe");
        await().untilAsserted(() ->
          assertThat(articleListener.getReceivedArticles())
            .map(Article::title)
            .containsExactly("Foo", "Bar"));
    }
}

如上所示,测试使用了 ArticlePublisher 发送了两篇文章,然后断言它们最终都被 ArticleListener 接收到了。

5. 结论

在本文中,我们探讨了 JmsClient(Spring 7.0 中用于发送 JMS 消息的全新流式 API)以及用于消费消息的 @JmsListener

在实践中,我们还介绍了如何装配自定义的 MessageConverter 来处理 JSON 序列化,并通过 Testcontainers 集成测试验证了完整的流程。

一如既往,所有代码示例均可在 GitHub 上找到。