Introduction to MBassador – MBassador简介

最后修改: 2017年 9月 9日

中文/混合/英文(键盘快捷键:t)

 

1. Overview

1.概述

Simply put, MBassador is a high-performance event bus utilizing the publish-subscribe semantics.

简单地说,MBassador利用发布-订阅语义的高性能事件总线。

Messages are broadcasted to one or more peers without the prior knowledge of how many subscribers there are, or how they use the message.

消息被广播给一个或多个对等体,而事先不知道有多少订户,或他们如何使用该消息。

2. Maven Dependency

2.Maven的依赖性

Before we can use the library, we need to add the mbassador dependency:

在使用该库之前,我们需要添加mbassador依赖性。

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>1.3.1</version>
</dependency>

3. Basic Event Handling

3.基本事件处理

3.1. Simple Example

3.1.简单的例子

We’ll start with a simple example of publishing a message:

我们将从一个简单的发布消息的例子开始。

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
    dispatcher.post("TestString").now();
 
    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

At the top of this test class, we see the creation of a MBassador with its default constructor. Next, in the @Before method, we call subscribe() and pass in a reference to the class itself.

在这个测试类的顶部,我们看到用其默认构造函数创建了一个MBassador。接下来,在 @Before方法中,我们调用subscribe()并传递给类本身的引用。

In subscribe(), the dispatcher inspects the subscriber for @Handler annotations.

subscribe()中,分配器检查了订阅者的@Handler注解。

And, in the first test, we call dispatcher.post(…).now() to dispatch the message – which results in handleString() being called.

而且,在第一个测试中,我们调用dispatcher.post(..).now()来分派消息–这导致handleString()被调用。

This initial test demonstrates several important concepts. Any Object can be a subscriber, as long as it has one or more methods annotated with @Handler. A subscriber can have any number of handlers.

这个初始测试展示了几个重要的概念。任何对象都可以是一个订阅者,只要它有一个或多个用@Handler注释的方法。

We’re using test objects that subscribe to themselves for the sake of simplicity, but in most production scenarios, message dispatchers will in different classes from consumers.

为了简单起见,我们使用了对自己进行订阅的测试对象,但在大多数生产场景中,消息调度器将与消费者处于不同的类中。

Handler methods have only one input parameter – the message, and can’t throw any checked exceptions.

处理程序方法只有一个输入参数–消息,并且不能抛出任何检查过的异常。

Similar to the subscribe() method, the post method accepts any Object. This Object is delivered to subscribers.

subscribe()方法类似,post方法接受任何Object。这个Object将被传递给订阅者。

When a message is posted, it is delivered to any listeners that have subscribed to the message type.

当一条信息被发布时,它将被传递给任何订阅了该信息类型的听众。

Let’s add another message handler and send a different message type:

让我们添加另一个消息处理程序并发送一个不同的消息类型。

private Integer messageInteger; 

@Test
public void whenIntegerDispatched_thenHandleInteger() {
    dispatcher.post(42).now();
 
    assertNull(messageString);
    assertNotNull(messageInteger);
    assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
    messageInteger = message;
}

As expected, when we dispatch an Integer, handleInteger() is called, and handleString() is not. A single dispatcher can be used to send more than one message type.

正如预期的那样,当我们调度一个Integer时,handleInteger()被调用,而handleString()没有被调用。一个调度器可以用来发送一个以上的消息类型。

3.2. Dead Messages

3.2.死亡信息

So where does a message go when there is no handler for it? Let’s add a new event handler and then send a third message type:

那么,在没有处理程序的情况下,消息会去哪里呢?让我们添加一个新的事件处理程序,然后发送第三个消息类型。

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
    dispatcher.post(42L).now();
 
    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
} 

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

In this test, we dispatch a Long instead of an Integer. Neither handleInteger() nor handleString() are called, but handleDeadEvent() is.

在这个测试中,我们派发了一个Long而不是Integer.既没有调用handleInteger()也没有调用handleString(),但调用了handleDeadEvent()

When there are no handlers for a message, it gets wrapped in a DeadMessage object. Since we added a handler for Deadmessage, we capture it.

当一个消息没有处理程序时,它会被包裹在一个DeadMessage对象中。由于我们为Deadmessage添加了一个处理程序,所以我们捕获它。

DeadMessage can be safely ignored; if an application does not need to track dead messages, they can be allowed to go nowhere.

DeadMessage可以被安全地忽略;如果一个应用程序不需要跟踪死信息,可以允许它们无处可去。

4. Using an Event Hierarchy

4.使用事件层次结构

Sending String and Integer events is limiting. Let’s create a few message classes:

发送StringInteger事件是有限的。让我们创建几个消息类。

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

    // setters and getters
}

We have a simple base class and two classes that extend it.

我们有一个简单的基类和两个扩展它的类。

4.1. Sending a Base Class Message

4.1.发送一个基类消息

We’ll start with Message events:

我们将从Message事件开始。

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

Discover MBassador – a high-performance pub-sub event bus. This limits us to using Messages but adds an additional layer of type safety.

发现MBassador – 一个高性能的pub-sub事件总线。这限制了我们使用Messages,但增加了一层额外的类型安全。

When we send a Message, handleMessage() receives it. The other two handlers do not.

当我们发送一个Message时,handleMessage()接收它。其他两个处理程序则没有。

4.2. Sending a Subclass Message

4.2.发送子类消息

Let’s send a RejectMessage:

让我们发送一个RejectMessage

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}

When we send a RejectMessage both handleRejectMessage() and handleMessage() receive it.

当我们发送一个RejectMessage时,handleRejectMessage()handleMessage()都会收到它。

Since RejectMessage extends Message, the Message handler received it, in addition to the RejectMessage handler.

由于RejectMessage扩展了Message,Message处理程序收到了它,除了RejectMessage处理程序之外。

Let’s verify this behavior with an AckMessage:

让我们用一个AckMessage来验证这个行为。

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();
 
    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

Just as we expected, when we send an AckMessage, both handleAckMessage() and handleMessage() receive it.

正如我们预期的那样,当我们发送一个AckMessage时,handleAckMessage()handleMessage()都会收到它。

5. Filtering Messages

5.过滤信息

Organizing messages by type is already a powerful feature, but we can filter them even more.

按类型组织信息已经是一个强大的功能,但我们可以更多地过滤它们。

5.1. Filter on Class and Subclass

5.1.对类和子类的过滤

When we posted a RejectMessage or AckMessage, we received the event in both the event handler for the particular type and in the base class.

当我们发布一个RejectMessageAckMessage时,我们在特定类型的事件处理程序和基类中都收到了该事件。

We can solve this type hierarchy issue by making Message abstract and creating a class such as GenericMessage. But what if we don’t have this luxury?

我们可以通过使Message抽象化并创建一个诸如GenericMessage的类来解决这个类型层次的问题。但如果我们没有这个条件呢?

We can use message filters:

我们可以使用消息过滤器。

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
    dispatcher.post(new Message()).now();
 
    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}

The filters parameter for the @Handler annotation accepts a Class that implements IMessageFilter. The library offers two examples:

@Handler注解的filters参数接受一个实现IMessageFilterClass。该库提供了两个例子。

The Filters.RejectSubtypes does as its name suggests: it will filter out any subtypes. In this case, we see that RejectMessage is not handled by handleBaseMessage().

Filters.RejectSubtypes如其名:它将过滤掉任何子类型。在这种情况下,我们看到RejectMessage没有被handleBaseMessage()所处理。

The Filters.SubtypesOnly also does as its name suggests: it will filter out any base types. In this case, we see that Message is not handled by handleSubMessage().

Filters.SubtypesOnly也如其名字所示:它将过滤掉任何基本类型。在这种情况下,我们看到Message没有被handleSubMessage().所处理。

5.2. IMessageFilter

5.2.IMessageFilter

The Filters.RejectSubtypes and the Filters.SubtypesOnly both implement IMessageFilter.

Filters.RejectSubtypesFilters.SubtypesOnly都实现IMessageFilter

RejectSubTypes compares the class of the message to its defined message types and will only allow through messages that equal one of its types, as opposed to any subclasses.

RejectSubTypes将消息的类别与其定义的消息类型进行比较,只允许通过与其类型相同的消息,而不是任何子类。

5.3. Filter With Conditions

5.3.带条件的过滤器

Fortunately, there is an easier way of filtering messages. MBassador supports a subset of Java EL expressions as conditions for filtering messages.

幸运的是,有一种更简单的过滤消息的方法。MBassador支持Java EL表达式的一个子集作为过滤消息的条件。

Let’s filter a String message based on its length:

让我们根据长度来过滤一个String消息。

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

The “foobar!” message is seven characters long and is filtered. Let’s send a shorter String:

“foobar!”信息有七个字符长,并被过滤。让我们发送一个更短的String


@Test
public void whenShortStringDispatched_thenStringHandled() {
    dispatcher.post("foobar").now();
 
    assertNotNull(testString);
}

Now, the “foobar” is only six characters long and is passed through.

现在,”foobar “只有六个字符长,并被通过了。

Our RejectMessage contains a field with an accessor. Let’s write a filter for that:

我们的RejectMessage包含一个带访问器的字段。让我们为它写一个过滤器。

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();
 
    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

Here again, we can query a method on an object and either filter the message or not.

在这里,我们可以再次查询一个对象上的方法,并且可以过滤信息或不过滤。

5.4. Capture Filtered Messages

5.4.捕获经过过滤的信息

Similar to DeadEvents, we may want to capture and process filtered messages. There is a dedicated mechanism for capturing filtered events too. Filtered events are treated differently from “dead” events.

DeadEvents类似,我们可能想要捕获和处理过滤的消息。也有一个专门的机制用于捕获过滤的事件。过滤后的事件与 “死亡 “事件的处理方式不同

Let’s write a test that illustrates this:

让我们写一个测试来说明这一点。

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}

With the addition of a FilteredMessage handler, we can track Strings that are filtered because of their length. The filterMessage contains our too-long String while deadMessage remains null.

通过添加一个FilteredMessage处理程序,我们可以追踪那些因为长度而被过滤的字符串filterMessage包含了我们太长的String,而deadMessage仍然是null。

6. Asynchronous Message Dispatch and Handling

6.异步消息调度和处理

So far all of our examples have used synchronous message dispatch; when we called post.now() the messages were delivered to each handler in the same thread we called post() from.

到目前为止,我们所有的例子都使用了同步消息调度;当我们调用post.now()时,消息被传递到我们调用post()的同一线程中的每个处理程序。

6.1. Asynchronous Dispatch

6.1.异步调度

The MBassador.post() returns a SyncAsyncPostCommand. This class offers several methods, including:

MBassador.post()返回一个SyncAsyncPostCommand。这个类提供了几个方法,包括。

  • now() – dispatch messages synchronously; the call will block until all messages have been delivered
  • asynchronously() – executes the message publication asynchronously

Let’s use asynchronous dispatch in a sample class. We’ll use Awaitility in these tests to simplify the code:

让我们在一个示例类中使用异步调度。我们将在这些测试中使用Awaitility来简化代码。

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

We call asynchronously() in this test, and use an AtomicBoolean as a flag with await() to wait for the delivery thread to deliver the message.

我们在这个测试中调用asynchronously(),并使用AtomicBoolean作为await()的标志来等待交付线程交付消息。

If we comment out the call to await(), we risk the test failing, because we check testString before the delivery thread completes.

如果我们注释掉对await()的调用,我们就会有测试失败的风险,因为我们在交付线程完成之前检查testString

6.2. Asynchronous Handler Invocation

6.2.异步处理程序的调用

Asynchronous dispatch allows the message provider to return to message processing before the messages are delivered to each handler, but it still calls each handler in order, and each handler has to wait for the previous to finish.

异步调度允许消息提供者在消息传递给每个处理程序之前返回到消息处理,但它仍然按顺序调用每个处理程序,每个处理程序必须等待前一个处理程序完成。

This can lead to problems if one handler performs an expensive operation.

如果一个处理程序执行了一个昂贵的操作,这可能会导致问题。

MBassador provides a mechanism for asynchronous handler invocation. Handlers configured for this receive messages in their thread:

MBassador提供了一个异步处理程序调用的机制。为之配置的处理程序在其线程中接收消息。

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
    dispatcher.post(42).now();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
 
    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

Handlers can request asynchronous invocation with the delivery = Invoke.Asynchronously property on the Handler annotation. We verify this in our test by comparing the Thread names in the dispatching method and the handler.

处理程序可以通过delivery = Invoke.Asynchronously属性在Handler注解上请求异步调用。我们在测试中通过比较调度方法和处理程序中的Thread名称来验证这一点。

7. Customizing MBassador

7.定制MBassador

So far we’ve been using an instance of MBassador with its default configuration. The dispatcher’s behavior can be modified with annotations, similar to those we have seen so far; we’ll cover a few more to finish this tutorial.

到目前为止,我们一直在使用MBassador的一个实例,它的默认配置。调度器的行为可以用注解来修改,与我们到目前为止所看到的注解类似;我们将多讲一些来完成本教程。

7.1. Exception Handling

7.1.异常处理

Handlers cannot define checked exceptions. Instead, the dispatcher can be provided with an IPublicationErrorHandler as an argument to its constructor:

处理程序不能定义检查的异常。相反,可以向调度器提供一个IPublicationErrorHandler作为其构造函数的参数。

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador<String>(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs_thenErrorHandler() {
        dispatcher.post("Error").now();
 
        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs_thenStringHandler() {
        dispatcher.post("Error").now();
 
        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}

When handleString() throws an Error, it is saved to errorCause.

handleString()抛出一个错误时,它被保存到errorCause.

7.2. Handler Priority

7.2 处理程序优先级

Handlers are called in reverse order of how they are added, but this isn’t behavior we want to rely on. Even with the ability to call handlers in their threads, we may still need to know what order they will be called in.

处理程序的调用顺序与它们被添加的顺序相反,但这并不是我们想要依赖的行为。即使有了在其线程中调用处理程序的能力,我们可能仍然需要知道它们将以何种顺序被调用。

We can set handler priority explicitly:

我们可以明确地设置处理程序的优先级。

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

    // Items should pop() off in reverse priority order
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage) 
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

Handlers are called from highest priority to lowest. Handlers with the default priority, which is zero, are called last. We see that the handler numbers pop() off in reverse order.

处理程序从最高优先级到最低被调用。默认优先级为零的处理程序被最后调用。我们看到处理程序的编号pop()以相反的顺序关闭。

7.3. Reject Subtypes, the Easy Way

7.3.拒绝子类型,简单的方法

What happened to handleMessage() in the test above? We don’t have to use RejectSubTypes.class to filter our sub types.

上面的测试中的handleMessage()发生了什么?我们不需要使用RejectSubTypes.class来过滤我们的子类型。

RejectSubTypes is a boolean flag that provides the same filtering as the class, but with better performance than the IMessageFilter implementation.

RejectSubTypes是一个布尔标志,它提供了与该类相同的过滤功能,但比IMessageFilter实现具有更好的性能。

We still need to use the filter-based implementation for accepting subtypes only, though.

不过,我们仍然需要使用基于过滤器的实现,只接受子类型。

8. Conclusion

8.结论

MBassador is a simple and straightforward library for passing messages between objects. Messages can be organized in a variety of ways and can be dispatched synchronously or asynchronously.

MBassador是一个简单明了的库,用于在对象之间传递消息。消息可以以各种方式组织,并可以同步或异步地进行调度。

And, as always, the example is available in this GitHub project.

而且,像往常一样,这个例子可以在这个GitHub项目中找到。