1. Overview
1.概述
So, in a number of other tutorials, we’ve talked about BeanPostProcessor. In this tutorial, we’ll put them to use in a real-world example using Guava’s EventBus.
因此,在其他一些教程中,我们已经谈到了BeanPostProcessor。在本教程中,我们将使用Guava的EventBus在一个实际的例子中使用它们。
Spring’s BeanPostProcessor gives us hooks into the Spring bean lifecycle to modify its configuration.
Spring的BeanPostProcessor为我们提供了进入Spring Bean生命周期的钩子,以修改其配置。
BeanPostProcessor allows for direct modification of the beans themselves.
BeanPostProcessor允许直接修改Bean本身。
In this tutorial, we’re going to look at a concrete example of these classes integrating Guava’s EventBus.
在本教程中,我们将看一下这些类集成Guava的EventBus的具体例子。
2. Setup
2.设置
First, we need to set up our environment. Let’s add the Spring Context, Spring Expression, and Guava dependencies to our pom.xml:
首先,我们需要设置我们的环境。让我们将 Spring Context、Spring Expression 和 Guava 依赖关系添加到我们的 pom.xml。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
Next, let’s discuss our goals.
接下来,让我们讨论一下我们的目标。
3. Goals and Implementation
3.目标和实施
For our first goal, we want to utilize Guava’s EventBus to pass messages across various aspects of the system asynchronously.
对于我们的第一个目标,我们想利用Guava的EventBus来异步地在系统的各个方面传递消息。
Next, we want to register and unregister objects for events automatically on bean creation/destruction instead of using the manual method provided by EventBus.
接下来,我们想在Bean创建/销毁时自动为事件注册和取消注册对象,而不是使用EventBus提供的手动方法。
So, we’re now ready to start coding!
所以,我们现在准备好开始编码了!
Our implementation will consist of a wrapper class for Guava’s EventBus, a custom marker annotation, a BeanPostProcessor, a model object, and a bean to receive stock trade events from the EventBus. In addition, we’ll create a test case to verify the desired functionality.
我们的实现将由Guava的EventBus的封装类、自定义标记注解、BeanPostProcessor、模型对象和从EventBus接收股票交易事件的bean组成。 此外,我们将创建一个测试案例来验证所需的功能。
3.1. EventBus Wrapper
3.1 EventBus 封装器
To being with, we’ll define an EventBus wrapper to provide some static methods to easily register and unregister beans for events which will be used by the BeanPostProcessor:
首先,我们将定义一个EventBus包装器,以提供一些静态方法来轻松注册和取消注册BeanPostProcessor将使用的事件。
public final class GlobalEventBus {
public static final String GLOBAL_EVENT_BUS_EXPRESSION
= "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()";
private static final String IDENTIFIER = "global-event-bus";
private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());
private GlobalEventBus() {}
public static GlobalEventBus getInstance() {
return GlobalEventBus.GLOBAL_EVENT_BUS;
}
public static EventBus getEventBus() {
return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
}
public static void subscribe(Object obj) {
getEventBus().register(obj);
}
public static void unsubscribe(Object obj) {
getEventBus().unregister(obj);
}
public static void post(Object event) {
getEventBus().post(event);
}
}
This code provides static methods for accessing the GlobalEventBus and underlying EventBus as well as registering and unregistering for events and posting events. It also has a SpEL expression used as the default expression in our custom annotation to define which EventBus we want to utilize.
这段代码提供了访问GlobalEventBus和底层EventBus的静态方法,以及为事件注册和取消注册以及发布事件。它还有一个SpEL表达式,被用作我们自定义注解中的默认表达式,以定义我们要利用的EventBus。
3.2. Custom Marker Annotation
3.2.自定义标记注释
Next, let’s define a custom marker annotation which will be used by the BeanPostProcessor to identify beans to automatically register/unregister for events:
接下来,让我们定义一个自定义标记注解,它将被BeanPostProcessor用来识别要自动注册/取消注册事件的Bean。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {
String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}
3.3. BeanPostProcessor
3.3.BeanPostProcessor
Now, we’ll define the BeanPostProcessor which will check each bean for the Subscriber annotation. This class is also a DestructionAwareBeanPostProcessor, which is a Spring interface adding a before-destruction callback to BeanPostProcessor. If the annotation is present, we’ll register it with the EventBus identified by the annotation’s SpEL expression on bean initialization and unregister it on bean destruction:
现在,我们将定义BeanPostProcessor,它将检查每个bean的Subscriber注解。这个类也是一个DestructionAwareBeanPostProcessor,这是一个Spring接口,为BeanPostProcessor添加一个销毁前回调。如果注解存在,我们将在Bean初始化时向注解的SpEL表达式所标识的EventBus注册,并在Bean销毁时取消注册。
public class GuavaEventBusBeanPostProcessor
implements DestructionAwareBeanPostProcessor {
Logger logger = LoggerFactory.getLogger(this.getClass());
SpelExpressionParser expressionParser = new SpelExpressionParser();
@Override
public void postProcessBeforeDestruction(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::unregister, "destruction");
}
@Override
public boolean requiresDestruction(Object bean) {
return true;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::register, "initialization");
return bean;
}
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
// See implementation below
}
}
The code above takes every bean and runs it through the process method, defined below. It processes it after the bean has been initialized and before it is destroyed. The requiresDestruction method returns true by default and we keep that behavior here as we check for the existence of the @Subscriber annotation in the postProcessBeforeDestruction callback.
上面的代码接收每个Bean并通过下面定义的process方法运行它。它在Bean被初始化后和被销毁前对其进行处理。requiresDestruction方法默认返回true,我们在这里保持这一行为,因为我们在postProcessBeforeDestruction回调中检查@Subscriber注解是否存在。
Let’s now look at the process method:
现在我们来看看process方法。
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
Object proxy = this.getTargetObject(bean);
Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
if (annotation == null)
return;
this.logger.info("{}: processing bean of type {} during {}",
this.getClass().getSimpleName(), proxy.getClass().getName(), action);
String annotationValue = annotation.value();
try {
Expression expression = this.expressionParser.parseExpression(annotationValue);
Object value = expression.getValue();
if (!(value instanceof EventBus)) {
this.logger.error(
"{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
return;
}
EventBus eventBus = (EventBus)value;
consumer.accept(eventBus, proxy);
} catch (ExpressionException ex) {
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
}
}
This code checks for the existence of our custom marker annotation named Subscriber and, if present, reads the SpEL expression from its value property. Then, the expression is evaluated into an object. If it’s an instance of EventBus, we apply the BiConsumer function parameter to the bean. The BiConsumer is used to register and unregister the bean from the EventBus.
这段代码检查我们名为Subscriber的自定义标记注解是否存在,如果存在,则从其value属性中读取SpEL表达式。然后,该表达式被评估为一个对象。如果它是EventBus的实例,我们将BiConsumer函数参数应用于bean。BiConsumer被用来从EventBus中注册和取消注册Bean。
The implementation of the method getTargetObject is as follows:
方法getTargetObject的实现如下。
private Object getTargetObject(Object proxy) throws BeansException {
if (AopUtils.isJdkDynamicProxy(proxy)) {
try {
return ((Advised)proxy).getTargetSource().getTarget();
} catch (Exception e) {
throw new FatalBeanException("Error getting target of JDK proxy", e);
}
}
return proxy;
}
3.4. StockTrade Model Object
3.4.股票事务模型对象
Next, let’s define our StockTrade model object:
接下来,让我们定义我们的StockTrade模型对象。
public class StockTrade {
private String symbol;
private int quantity;
private double price;
private Date tradeDate;
// constructor
}
3.5. StockTradePublisher Event Receiver
3.5.StockTradePublisher 事件接收者
Then, let’s define a listener class to notify us a trade was received so that we can write our test:
然后,让我们定义一个监听器类来通知我们收到了交易,这样我们就可以写出我们的测试。
@FunctionalInterface
public interface StockTradeListener {
void stockTradePublished(StockTrade trade);
}
Finally, we’ll define a receiver for new StockTrade events:
最后,我们将为新的StockTrade事件定义一个接收器。
@Subscriber
public class StockTradePublisher {
Set<StockTradeListener> stockTradeListeners = new HashSet<>();
public void addStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.add(listener);
}
}
public void removeStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.remove(listener);
}
}
@Subscribe
@AllowConcurrentEvents
void handleNewStockTradeEvent(StockTrade trade) {
// publish to DB, send to PubNub, ...
Set<StockTradeListener> listeners;
synchronized (this.stockTradeListeners) {
listeners = new HashSet<>(this.stockTradeListeners);
}
listeners.forEach(li -> li.stockTradePublished(trade));
}
}
The code above marks this class as a Subscriber of Guava EventBus events and Guava’s @Subscribe annotation marks the method handleNewStockTradeEvent as a receiver of events. The type of events it’ll receive is based on the class of the single parameter to the method; in this case, we’ll receive events of type StockTrade.
上面的代码将这个类标记为Guava EventBus事件的Subscriber,Guava的@Subscribe注解将方法handleNewStockTradeEvent标记为事件的接收器。它所接收的事件类型是基于该方法的单个参数的类别;在这种情况下,我们将接收StockTrade类型的事件。
The @AllowConcurrentEvents annotation allows the concurrent invocation of this method. Once we receive a trade we do any processing we wish then notify any listeners.
@AllowConcurrentEvents注解允许该方法的并发调用。一旦我们收到一个交易,我们会做任何我们希望的处理,然后通知任何监听器。
3.6. Testing
3.6.测试
Now let’s wrap up our coding with an integration test to verify the BeanPostProcessor works correctly. Firstly, we’ll need a Spring context:
现在让我们用一个集成测试来总结我们的编码,以验证BeanPostProcessor的正确工作。首先,我们需要一个Spring上下文。
@Configuration
public class PostProcessorConfiguration {
@Bean
public GlobalEventBus eventBus() {
return GlobalEventBus.getInstance();
}
@Bean
public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
return new GuavaEventBusBeanPostProcessor();
}
@Bean
public StockTradePublisher stockTradePublisher() {
return new StockTradePublisher();
}
}
Now we can implement our test:
现在我们可以实现我们的测试。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PostProcessorConfiguration.class)
public class StockTradeIntegrationTest {
@Autowired
StockTradePublisher stockTradePublisher;
@Test
public void givenValidConfig_whenTradePublished_thenTradeReceived() {
Date tradeDate = new Date();
StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
AtomicBoolean assertionsPassed = new AtomicBoolean(false);
StockTradeListener listener = trade -> assertionsPassed
.set(this.verifyExact(stockTrade, trade));
this.stockTradePublisher.addStockTradeListener(listener);
try {
GlobalEventBus.post(stockTrade);
await().atMost(Duration.ofSeconds(2L))
.untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
} finally {
this.stockTradePublisher.removeStockTradeListener(listener);
}
}
boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
&& Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
&& stockTrade.getQuantity() == trade.getQuantity()
&& stockTrade.getPrice() == trade.getPrice();
}
}
The test code above generates a stock trade and posts it to the GlobalEventBus. We wait at most two seconds for the action to complete and to be notified the trade was received by the stockTradePublisher. Furthermore, we validate the received trade was not modified in transit.
上面的测试代码生成了一笔股票交易并将其发布到GlobalEventBus。我们最多等待两秒钟来完成这个动作,并通知交易被stockTradePublisher收到。此外,我们验证收到的交易在运输过程中没有被修改。
4. Conclusion
4.总结
In conclusion, Spring’s BeanPostProcessor allows us to customize the beans themselves, providing us with a means to automate bean actions we would otherwise have to do manually.
总之,Spring的BeanPostProcessor允许我们定制Bean本身,为我们提供了一种手段来自动化Bean操作,否则我们就必须手动操作。
As always, source code is available over on GitHub.
一如既往,源代码可在GitHub上获得。