A Guide to Message Driven Beans in EJB – EJB中的消息驱动Bean指南

最后修改: 2018年 7月 22日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

Simply put, an Enterprise JavaBean (EJB) is a JEE component that runs on an application server.

简单地说,企业JavaBean(EJB)是一个在应用服务器上运行的JEE组件。

In this tutorial, we’ll discuss Message Driven Beans (MDB), responsible for handling message processing in an asynchronous context.

在本教程中,我们将讨论消息驱动Bean(MDB),负责处理异步上下文中的消息处理。

MDBs are part of JEE since EJB 2.0 specification; EJB 3.0 introduced the use of annotations, making it easier to create those objects. Here, we’ll focus on annotations.

自从EJB 2.0规范以来,MDB就是JEE的一部分;EJB 3.0引入了注解的使用,使得创建这些对象变得更加容易。在这里,我们将重点讨论注解。

2. Some Background

2.一些背景

Before we dive into the Message Driven Beans details, let’s review some concepts related to messaging.

在我们深入研究消息驱动Bean的细节之前,让我们回顾一下与消息传递有关的一些概念。

2.1. Messaging

2.1.信息传递

Messaging is a communication mechanism. By using messaging, programs can exchange data even if they’re written in different program languages or reside in different operational systems.

消息传递是一种通信机制。通过使用消息传递,程序可以交换数据,即使它们是用不同的程序语言编写的,或位于不同的操作系统中。

It offers a loosely coupled solution; neither the producer or the consumer of the information need to know details about each other.

它提供了一个松散耦合的解决方案;信息的生产者和消费者都不需要知道彼此的细节

Therefore, they don’t even have to be connected to the messaging system at the same time (asynchronous communication).

因此,他们甚至不需要同时连接到信息传递系统(异步通信)。

2.2. Synchronous and Asynchronous Communication

2.2.同步和异步通信

During synchronous communication, the requester waits until the response is back. In the meantime, the requester process stays blocked.

在同步通信期间,请求者一直等待,直到响应回来。在此期间,请求者进程保持阻塞状态。

In asynchronous communication, on the other hand, the requester initiates the operation but isn’t blocked by it; the requester can move on to other tasks and receive the response later.

另一方面,在异步通信中,请求者发起了操作,但并没有被其阻断;请求者可以继续进行其他任务,并在以后收到响应。

2.3. JMS

2.3.JMS

Java Message Services (“JMS”) is a Java API that supports messaging.

Java消息服务(”JMS”)是一个支持消息传递的Java API。

JMS provides peer to peer and publish/subscribe messaging models.

JMS提供点对点和发布/订阅的消息传递模式。

3. Message Driven Beans

3.消息驱动的Bean

An MDB is a component invoked by the container every time a message arrives on the messaging system. As a result, this event triggers the code inside this bean.

MDB是一个组件,每次有消息到达消息传递系统时都会被容器调用。因此,这个事件会触发这个Bean里面的代码。

We can perform a lot of tasks inside an MDB onMessage() method, since showing the received data on a browser or parsing and saving it to a database.

我们可以在MDB的onMessage()方法中执行很多任务,比如在浏览器上显示收到的数据,或者解析并保存到数据库。

Another example is submitting data to another queue after some processing. It all comes down to our business rules.

另一个例子是在一些处理后将数据提交给另一个队列。这一切都归结于我们的业务规则。

3.1. Message Driven Beans Lifecycle

3.1.消息驱动Bean的生命周期

An MDB has only two states:

一个MDB只有两种状态。

  1. It doesn’t exist on the container
  2. created and ready to receive messages

The dependencies, if present, are injected right after the MDB is created.

如果存在依赖关系,将在创建MDB后立即注入。

To execute instructions before receiving messages, we need to annotate a method with @javax.ejb.PostConstruct.

为了在接收消息之前执行指令,我们需要用@javax.ejb.PostConstruct来注释一个方法。

Both dependency injection and @javax.ejb.PostConstruct execution happen only once.

依赖性注入和@javax.ejb.PostConstruct执行都只发生一次。

After that, the MDB is ready to receive messages.

之后,MDB就可以接收信息了。

3.2. Transaction

3.2.事务

A message can be delivered to an MDB inside a transaction context.

一个消息可以被传递到一个事务上下文中的MDB。

Meaning that all operations within the onMessage() method are part of a single transaction.

意味着onMessage()方法内的所有操作都是单一事务的一部分。

Therefore, if a rollback happens, message system redelivers the data.

因此,如果发生回滚,消息系统会重新提供数据。

4. Working With Message Driven Beans

4.使用消息驱动的Bean工作

4.1. Creating the Consumer

4.1.创建消费者

To create a Message Driven Bean, we use @javax.ejb.MessageDriven annotation before the class name declaration.

为了创建一个消息驱动Bean,我们在类名声明前使用@javax.ejb.MessageDriven注解。

To handle the incoming message, we must implement the onMessage() method of the MessageListener interface:

为了处理收到的消息,我们必须实现MessageListener接口的onMessage()方法。

@MessageDriven(activationConfig = { 
    @ActivationConfigProperty(
      propertyName = "destination", 
      propertyValue = "tutorialQueue"), 
    @ActivationConfigProperty(
      propertyName = "destinationType", 
      propertyValue = "javax.jms.Queue")
})
public class ReadMessageMDB implements MessageListener {

    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Message received: " + textMessage.getText());
        } catch (JMSException e) {
            System.out.println(
              "Error while trying to consume messages: " + e.getMessage());
        }
    }
}

Since this article focus on annotations instead of .xml descriptors we’ll use @ActivationConfigProperty rather than <activation-config-property>.

由于本文的重点是注释而不是.xml描述符,我们将使用@ActivationConfigProperty而不是

@ActivationConfigProperty is a key-value property that represents that configuration. We’ll use two properties inside activationConfig, setting the queue and the type of object the MDB will consume.

@ActivationConfigProperty是一个代表该配置的键值属性。我们将在activationConfig中使用两个属性,设置队列和MDB将消费的对象的类型。

Inside onMessage() method we can cast message parameter to TextMessage, BytesMessage, MapMessage StreamMessage or ObjectMessage.

onMessage()方法中,我们可以将消息参数转换为TextMessage、BytesMessage、MapMessage StreamMessageObjectMessage

However, for this article, we’ll only look at the message content on standard output.

然而,在这篇文章中,我们只看标准输出的信息内容。

4.2. Creating the Producer

4.2.创建生产者

As covered in section 2.1, producer and consumer services are completely independent and can even be written in different programming languages!

正如第2.1节所述,生产者和消费者服务是完全独立的,甚至可以用不同的编程语言编写!

We’ll produce our messages using Java Servlets:

我们将使用Java Servlets制作我们的信息。

@Override
protected void doGet(
  HttpServletRequest req, 
  HttpServletResponse res) 
  throws ServletException, IOException {
 
    String text = req.getParameter("text") != null ? req.getParameter("text") : "Hello World";

    try (
        Context ic = new InitialContext();
 
        ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
        Queue queue = (Queue) ic.lookup("queue/tutorialQueue");
 
        Connection connection = cf.createConnection();
    ) {
        Session session = connection.createSession(
          false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session
          .createProducer(queue);
 
        connection.start();

        TextMessage message = session.createTextMessage(text);
        publisher.send(message);
 
    } catch (NamingException | JMSException e) {
        res.getWriter()
          .println("Error while trying to send <" + text + "> message: " + e.getMessage());
    } 

    res.getWriter()
      .println("Message sent: " + text);
}

After obtaining the ConnectionFactory and Queue instances, we must create a Connection and Session.

在获得ConnectionFactoryQueue实例之后,我们必须创建一个ConnectionSession

To create a session, we call the createSession method.

为了创建一个会话,我们调用createSession方法。

The first parameter in createSession is a boolean which defines whether the session is part of a transaction or not.

createSession中的第一个参数是一个boolean,它定义了会话是否是交易的一部分。

The second parameter is only used when the first is false. It allows us to describe the acknowledgment method that applies to incoming messages and takes the values of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE and Session.DUPS_OK_ACKNOWLEDGE.

第二个参数仅在第一个参数为false时使用。它允许我们描述适用于传入消息的确认方法,并采取Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGESession.DUPS_OK_ACKNOWLEDGE的值。

We can now start the connection, create a text message on the session object and send our message.

现在我们可以启动连接,在会话对象上创建一个文本信息,并发送我们的信息。

A consumer, bound to the same queue will receive a message and perform its asynchronous task.

绑定到同一队列的消费者将收到一个消息并执行其异步任务。

Also, apart from looking up JNDI objects, all actions in our try-with-resources block make sure the connection is closed if JMSException encounters an error, such as trying to connect to a non-existing queue or specifying a wrong port number to connect.

此外,除了查找JNDI对象外,我们的try-with-resources块中的所有操作都确保在JMSException遇到错误时关闭连接,例如试图连接到一个不存在的队列或指定错误的端口号来连接。

5. Testing the Message Driven Bean

5.测试消息驱动Bean

Send a message through the GET method on SendMessageServlet, as in:

通过SendMessageServlet上的GET方法发送一个消息,如:。

http://127.0.0.1:8080/producer/SendMessageServlet?text=Text to send

http://127.0.0.1:8080/producer/SendMessageServlet?text=Text,发送

Also, the servlet sends “Hello World” to the queue if we don’t send any parameters, as in http://127.0.0.1:8080/producer/SendMessageServlet.

另外,如果我们不发送任何参数,servlet会向队列发送“Hello World”,如http://127.0.0.1:8080/producer/SendMessageServlet。

6. Conclusion

6.结论

Message Driven Beans allow simple creation of a queue based application.

消息驱动Bean允许简单地创建一个基于队列的应用程序。

Therefore, MDBs allow us to decouple our applications into smaller services with localized responsibilities, allowing a much more modular and incremental system that can recover from system failures.

因此,MDB允许我们将应用程序解耦为具有本地化责任的小型服务,允许一个更加模块化和增量的系统,可以从系统故障中恢复。

As always the code is over on GitHub.

像往常一样,代码在GitHub上