Introduction to Apache Pulsar

Last modified: October 22, 2018

1. Introduction

Apache Pulsar is a distributed, open-source messaging system based on the publish/subscribe model, originally developed at Yahoo.

It was created to power Yahoo’s critical applications, such as Yahoo Mail, Yahoo Finance, and Yahoo Sports. Yahoo open-sourced it in 2016, and it later became part of the Apache Software Foundation.

2. Architecture

Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. It’s composed of a set of brokers and bookies, together with Apache ZooKeeper for configuration and management. The bookies come from Apache BookKeeper and store messages until they are consumed.

In a cluster we’ll have:

  • Multiple brokers to handle incoming messages from producers and dispatch them to consumers
  • Apache BookKeeper to support message persistence
  • Apache ZooKeeper to store the cluster configuration

To better understand this, let’s have a look at the architecture diagram from the documentation:

[Figure: Pulsar system architecture]

3. Key Features

Let’s start with a quick look at some of the key features:

  • Built-in support for multiple clusters
  • Geo-replication of messages across multiple clusters
  • Multiple subscription modes
  • Scalability to millions of topics
  • Guaranteed message delivery, backed by Apache BookKeeper
  • Low latency

Now, let’s discuss some of the key features in detail.

3.1. Messaging Model

Pulsar provides a flexible messaging model. In general, messaging architectures follow one of two models: queuing or publish/subscribe. Publish/subscribe is a broadcast model in which each message is sent to all consumers. Queuing, on the other hand, is point-to-point communication, in which each message is delivered to only one consumer.

Pulsar combines both concepts in one generalized API. Publishers publish messages to topics, and each message is then broadcast to all of the topic’s subscriptions.

Consumers subscribe to a topic to receive messages. The subscription type determines how messages are distributed among the consumers of the same subscription. Pulsar supports exclusive, shared, and failover subscriptions. We’ll discuss these subscription types in more detail in later sections.
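
The toy in-memory model below shows how Pulsar combines the two models. It is a plain-Java sketch, not the Pulsar client API. The class names and the round-robin dispatch rule are assumptions chosen for illustration. Every subscription receives its own copy of each message, which is publish/subscribe. The consumers attached to one subscription split that subscription's messages between them, which is queuing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT the Pulsar client API. Across subscriptions a topic
// behaves like pub/sub; inside one subscription it behaves like a queue.
class ToyTopic {

    static class Subscription {
        // consumer name -> messages delivered to that consumer
        private final Map<String, List<String>> inboxes = new LinkedHashMap<>();
        // messages published while the subscription had no consumers
        private final List<String> backlog = new ArrayList<>();
        private int next = 0; // round-robin position

        void addConsumer(String consumerName) {
            inboxes.putIfAbsent(consumerName, new ArrayList<>());
        }

        // Queue semantics: each message goes to exactly one consumer, in turn
        void dispatch(String message) {
            if (inboxes.isEmpty()) {
                backlog.add(message);
                return;
            }
            List<List<String>> targets = new ArrayList<>(inboxes.values());
            targets.get(next % targets.size()).add(message);
            next++;
        }

        List<String> inbox(String consumerName) {
            return inboxes.get(consumerName);
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    Subscription subscription(String name) {
        return subscriptions.computeIfAbsent(name, n -> new Subscription());
    }

    // Pub/sub semantics: every subscription receives its own copy of each message
    void publish(String message) {
        for (Subscription subscription : subscriptions.values()) {
            subscription.dispatch(message);
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscription("billing").addConsumer("b1");
        Subscription workers = topic.subscription("workers");
        workers.addConsumer("w1");
        workers.addConsumer("w2");
        for (int i = 1; i <= 4; i++) {
            topic.publish("m" + i);
        }
        System.out.println(topic.subscription("billing").inbox("b1")); // [m1, m2, m3, m4]
        System.out.println(workers.inbox("w1"));                      // [m1, m3]
        System.out.println(workers.inbox("w2"));                      // [m2, m4]
    }
}
```

Adding more subscriptions fans each message out further. Adding more consumers to one subscription spreads that subscription's load.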

3.2. Deployment Modes

Pulsar has built-in support for deployment in different environments. This means we can run it on standard on-premises machines, or deploy it in a Kubernetes cluster, on Google Cloud, or on AWS.

For development and testing, it can also run as a single node. In this case, all the components (broker, BookKeeper, and ZooKeeper) run in a single process.

3.3. Geo-Replication

Pulsar provides out-of-the-box support for geo-replication. By configuring clusters in different geographical regions, we can replicate messages between them.

Message data is replicated in near real time. If the network between clusters fails, the data remains safely stored in BookKeeper, and the replication system keeps retrying until replication succeeds.
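
The retry behavior can be sketched with a small Java model. This is a hypothetical illustration, not Pulsar's replicator. The local backlog stands in for data stored in BookKeeper, and the `linkUp` predicate simulates a flaky network between clusters. A message leaves the local backlog only after the remote cluster holds it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

// Toy model of cross-cluster replication with retries (NOT Pulsar's replicator).
class ToyGeoReplicator {
    private final Deque<String> localBacklog = new ArrayDeque<>(); // stands in for BookKeeper storage
    private final List<String> remoteCluster = new ArrayList<>();
    private final IntPredicate linkUp; // attempt number -> is the network up?
    private int attempts = 0;

    ToyGeoReplicator(IntPredicate linkUp) {
        this.linkUp = linkUp;
    }

    void publishLocally(String message) {
        localBacklog.addLast(message);
    }

    // Retries until every backlogged message has reached the remote cluster
    void replicateUntilDone() {
        while (!localBacklog.isEmpty()) {
            attempts++;
            if (linkUp.test(attempts)) {
                // Remove locally only after the remote cluster holds the message
                remoteCluster.add(localBacklog.peekFirst());
                localBacklog.removeFirst();
            }
            // Otherwise the network failed and the message stays in the local backlog
        }
    }

    List<String> remote() { return remoteCluster; }
    int attempts() { return attempts; }
    int pending() { return localBacklog.size(); }

    public static void main(String[] args) {
        // The first two delivery attempts hit a simulated network failure
        ToyGeoReplicator replicator = new ToyGeoReplicator(attempt -> attempt > 2);
        replicator.publishLocally("a");
        replicator.publishLocally("b");
        replicator.replicateUntilDone();
        System.out.println(replicator.remote() + " after " + replicator.attempts() + " attempts");
        // [a, b] after 4 attempts
    }
}
```

A real replicator would back off between attempts instead of retrying in a tight loop. This toy also loops forever if the link never comes back.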

The geo-replication feature also allows organizations to deploy Pulsar across different cloud providers and replicate data between them. This helps them avoid depending on proprietary cloud provider APIs.

3.4. Permanence

Once Pulsar has acknowledged a message, it guarantees that the message won’t be lost. The level of durability depends on the number of disks configured to store the data.

Pulsar ensures durability by using bookies (Apache BookKeeper instances) running on storage nodes. Whenever a bookie receives a message, it keeps a copy in memory and also writes the data to a write-ahead log (WAL). This log works the same way as a database WAL. Like a database transaction log, it ensures that data isn’t lost even if the machine fails.
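
The write-ahead idea fits in a few lines of Java. This toy bookie is a sketch under simplifying assumptions, not BookKeeper's journal. Each entry is appended to a log file and synced with `StandardOpenOption.SYNC` before it is added to memory. After a crash, the in-memory state can therefore be rebuilt by replaying the log. Entries are newline-separated, so they must not contain line breaks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Toy write-ahead log (NOT BookKeeper's journal format). An entry is appended to
// the log and synced to disk before it is added to memory and acknowledged.
class ToyBookie {
    private final Path walFile;
    private final List<String> memory = new ArrayList<>();

    ToyBookie(Path walFile) {
        this.walFile = walFile;
    }

    // Returns ("acknowledges") only after the entry is durable on disk
    void addEntry(String entry) {
        try {
            Files.write(walFile,
                    (entry + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND,
                    StandardOpenOption.SYNC);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        memory.add(entry); // the in-memory copy is updated only after the log write
    }

    List<String> entries() {
        return memory;
    }

    // After a crash, rebuild the in-memory state by replaying the log
    static ToyBookie recover(Path walFile) {
        ToyBookie bookie = new ToyBookie(walFile);
        try {
            if (Files.exists(walFile)) {
                bookie.memory.addAll(Files.readAllLines(walFile, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bookie;
    }

    public static void main(String[] args) {
        Path wal = Paths.get(System.getProperty("java.io.tmpdir"),
                "toy-wal-" + System.nanoTime() + ".log");
        ToyBookie bookie = new ToyBookie(wal);
        bookie.addEntry("m1");
        bookie.addEntry("m2");
        // Simulate a crash: drop the object and rebuild its state from the log
        ToyBookie recovered = ToyBookie.recover(wal);
        System.out.println(recovered.entries()); // [m1, m2]
    }
}
```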

Apart from that, Pulsar can also withstand multiple node failures. It replicates data to multiple bookies and only then sends an acknowledgment to the producer. As a result, an acknowledged message isn’t lost even if several machines fail, as long as at least one of its replicas survives.
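
The replicate-then-acknowledge rule can be modeled as a quorum write. In BookKeeper terms, an entry is written to a write quorum of bookies and acknowledged once an ack quorum of them have stored it. Pulsar exposes these values as broker settings such as `managedLedgerDefaultWriteQuorum` and `managedLedgerDefaultAckQuorum`. The following Java code is a simplified toy, not BookKeeper's client. It always writes to the first `writeQuorum` bookies and never replaces failed ones.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of quorum writes (NOT BookKeeper's client). An entry is sent to
// writeQuorum bookies and acknowledged once at least ackQuorum have stored it.
class ToyQuorumWriter {
    private final List<List<String>> bookies = new ArrayList<>(); // per-bookie storage
    private final boolean[] bookieUp;
    private final int writeQuorum;
    private final int ackQuorum;

    ToyQuorumWriter(boolean[] bookieUp, int writeQuorum, int ackQuorum) {
        if (ackQuorum > writeQuorum || writeQuorum > bookieUp.length) {
            throw new IllegalArgumentException("need ackQuorum <= writeQuorum <= number of bookies");
        }
        this.bookieUp = bookieUp;
        this.writeQuorum = writeQuorum;
        this.ackQuorum = ackQuorum;
        for (int i = 0; i < bookieUp.length; i++) {
            bookies.add(new ArrayList<>());
        }
    }

    // Returns true if the producer would receive an acknowledgment
    boolean write(String entry) {
        int stored = 0;
        // Simplification: always targets the first writeQuorum bookies
        for (int i = 0; i < writeQuorum; i++) {
            if (bookieUp[i]) {
                bookies.get(i).add(entry);
                stored++;
            }
        }
        return stored >= ackQuorum;
    }

    // Number of bookies holding a copy of the entry
    long copiesOf(String entry) {
        return bookies.stream().filter(b -> b.contains(entry)).count();
    }

    public static void main(String[] args) {
        // Three bookies with the middle one down: write to all 3, ack after 2
        ToyQuorumWriter writer = new ToyQuorumWriter(new boolean[] {true, false, true}, 3, 2);
        System.out.println(writer.write("e1") + ", copies=" + writer.copiesOf("e1")); // true, copies=2
    }
}
```

The number of copies, and therefore the number of failures tolerated, follows from the write quorum. The ack quorum decides how many of those copies must exist before the producer is told the write succeeded.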

4. Single Node Setup

Now let’s see how to set up a single node cluster of Apache Pulsar.

Pulsar also provides a simple client API with bindings for Java, Python, and C++. Later, we’ll create a simple Java producer and consumer example.

4.1. Installation

Apache Pulsar is available as a binary distribution. Let’s start by downloading it:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz

When the download is complete, we can extract the archive. The extracted distribution contains the bin, conf, example, licenses, and lib folders.

After that, we need to download the built-in connectors, which now ship as a separate package:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz

Let’s extract the connectors archive and copy its connectors folder into the Pulsar folder.

4.2. Starting an Instance

To start a standalone instance we can execute:

bin/pulsar standalone

5. Java Client

Now we’ll create a Java project to produce and consume messages. We’ll also create examples for different subscription types.

5.1. Setting up the Project

We’ll start by adding the pulsar-client dependency to our project:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.1.1-incubating</version>
</dependency>

5.2. Producer

Let’s continue by creating a Producer example. Here, we’ll create a topic and a producer.

First, we need to create a PulsarClient which will connect to a Pulsar service on a specific host and port, using its own protocol. Many producers and consumers can share a single client object.

Now, we’ll create a Producer with the specific topic name:

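import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";

// A single client manages connections to the broker and can be shared by many
// producers and consumers; build() and create() may throw PulsarClientException
PulsarClient client = PulsarClient.builder()
  .serviceUrl(SERVICE_URL)
  .build();

// A producer for TOPIC_NAME that compresses message payloads with LZ4
Producer<byte[]> producer = client.newProducer()
  .topic(TOPIC_NAME)
  .compressionType(CompressionType.LZ4)
  .create();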

The producer will send 5 messages:

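import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.MessageId;

// send() blocks until the broker acknowledges the message. It throws the checked
// PulsarClientException, so a plain loop is used rather than a lambda.
for (int i = 1; i <= 5; i++) {
    String content = String.format("hi-pulsar-%d", i);
    MessageId msgId = producer.send(content.getBytes(StandardCharsets.UTF_8));
}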

5.3. Consumer

5.3.消费者

Next, we’ll create a consumer to receive the messages sent by the producer. The consumer can use the same PulsarClient instance to connect to our server:

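import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.SubscriptionType;

private static final String SUBSCRIPTION_NAME = "test-subscription"; // illustrative name

Consumer<byte[]> consumer = client.newConsumer()
  .topic(TOPIC_NAME)
  .subscriptionType(SubscriptionType.Shared)
  .subscriptionName(SUBSCRIPTION_NAME)
  .subscribe();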

Here we’ve created the consumer with a Shared subscription type. This allows multiple consumers to attach to the same subscription, and messages are distributed among them.

5.4. Subscription Types for Consumer

In the consumer example above, we created a subscription of the shared type. We can also create exclusive and failover subscriptions.

The exclusive subscription allows only one consumer to be subscribed.

On the other hand, a failover subscription allows multiple consumers to attach, but only one of them receives messages at a time. If that consumer fails, another consumer takes over, as shown in this Apache diagram:

[Figure: Pulsar subscription modes]
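
In the Java client, the subscription type is chosen with `SubscriptionType.Exclusive` or `SubscriptionType.Failover` instead of `SubscriptionType.Shared`. The toy Java model below sketches the semantics. It is hypothetical and not the Pulsar API. An exclusive subscription rejects a second consumer, and a failover subscription keeps extra consumers as standbys. For simplicity, the sketch makes the earliest-attached consumer active. Pulsar applies its own rules for choosing the active consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of exclusive and failover subscriptions (NOT the Pulsar API).
class ToyExclusiveFailover {
    enum Type { EXCLUSIVE, FAILOVER }

    private final Type type;
    private final List<String> consumers = new ArrayList<>(); // in attach order

    ToyExclusiveFailover(Type type) {
        this.type = type;
    }

    // Exclusive: a second consumer is rejected. Failover: extra consumers wait as standbys.
    boolean attach(String consumerName) {
        if (type == Type.EXCLUSIVE && !consumers.isEmpty()) {
            return false;
        }
        consumers.add(consumerName);
        return true;
    }

    void detach(String consumerName) {
        consumers.remove(consumerName);
    }

    // The single consumer currently receiving messages, or null if none.
    // Toy rule: the earliest-attached consumer is active.
    String activeConsumer() {
        return consumers.isEmpty() ? null : consumers.get(0);
    }

    public static void main(String[] args) {
        ToyExclusiveFailover exclusive = new ToyExclusiveFailover(Type.EXCLUSIVE);
        System.out.println(exclusive.attach("c1") + " " + exclusive.attach("c2")); // true false

        ToyExclusiveFailover failover = new ToyExclusiveFailover(Type.FAILOVER);
        failover.attach("c1");
        failover.attach("c2");
        System.out.println(failover.activeConsumer()); // c1
        failover.detach("c1"); // the active consumer fails
        System.out.println(failover.activeConsumer()); // c2
    }
}
```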

6. Conclusion

In this article, we’ve highlighted the features of the Pulsar messaging system such as the messaging model, geo-replication and strong durability guarantees.

We also learned how to set up a single-node instance and how to use the Java client.

As always, the full implementation of this tutorial can be found on GitHub.