1. Introduction
1.介绍
In this follow-up to our tutorial on using Couchbase in a Spring application, we explore the asynchronous nature of the Couchbase SDK and how it may be used to perform persistence operations in batches, thus allowing our application to achieve optimal use of Couchbase resources.
1.1. CrudService Interface
1.1.CrudService 接口
First, we augment our generic CrudService interface to include batch operations:
首先,我们增强了我们的通用CrudService接口,以包括批量操作。
public interface CrudService<T> {
...
List<T> readBulk(Iterable<String> ids);
void createBulk(Iterable<T> items);
void updateBulk(Iterable<T> items);
void deleteBulk(Iterable<String> ids);
boolean exists(String id);
}
1.2. CouchbaseEntity Interface
1.2.CouchbaseEntity 接口
We define an interface for the entities that we want to persist:
我们为我们想要持久化的实体定义一个接口。
public interface CouchbaseEntity {
String getId();
void setId(String id);
}
1.3. AbstractCrudService Class
1.3.AbstractCrudService 类
Then we will implement each of these methods in a generic abstract class. This class is derived from the PersonCrudService class that we used in the previous tutorial and begins as follows:
然后我们将在一个通用的抽象类中实现这些方法中的每一个。这个类派生于我们在上一个教程中使用的PersonCrudService类,其开头如下。
public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
private BucketService bucketService;
private Bucket bucket;
private JsonDocumentConverter<T> converter;
public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
this.bucketService = bucketService;
this.converter = converter;
}
protected void loadBucket() {
bucket = bucketService.getBucket();
}
...
}
2. The Asynchronous Bucket Interface
2.异步水桶接口
The Couchbase SDK provides the AsyncBucket interface for performing asynchronous operations. Given a Bucket instance, you can obtain its asynchronous version via the async() method:
Couchbase SDK提供了AsyncBucket接口来执行异步操作。给定一个Bucket实例,你可以通过async()方法获得其异步版本。
AsyncBucket asyncBucket = bucket.async();
3. Batch Operations
3.批量操作
To perform batch operations using the AsyncBucket interface, we employ the RxJava library.
为了使用AsyncBucket接口执行批处理操作,我们采用了RxJava/em>库。
3.1. Batch Read
3.1.批量读取
Here we implement the readBulk method. First we use the AsyncBucket and the flatMap mechanism in RxJava to retrieve the documents asynchronously into an Observable<JsonDocument>, then we use the toBlocking mechanism in RxJava to convert these to a list of entities:
这里我们实现了readBulk方法。首先我们使用RxJava中的AsyncBucket和flatMap机制,将文件异步检索到Observable<JsonDocument>中,然后我们使用RxJava中的toBlocking机制,将这些转换为实体的列表。
@Override
public List<T> readBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable<JsonDocument> asyncOperation = Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
return asyncBucket.get(key);
}
});
List<T> items = new ArrayList<T>();
try {
asyncOperation.toBlocking()
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument doc) {
T item = converter.fromDocument(doc);
items.add(item);
}
});
} catch (Exception e) {
logger.error("Error during bulk get", e);
}
return items;
}
3.2. Batch Insert
3.2.批量插入
We again use RxJava’s flatMap construct to implement the createBulk method.
我们再次使用RxJava的flatMap结构来实现createBulk方法。
Since bulk mutation requests are produced faster than their responses can be generated, sometimes resulting in an overload condition, we institute a retry with exponential delay whenever a BackpressureException is encountered:
由于批量突变请求的产生速度快于其响应的产生速度,有时会导致过载情况,因此只要遇到BackpressureException,我们就会以指数级的延迟重试。
@Override
public void createBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
return asyncBucket.insert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
3.3. Batch Update
3.3.批量更新
We use a similar mechanism in the updateBulk method:
我们在updateBulk方法中使用了一个类似的机制。
@Override
public void updateBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
JsonDocument doc = converter.toDocument(t);
return asyncBucket.upsert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
3.4. Batch Delete
3.4.批量删除
And we write the deleteBulk method as follows:
而我们写的deleteBulk方法如下。
@Override
public void deleteBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(String key) {
return asyncBucket.remove(key)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
4. PersonCrudService
4.PersonCrudService
Finally, we write a Spring service, PersonCrudService, that extends our AbstractCrudService for the Person entity.
最后,我们编写一个Spring服务,PersonCrudService,它为Person实体扩展了我们的AbstractCrudService。
Since all of the Couchbase interaction is implemented in the abstract class, the implementation for an entity class is trivial, as we only need to ensure that all our dependencies are injected and our bucket loaded:
由于所有的Couchbase交互都是在抽象类中实现的,所以实体类的实现是微不足道的,因为我们只需要确保所有的依赖被注入和我们的桶被加载。
@Service
public class PersonCrudService extends AbstractCrudService<Person> {
@Autowired
public PersonCrudService(
@Qualifier("TutorialBucketService") BucketService bucketService,
PersonDocumentConverter converter) {
super(bucketService, converter);
}
@PostConstruct
private void init() {
loadBucket();
}
}
5. Conclusion
5.结论
The source code shown in this tutorial is available in the github project.
本教程中显示的源代码可在github项目中找到。
You can learn more about the Couchbase Java SDK at the official Couchbase developer documentation site.
您可以在官方的Couchbase开发者文档网站上了解更多关于Couchbase Java SDK的信息。