Spring Data with Reactive Cassandra – 含有反应式卡桑德拉的Spring数据

最后修改: 2018年 11月 30日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

In this tutorial, we’ll learn how to use reactive data access features of Spring Data Cassandra.

在本教程中,我们将学习如何使用Spring Data Cassandra的反应式数据访问功能。

Particularly, this is the third article of the Spring Data Cassandra article series. In this one, we’ll expose a Cassandra database using a REST API.

特别是,这是Spring Data Cassandra文章系列的第三篇文章。在这一篇中,我们将使用REST API暴露一个Cassandra数据库。

We can read more about Spring Data Cassandra in the first and second articles of the series.

我们可以在第一篇第二篇系列文章中阅读更多关于Spring Data Cassandra的信息。

2. Maven Dependencies

2.Maven的依赖性

As a matter of fact, Spring Data Cassandra supports Project Reactor and RxJava reactive types. To demonstrate, we’ll use the Project reactor’s reactive types Flux and Mono in this tutorial.

事实上,Spring Data Cassandra支持Project Reactor和RxJava反应式类型。为了演示,我们将在本教程中使用Project reactor的反应式类型FluxMono

To start with, let’s add the dependencies needed for our tutorial:

首先,让我们添加我们的教程所需的依赖项。

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-cassandra</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

The latest version of the spring-data-cassandra can be found here.

最新版本的spring-data-cassandra可以在这里找到。

Now, we’re going to expose SELECT operations from the database via a REST API. So, let’s add the dependency for RestController, too:

现在,我们要通过REST API从数据库中公开SELECT操作。所以,让我们也添加RestController的依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3. Implementing Our App

3.实施我们的应用程序

Since we will be persisting data, let’s first define our entity object:

由于我们将持久化数据,让我们首先定义我们的实体对象

@Table
public class Employee {
    @PrimaryKey
    private int id;
    private String name;
    private String address;
    private String email;
    private int age;
}

Next, its time to create an EmployeeRepository that extends from ReactiveCassandraRepository. It’s important to note that this interface enables the support for reactive types:

接下来,是时候创建一个扩展自ReactiveCassandraRepository的EmployeeRepository了。需要注意的是,这个接口能够支持反应式类型

public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
    @AllowFiltering
    Flux<Employee> findByAgeGreaterThan(int age);
}

3.1. Rest Controller for CRUD Operations

3.1.用于CRUD操作的Rest控制器

For the purpose of illustration, we’ll expose some basic SELECT operations using a simple Rest Controller:

为了说明问题,我们将使用一个简单的Rest Controller暴露一些基本的SELECT操作。

@RestController
@RequestMapping("employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostConstruct
    public void saveEmployees() {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee(123, "John Doe", "Delaware", "jdoe@xyz.com", 31));
        employees.add(new Employee(324, "Adam Smith", "North Carolina", "asmith@xyz.com", 43));
        employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24));
        employees.add(new Employee(643, "Mike Lauren", "New York", "mlauren@xyz.com", 41));
        employeeService.initializeEmployees(employees);
    }

    @GetMapping("/list")
    public Flux<Employee> getAllEmployees() {
        Flux<Employee> employees = employeeService.getAllEmployees();
        return employees;
    }

    @GetMapping("/{id}")
    public Mono<Employee> getEmployeeById(@PathVariable int id) {
        return employeeService.getEmployeeById(id);
    }

    @GetMapping("/filterByAge/{age}")
    public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
        return employeeService.getEmployeesFilterByAge(age);
    }
}

Finally, let’s add a simple EmployeeService:

最后,让我们添加一个简单的EmployeeService

@Service
public class EmployeeService {

    @Autowired
    EmployeeRepository employeeRepository;

    public void initializeEmployees(List<Employee> employees) {
        Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
        savedEmployees.subscribe();
    }

    public Flux<Employee> getAllEmployees() {
        Flux<Employee> employees =  employeeRepository.findAll();
        return employees;
    }

    public Flux<Employee> getEmployeesFilterByAge(int age) {
        return employeeRepository.findByAgeGreaterThan(age);
    }

    public Mono<Employee> getEmployeeById(int id) {
        return employeeRepository.findById(id);
    }
}

3.2. Database Configuration

3.2.数据库配置

Then, let’s specify the keyspace and port to use for connecting with Cassandra in application.properties:

然后,让我们在application.properties中指定用于与Cassandra连接的密钥空间和端口。

spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1

Note: datacenter1 is the default data center name.

注意:datacenter1是默认的数据中心名称。

4. Testing the Endpoints

4.测试端点

Finally, its time to test our API endpoints.

最后,是时候测试我们的API端点了

4.1. Manual Testing

4.1.手动测试

To begin with, let’s fetch the employee records from the database:

首先,让我们从数据库中获取雇员记录。

curl localhost:8080/employee/list

As a result, we get all the employees:

结果是,我们得到了所有的员工。

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "asmith@xyz.com",
        "age": 43
    },
    {
        "id": 123,
        "name": "John Doe",
        "address": "Delaware",
        "email": "jdoe@xyz.com",
        "age": 31
    },
    {
        "id": 355,
        "name": "Kevin Dunner",
        "address": "Virginia",
        "email": "kdunner@xyz.com",
        "age": 24
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "mlauren@xyz.com",
       "age": 41
    }
]

Moving on, let’s try to find a specific employee by his id:

继续,让我们尝试通过他的ID找到一个具体的雇员。

curl localhost:8080/employee/643

As a result, we get Mr. Mike Lauren back:

结果,我们找回了迈克-劳伦先生。

{
    "id": 643,
    "name": "Mike Lauren",
    "address": "New York",
    "email": "mlauren@xyz.com",
    "age": 41
}

Finally, let’s see if our age filter works:

最后,让我们看看我们的年龄过滤器是否有效。

curl localhost:8080/employee/filterByAge/35

And as expected, we get all the employees whose age is greater than 35:

正如预期的那样,我们得到了所有年龄大于35岁的雇员。

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "asmith@xyz.com",
        "age": 43
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "mlauren@xyz.com",
        "age": 41
    }
]

4.2. Integration Testing

4.2.集成测试

Additionally, let’s test the same functionality by writing a test case:

此外,让我们通过写一个测试用例来测试相同的功能。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {

    @Autowired
    EmployeeRepository repository;

    @Before
    public void setUp() {
        Flux<Employee> deleteAndInsert = repository.deleteAll()
          .thenMany(repository.saveAll(Flux.just(
            new Employee(111, "John Doe", "Delaware", "jdoe@xyz.com", 31),
            new Employee(222, "Adam Smith", "North Carolina", "asmith@xyz.com", 43),
            new Employee(333, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24),
            new Employee(444, "Mike Lauren", "New York", "mlauren@xyz.com", 41))));

        StepVerifier
          .create(deleteAndInsert)
          .expectNextCount(4)
          .verifyComplete();
    }

    @Test
    public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
        Mono<Long> saveAndCount = repository.count()
          .doOnNext(System.out::println)
          .thenMany(repository
            .saveAll(Flux.just(
            new Employee(325, "Kim Jones", "Florida", "kjones@xyz.com", 42),
            new Employee(654, "Tom Moody", "New Hampshire", "tmoody@xyz.com", 44))))
          .last()
          .flatMap(v -> repository.count())
          .doOnNext(System.out::println);

        StepVerifier
          .create(saveAndCount)
          .expectNext(6L)
          .verifyComplete();
    }

    @Test
    public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
        StepVerifier
          .create(repository.findByAgeGreaterThan(35))
          .expectNextCount(2)
          .verifyComplete();
    }
}

5. Conclusion

5.结论

In summary, we learned how to use reactive types using Spring Data Cassandra to build a non-blocking application.

综上所述,我们学习了如何使用Spring Data Cassandra的反应式类型来构建一个非阻塞的应用程序。

As always, check out the source code for this tutorial over on GitHub.

一如既往,请查看本教程的源代码在GitHub上