1. Introduction
1.绪论
Kinesis is a tool for collecting, processing, and analyzing data streams in real-time, developed at Amazon. One of its main advantages is that it helps with the development of event-driven applications.
Kinesis是一个实时收集、处理和分析数据流的工具,在亚马逊开发。它的主要优点之一是有助于开发事件驱动的应用程序。
In this tutorial, we’ll explore a few libraries that enable our Spring application to produce and consume records from a Kinesis Stream. The code examples will show the basic functionality but don’t represent the production-ready code.
在本教程中,我们将探讨一些库,这些库可以让我们的Spring应用从Kinesis流中产生和消费记录。这些代码实例将展示基本功能,但并不代表可用于生产的代码。
2. Prerequisite
2.先决条件
Before we go any further, we need to do two things.
在我们进一步讨论之前,我们需要做两件事。
The first is to create a Spring project, as the goal here is to interact with Kinesis from a Spring project.
首先是创建一个Spring项目,因为这里的目标是从Spring项目中与Kinesis互动。
The second one is to create a Kinesis Data Stream. We can do this from a web browser in our AWS account. One alternative for the AWS CLI fans among us is to use the command line. Because we’ll interact with it from code, we also must have at hand the AWS IAM Credentials, the access key and secret key, and the region.
第二个是创建一个Kinesis数据流。我们可以通过AWS账户中的Web浏览器来完成这一工作。对于我们中的AWS CLI爱好者来说,有一种选择是使用命令行。由于我们将从代码中进行交互,因此我们还必须掌握AWS IAM凭证、访问密钥和秘密密钥以及区域。
All our producers will create dummy IP address records, while the consumers will read those values and list them in the application console.
我们所有的生产者将创建假的IP地址记录,而消费者将读取这些值并在应用控制台中列出。
3. AWS SDK for Java
3.适用于Java的AWS SDK
The very first library we’ll use is the AWS SDK for Java. Its advantage is that it allows us to manage many parts of working with Kinesis Data Streams. We can read data, produce data, create data streams, and reshard data streams. The drawback is that in order to have production-ready code, we’ll have to code aspects like resharding, error handling, or a daemon to keep the consumer alive.
我们要使用的第一个库是AWS SDK for Java。它的优势在于,它允许我们管理与Kinesis数据流合作的许多部分。我们可以读取数据,生产数据,创建数据流,以及重放数据流。缺点是,为了拥有可用于生产的代码,我们必须对重分片、错误处理或保持消费者活力的守护程序等方面进行编码。
3.1. Maven Dependency
3.1.Maven的依赖性
The amazon-kinesis-client Maven dependency will bring everything we need to have working examples. We’ll now add it to our pom.xml file:
amazon-kinesis-clientMaven依赖项将为我们带来工作实例所需的一切。现在我们要把它添加到我们的pom.xml文件中。
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version>
</dependency>
3.2. Spring Setup
3.2.Spring设置
Let’s reuse the AmazonKinesis object needed to interact with our Kinesis Stream. We’ll create it as a @Bean inside our @SpringBootApplication class:
让我们重新使用与Kinesis流交互所需的AmazonKinesis对象。我们将在@SpringBootApplication类中把它创建为@Bean。
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
Next, let’s define the aws.access.key and aws.secret.key, needed for the local machine, in application.properties:
接下来,让我们在application.properties中定义本地机器需要的aws.access.key和aws.secret.key。
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here
And we’ll read them using the @Value annotation:
而我们将使用@Value注解来读取它们。
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
For the sake of simplicity, we’re going to rely on @Scheduled methods to create and consume records.
为了简单起见,我们将依靠@Scheduled方法来创建和消费记录。
3.3. Consumer
3.3.消费者
The AWS SDK Kinesis Consumer uses a pull model, meaning our code will draw records from the shards of the Kinesis data stream:
AWS SDK Kinesis Consumer使用拉动模型,这意味着我们的代码将从Kinesis数据流的碎片中提取记录。
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);
GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
recordsResult.getRecords().stream()
.map(record -> new String(record.getData().array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
recordsResult = kinesis.getRecords(recordsRequest);
}
The GetRecordsRequest object builds the request for stream data. In our example, we’ve defined a limit of 25 records per request, and we keep reading until there’s nothing more to read.
GetRecordsRequest对象建立了流数据的请求。在我们的例子中,我们定义了每个请求有25条记录的限制,我们一直读到没有什么可读为止。
We can also notice that, for our iteration, we’ve used a GetShardIteratorResult object. We created this object inside a @PostConstruct method so that we’ll begin tracking records straight away:
我们还可以注意到,对于我们的迭代,我们使用了一个GetShardIteratorResult对象。我们在@PostConstruct方法中创建了这个对象,这样我们就可以直接开始跟踪记录。
private GetShardIteratorResult shardIterator;
@PostConstruct
private void buildShardIterator() {
GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
readShardsRequest.setStreamName(IPS_STREAM);
readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
readShardsRequest.setShardId(IPS_SHARD_ID);
this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}
3.4. Producer
3.4.生产者
Let’s now see how to handle the creation of records for our Kinesis data stream.
现在我们来看看如何处理Kinesis数据流的记录创建。
We insert data using a PutRecordsRequest object. For this new object, we add a list that comprises multiple PutRecordsRequestEntry objects:
我们使用一个PutRecordsRequest对象插入数据。对于这个新对象,我们添加一个由多个PutRecordsRequestEntry对象组成的列表。
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
entry.setPartitionKey(IPS_PARTITION_KEY);
return entry;
}).collect(Collectors.toList());
PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);
kinesis.putRecords(createRecordsRequest);
We’ve created a basic consumer and a producer of simulated IP records. All that’s left to do now is to run our Spring project and see IPs listed in our application console.
我们已经创建了一个基本的消费者和一个模拟IP记录的生产者。现在要做的就是运行我们的Spring项目,并在我们的应用控制台中看到IP的列表。
4. KCL and KPL
4.KCL和KPL
Kinesis Client Library (KCL) is a library that simplifies the consuming of records. It’s also a layer of abstraction over the AWS SDK Java APIs for Kinesis Data Streams. Behind the scenes, the library handles load balancing across many instances, responding to instance failures, checkpointing processed records, and reacting to resharding.
Kinesis客户端库(KCL)是一个简化记录消费的库。它也是Kinesis数据流的AWS SDK Java API上的一个抽象层。在幕后,该库可以处理许多实例之间的负载平衡,响应实例故障,检查已处理的记录,并对重新分片作出反应。
Kinesis Producer Library (KPL) is a library useful for writing to a Kinesis data stream. It also provides a layer of abstraction that sits over the AWS SDK Java APIs for Kinesis Data Streams. For better performance, the library automatically handles batching, multi-threading, and retry logic.
Kinesis Producer Library(KPL)是一个有助于向Kinesis数据流写入的库。它还提供了一个抽象层,位于Kinesis数据流的AWS SDK Java APIs之上。为了提高性能,该库自动处理批处理、多线程和重试逻辑。
KCL and KPL both have the main advantage that they’re easy to use so that we can focus on producing and consuming records.
KCL和KPL都有一个主要的优势,那就是它们很容易使用,这样我们就可以专注于生产和消费记录。
4.1. Maven Dependencies
4.1.Maven的依赖性
The two libraries can be brought separately in our project if needed. To include KPL and KCL in our Maven project, we need to update our pom.xml file:
如果需要,这两个库可以在我们的项目中单独拿来使用。为了将KPL和KCL纳入我们的Maven项目中,我们需要更新我们的pom.xml文件。
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version>
</dependency>
4.2. Spring Setup
4.2.Spring设置
The only Spring preparation we need is to make sure we have the IAM credentials at hand. The values for aws.access.key and aws.secret.key are defined in our application.properties file so we can read them with @Value when needed.
我们唯一需要的Spring准备是确保我们手头有IAM凭证。aws.access.key和aws.secret.key的值在我们的application.properties文件中定义,因此我们可以在需要时用@Value读取它们。
4.3. Consumer
4.3.消费者
First, we’ll create a class that implements the IRecordProcessor interface and defines our logic for how to handle Kinesis data stream records, which is to print them in the console:
首先,我们将创建一个实现IRecordProcessor接口的类,并定义我们如何处理Kinesis数据流记录的逻辑,即在控制台中打印它们。
public class IpProcessor implements IRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) { }
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.getRecords()
.forEach(record -> System.out.println(new String(record.getData().array())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) { }
}
The next step is to define a factory class that implements the IRecordProcessorFactory interface and returns a previously created IpProcessor object:
下一步是定义一个工厂类,实现IRecordProcessorFactory接口并返回先前创建的IpProcessor对象。
public class IpProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new IpProcessor();
}
}
And now for the final step, we’ll use a Worker object to define our consumer pipeline. We need a KinesisClientLibConfiguration object that will define, if needed, the IAM Credentials and AWS Region.
现在是最后一步,我们将使用一个Worker对象来定义我们的消费者管道。我们需要一个KinesisClientLibConfiguration对象,如果需要,它将定义IAM Credentials和AWS Region。
We’ll pass the KinesisClientLibConfiguration, and our IpProcessorFactory object, to our Worker and then start it in a separate thread. We keep this logic of consuming records always alive with the use of the Worker class, so we’re continuously reading new records now:
我们将把KinesisClientLibConfiguration和我们的IpProcessorFactory对象传递给我们的Worker,然后在一个单独的线程中启动它。我们通过使用Worker类来保持这种消耗记录的逻辑,所以我们现在正在不断地读取新的记录。
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
APP_NAME,
IPS_STREAM,
new AWSStaticCredentialsProvider(awsCredentials),
IPS_WORKER)
.withRegionName(Regions.EU_CENTRAL_1.getName());
final Worker worker = new Worker.Builder()
.recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build();
CompletableFuture.runAsync(worker.run());
4.4. Producer
4.4.生产者
Let’s now define the KinesisProducerConfiguration object, adding the IAM Credentials and the AWS Region:
现在我们来定义KinesisProducerConfiguration对象,添加IAM Credentials和AWS Region。
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setVerifyCertificate(false)
.setRegion(Regions.EU_CENTRAL_1.getName());
this.kinesisProducer = new KinesisProducer(producerConfig);
We’ll include the kinesisProducer object previously created in a @Scheduled job and produce records for our Kinesis data stream continuously:
我们将包括之前在@Scheduled作业中创建的kinesisProducer对象,并为我们的Kinesis数据流持续生产记录。
IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
.forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));
5. Spring Cloud Stream Binder Kinesis
5.Spring Cloud Stream Binder Kinesis
We’ve already seen two libraries, both created outside of the Spring ecosystem. We’ll now see how the Spring Cloud Stream Binder Kinesis can simplify our life further while building on top of Spring Cloud Stream.
我们已经看到了两个库,都是在 Spring 生态系统之外创建的。现在我们将看看Spring Cloud Stream Binder Kinesis如何进一步简化我们的生活,同时在Spring Cloud Stream之上进行构建。
5.1. Maven Dependency
5.1.Maven的依赖性
The Maven dependency we need to define in our application for the Spring Cloud Stream Binder Kinesis is:
我们需要在应用程序中为Spring Cloud Stream Binder Kinesis定义的Maven依赖项是。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
5.2. Spring Setup
5.2.Spring设置
When running on EC2, the required AWS properties are automatically discovered, so there is no need to define them. Since we’re running our examples on a local machine, we need to define our IAM access key, secret key, and region for our AWS account. We’ve also disabled the automatic CloudFormation stack name detection for the application:
当在EC2上运行时,所需的AWS属性会自动发现,所以不需要定义它们。由于我们在本地机器上运行我们的例子,我们需要定义我们的IAM访问密钥、秘密密钥和AWS账户的区域。我们还禁用了应用程序的自动CloudFormation堆栈名称检测。
cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false
Spring Cloud Stream is bundled with three interfaces that we can use in our stream binding:
Spring Cloud Stream捆绑了三个接口,我们可以在我们的流绑定中使用:。
- The Sink is for data ingestion
- The Source is used for publishing records
- The Processor is a combination of both
We can also define our own interfaces if we need to.
如果需要,我们也可以定义自己的接口。
5.3. Consumer
5.3.消费者
Defining a consumer is a two-part job. First, we’ll define, in the application.properties, the data stream from which we’ll consume:
定义一个消费者是一个两部分的工作。首先,我们将在application.properties中定义我们要消费的数据流。
spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain
And next, let’s define a Spring @Component class. The annotation @EnableBinding(Sink.class) will allow us to read from the Kinesis stream using the method annotated with @StreamListener(Sink.INPUT):
而接下来,让我们定义一个Spring @Component类。注解@EnableBinding(Sink.class)将允许我们使用@StreamListener(Sink.INPUT)注解的方法从Kinesis流中读取。
@EnableBinding(Sink.class)
public class IpConsumer {
@StreamListener(Sink.INPUT)
public void consume(String ip) {
System.out.println(ip);
}
}
5.4. Producer
5.4.生产者
The producer can also be split in two. First, we have to define our stream properties inside application.properties:
生产者也可以一分为二。首先,我们必须在application.properties中定义我们的流属性。
spring.cloud.stream.bindings.output.destination=live-ips
spring.cloud.stream.bindings.output.content-type=text/plain
And then we add @EnableBinding(Source.class) on a Spring @Component and create new test messages every few seconds:
然后我们在Spring的@EnableBinding(Source.class)上添加@Component,并每隔几秒钟创建新的测试信息。
@Component
@EnableBinding(Source.class)
public class IpProducer {
@Autowired
private Source source;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
}
}
That’s all we need for Spring Cloud Stream Binder Kinesis to work. We can simply start the application now.
这就是我们为Spring Cloud Stream Binder Kinesis工作所需要的一切。我们现在可以简单地启动应用程序了。
6. Conclusion
6.结语
In this article, we’ve seen how to integrate our Spring project with two AWS libraries for interacting with a Kinesis Data Stream. We’ve also seen how to use the Spring Cloud Stream Binder Kinesis library to make the implementation even easier.
在这篇文章中,我们看到了如何将我们的Spring项目与两个AWS库集成,以与Kinesis数据流进行交互。我们还看到了如何使用Spring Cloud Stream Binder Kinesis库来使实施更加简单。
The source code for this article can be found over on Github.
这篇文章的源代码可以在Github上找到over。