Cassandra Batch in Cassandra Query Language and Java – 卡桑德拉查询语言和Java中的卡桑德拉批处理

最后修改: 2022年 1月 20日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll learn about the Cassandra batch query and its different use cases. We’ll analyze both the single partition and multiple partition tables batch queries.

在本教程中,我们将了解Cassandra批量查询及其不同的使用情况。我们将分析单分区和多分区表的批量查询。

We’ll explore batching in the Cqlsh as well as in Java applications.

我们将在Cqlsh以及Java应用程序中探索批处理。

2. Cassandra Batch Fundamentals

2.卡桑德拉批处理基础知识

A distributed database like Cassandra does not support ACID (Atomicity, Consistency, Isolation, and Durability) properties, unlike relational databases. Still, in some cases, we need multiple data modifications to be an atomic or/and isolated operation.

Cassandra这样的分布式数据库不支持ACID(原子性、一致性、隔离性和持久性)属性,与关系型数据库不同。但是,在某些情况下,我们仍然需要多个数据的修改是一个原子性或/和隔离性的操作。

The batch statement combines multiple data modification language statements (such as INSERT, UPDATE, and DELETE) to achieve atomicity and isolation when targeting a single partition or only atomicity when targeting multiple partitions.

批量语句结合了多个数据修改语言语句(如INSERT、UPDATE和DELETE),在针对单个分区时实现原子性和隔离性,在针对多个分区时只实现原子性。

Here’s the syntax for batch query:

以下是批量查询的语法。

BEGIN [ ( UNLOGGED | COUNTER ) ] BATCH
[ USING TIMESTAMP [ epoch_microseconds ] ]
dml_statement [ USING TIMESTAMP [ epoch_microseconds ] ] ;
[ dml_statement [ USING TIMESTAMP [ epoch_microseconds ] ] [ ; ... ] ]
APPLY BATCH;

Let’s go through the above syntax with an example:

让我们通过一个例子来了解上述语法。

BEGIN BATCH 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f3,'banana'); 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f5,'banana'); 

APPLY BATCH;

First, we use the BEGIN BATCH statement without any optional parameters like UNLOGGED or USING TIMESTAMP to initiate the batch query, and then include all the DML operations, i.e., the insert statements for the product table.

首先,我们使用BEGIN BATCH语句,没有任何可选的参数,如UNLOGGEDUSING TIMESTAMP来启动批量查询,然后包括所有DML操作,即product表的插入语句。

Finally, we use the APPLY BATCH statement to execute the batch.

最后,我们使用APPLY BATCH语句来执行该批程序。

We should note that we’ll not be able to undo any batch query since the batch query does not support rollback functionality.

我们应该注意,我们将无法撤销任何批处理查询,因为t批处理查询不支持回滚功能。

2.1. Single Partition

2.1.单一分区

A batch statement applies all the DML statements within a single partition, ensuring atomicity and isolation.

批处理语句在一个分区内应用所有的DML语句,确保原子性和隔离性。

A well-designed batch targeting a single partition can reduce client-server traffic and more efficiently update a table with a single row mutation. This is because the batch isolation occurs only if the batch operation is writing to a single partition.

一个精心设计的以单个分区为目标的批处理可以减少客户端-服务器的流量,并更有效地更新一个有单行突变的表。这是因为只有当批处理操作写到单一分区时,才会发生批处理隔离。

A single partition batch can also involve two different tables having the same partition key and present in the same keyspace.

一个分区批处理也可以涉及两个不同的表,它们具有相同的分区键,并存在于同一个键空间。

The single partition batch operations are unlogged by default and thus, do not suffer from performance penalties due to logging.

默认情况下,单分区批处理操作是不记录的,因此,不会因为记录而受到性能上的影响。

The below diagram depicts the single partition batch request flow from the coordination node H to the partition node B and its replication nodes C, D:

下图描述了从协调节点H到分区节点B及其复制节点CD的单一分区批量请求流。

single partition

Courtesy: Datastax

提供:Datastax

2.2. Multiple Partitions

2.2.多重分区

The batch involving multiple partitions needs to be well-designed as it involves coordination between multiple nodes. The best use case for a multi-partition batch is to write the same data into two related tables, i.e., two tables having the same columns with different partition keys.

涉及多个分区的批处理需要精心设计,因为它涉及多个节点之间的协调。多分区批处理的最佳用例是将相同的数据写入两个相关的表,即两个具有相同列但分区键不同的表。

Multiple partition batch operation uses the batchlog mechanism to ensure atomicity. The coordination node sends batch log requests to batch log nodes, and once it gets a confirmed receipt, it executes the batch statements. Then, it removes the batchlog from the nodes and sends a confirmation to the client.

多分区批处理操作使用批处理日志机制来确保原子性。协调节点向批处理日志节点发送批处理日志请求,一旦它得到确认的接收,它就执行批处理语句。然后,它从节点上删除batchlog,并向客户端发送确认。

It is recommended to avoid using multiple partitions batch queries. This is because such queries put huge pressure on the coordination node and severely affect its performance.

建议避免使用多分区批量查询。这是因为这种查询给协调节点带来了巨大的压力,并严重影响其性能。

We should only use a multiple partition batch when there is no other viable option.

只有在没有其他可行的选择时,我们才应该使用多分区批次。

The below diagram depicts the multiple partition batch request flow from the coordination node H to the partition nodes B, E and its respective replication nodes C, D, and F, G:

下图描述了从协调节点H到分区节点BE及其各自的复制节点CDFG的多分区批量请求流。

multi partition

Courtesy: Datastax

提供:Datastax

3. Batch Execution in Cqlsh

3.在Cqlsh中批量执行

First, let’s create a product table to run through some of the batch queries:

首先,让我们创建一个产品表来运行一些批量查询。

CREATE TABLE product (
  product_id UUID,
  variant_id UUID,
  product_name text,
  description text,
  price float,
  PRIMARY KEY (product_id, variant_id)
  );

 3.1. Single Partition Batch Without Timestamp

3.1.无时间戳的单分区批处理

We’ll execute the below batch query targeting a single partition of the product table and will not provide timestamp:

我们将执行下面的批处理查询,目标是product表的一个分区,不会提供时间戳。

BEGIN BATCH 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f3,'banana') IF NOT EXISTS; 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f5,'banana') IF NOT EXISTS; 

UPDATE product SET price = 7.12, description = 'banana v1' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f3; 

UPDATE product SET price = 11.90, description = 'banana v2' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f5; 

APPLY BATCH;

The above query uses compare-and-set (CAS) logic, i.e., the IF NOT EXISTS clause, and all such conditional statements must return true to execute the batch. If any such statements return false, then the entire batch is unprocessed.

上述查询使用比较和设置(CAS)逻辑,IF NOT EXISTS子句,所有此类条件语句必须返回true才能执行该批。如果任何这样的语句返回false,那么整个批处理就无法进行。

After execution of the above query, we’ll get the below successful acknowledgment:

在执行上述查询后,我们会得到下面的成功确认。

useBatchExamplesAck

Let’s now verify if the writetime of the data is the same after batch execution:

现在让我们来验证一下,批量执行后数据的写入时间是否相同。

cqlsh:testkeyspace> select product_id, variant_id, product_name, description, price, writetime(product_name) from product;

@ Row 1
-------------------------+--------------------------------------
product_id | 3a043b68-20ee-4ece-8f4b-a07e704bc9f5
variant_id | b84b9366-9998-4b2d-9a96-7e9a59a94ae5
product_name | Banana
description | banana v1
price | 12
writetime(product_name) | 1639275574653000

@ Row 2
-------------------------+--------------------------------------
product_id | 3a043b68-20ee-4ece-8f4b-a07e704bc9f5
variant_id | facc3997-299d-419b-b133-a54b5d4dfc3b
product_name | Banana
description | banana v2
price | 12
writetime(product_name) | 1639275574653000

 3.2. Single Partition Batch With Timestamp

3.2.单个分区带时间戳的批处理

We’ll now see examples of batch queries with USING TIMESTAMP option to supply timestamp in epoch time format, i.e., microseconds.

现在我们将看到使用USING TIMESTAMP选项提供epoch时间格式(即微秒)的批量查询的例子。

Below is the batch query that applies the same timestamp to all DML statements:

下面是对所有DML语句应用相同时间戳的批处理查询。

BEGIN BATCH USING TIMESTAMP 1638810270 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f3,'banana'); 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f5,'banana'); 

UPDATE product SET price = 7.12, description = 'banana v1' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f3; 

UPDATE product SET price = 11.90, description = 'banana v2' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f5; 

APPLY BATCH;

Let’s now specify custom timestamp on any of the individual DML statements:

现在让我们在任何一个单独的DML语句上指定自定义时间戳。

BEGIN BATCH 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f3,'banana');

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f5,'banana') USING TIMESTAMP 1638810270; 

UPDATE product SET price = 7.12, description = 'banana v1' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f3 USING TIMESTAMP 1638810270; 

UPDATE product SET price = 11.90, description = 'banana v2' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f5; 

APPLY BATCH;

We’ll now see an invalid batch query that has both the custom timestamp and compare-and-set (CAS) logic, i.e., IF NOT EXISTS clause:

我们现在将看到一个无效的批处理查询,它同时具有自定义时间戳和比较和设置(CAS)逻辑,即IF NOT EXISTS语句

BEGIN BATCH USING TIMESTAMP 1638810270 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f3,'banana') IF NOT EXISTS; 

INSERT INTO product (product_id, variant_id, product_name) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,0e9ef8f7-d32b-4926-9d37-27225933a5f5,'banana') IF NOT EXISTS; 

UPDATE product SET price = 7.12, description = 'banana v1' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f3; 

UPDATE product SET price = 11.90, description = 'banana v2' 
WHERE product_id = 2c11bbcd-4587-4d15-bb57-4b23a546bd7f AND variant_id=0e9ef8f7-d32b-4926-9d37-27225933a5f5; 

APPLY BATCH;

We’ll get the below error on executing the above query:

在执行上述查询时,我们会得到以下错误。

InvalidRequest: Error from server: code=2200 [Invalid query]
message="Cannot provide custom timestamp for conditional BATCH"

The above error is because the client-side timestamps are prohibited for any conditional insert or updates.

上述错误是因为客户端的时间戳被禁止用于任何条件性插入或更新。

3.3. Multiple Partition Batch Query

3.3.多分区批量查询

The best use case for batch on multiple partitions is to insert the exact data into two related tables.

在多个分区上进行批处理的最佳用例是向两个相关的表插入准确的数据

Let’s insert the same data into both product_by_name and product_by_id tables having different partition keys:

让我们在具有不同分区键的product_by_nameproduct_by_id表中插入相同数据。

BEGIN BATCH 

INSERT INTO product_by_name (product_name, product_id, description, price) 
VALUES ('banana',2c11bbcd-4587-4d15-bb57-4b23a546bd7f,'banana',12.00); 

INSERT INTO product_by_id (product_id, product_name, description, price) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,'banana','banana',12.00); 

APPLY BATCH;

Let’s now enable the UNLOGGED option to the above query:

现在让我们对上述查询启用UNLOGGED选项。

BEGIN UNLOGGED BATCH 

INSERT INTO product_by_name (product_name, product_id, description, price) 
VALUES ('banana',2c11bbcd-4587-4d15-bb57-4b23a546bd7f,'banana',12.00); 

INSERT INTO product_by_id (product_id, product_name, description, price) 
VALUES (2c11bbcd-4587-4d15-bb57-4b23a546bd7f,'banana','banana',12.00); 

APPLY BATCH;

The above UNLOGGED batch query will not ensure atomicity or isolation and does not use the batch log to write the data.

上述UNLOGGED批处理查询不会确保原子性或隔离性,也不会使用批处理日志来写入数据。

3.4. Batching on Counter Updates

3.4.对计数器更新进行分批处理

We’ll need to use the COUNTER option for any counter columns as counter updates operations are not idempotent.

我们需要对任何计数器列使用COUNTER选项,因为计数器更新操作不是等效的

Let’s create a table product_by_sales which stores sales_vol as Counter datatype:

让我们创建一个表product_by_sales ,它将sales_vol 存储为Counter 数据类型。

CREATE TABLE product_by_sales (
  product_id UUID,
  sales_vol counter,
  PRIMARY KEY (product_id)
);

The below counter batch query increases the sales_vol twice by 100:

下面的counter批处理查询将sales_vol增加两次,每次100。

BEGIN COUNTER BATCH

UPDATE product_by_sales
SET sales_vol = sales_vol + 100
WHERE product_id = 6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47;

UPDATE product_by_sales
SET sales_vol = sales_vol + 100
WHERE product_id = 6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47;

APPLY BATCH

4. Batch Operation in Java

4.Java中的批量操作

Let’s look at a few examples of building and executing the batch query in a Java application.

让我们看一下在Java应用程序中建立和执行批处理查询的几个例子。

4.1. Maven Dependency

4.1.Maven的依赖性

Firstly, we would need to include the DataStax-related Maven dependencies:

首先,我们需要包括DataStax相关的Maven依赖项。

<dependency>
    <groupId>com.datastax.oss</groupId>
    <artifactId>java-driver-core</artifactId>
    <version>4.1.0</version>
</dependency>
<dependency>
   <groupId>com.datastax.oss</groupId>
   <artifactId>java-driver-query-builder</artifactId>
   <version>4.1.0</version>
</dependency>

4.2. Single Partition Batch

4.2.单一分区批处理

Let’s look at an example to see how to execute batch into single-partition data.

让我们看一个例子,看看如何对单分区数据进行批处理。

We’ll build the batch query using the BatchStatement instance. The BatchStatement is instantiated using the DefaultBatchType enum and the BoundStatement instances.

我们将使用BatchStatementinstance来构建批处理查询。BatchStatement是使用DefaultBatchTypeenumBoundStatementinstances来实例化的。

First, we’ll create a method to get a BoundStatement instance by binding Product attributes to a PreparedStatement insert query:

首先,我们将创建一个方法,通过将Product属性绑定到PreparedStatementinsert查询来获得BoundStatement实例。

BoundStatement getProductVariantInsertStatement(Product product, UUID productId) {
    String insertQuery = new StringBuilder("") 
      .append("INSERT INTO ")
      .append(PRODUCT_TABLE_NAME)
      .append("(product_id, variant_id, product_name, description, price) ")
      .append("VALUES (")
      .append(":product_id")
      .append(", ")
      .append(":variant_id")
      .append(", ")
      .append(":product_name")
      .append(", ")
      .append(":description")
      .append(", ")
      .append(":price")
      .append(");")
      .toString();

    PreparedStatement preparedStatement = session.prepare(insertQuery);
        
    return preparedStatement.bind(
      productId, 
      UUID.randomUUID(),
      product.getProductName(), 
      product.getDescription(),
      product.getPrice());
}

Now, we’ll execute the BatchStatement for the above created BoundStatement using the same Product UUID:

现在,我们将为上述创建的BoundStatement执行BatchStatement,使用相同的ProductUUID

UUID productId = UUID.randomUUID();
BoundStatement productBoundStatement1 = this.getProductVariantInsertStatement(productVariant1, productId);
BoundStatement productBoundStatement2 = this.getProductVariantInsertStatement(productVariant2, productId);
        
BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.UNLOGGED,
            productBoundStatement1, productBoundStatement2);

session.execute(batch);

The above code inserts two product variants on the same partition key using the UNLOGGED batch.

上面的代码使用UNLOGGED批处理在同一个分区键上插入了两个产品变体。

4.3. Multiple Partition Batch

4.3.多分区批处理

Now, let’s see how to insert the same data into two related tables – product_by_id and product_by_name.

现在,让我们看看如何将相同的数据插入两个相关的表–product_by_idproduct_by_name.。

First, we’ll create a reusable method to get a BoundStatement instance for the PreparedStatement insert query:

首先,我们将创建一个可重复使用的方法,为PreparedStatement插入查询获得BoundStatement实例。

BoundStatement getProductInsertStatement(Product product, UUID productId, String productTableName) {
    String cqlQuery1 = new StringBuilder("")
      .append("INSERT INTO ")
      .append(productTableName)
      .append("(product_id, product_name, description, price) ")
      .append("VALUES (")
      .append(":product_id")
      .append(", ")
      .append(":product_name")
      .append(", ")
      .append(":description")
      .append(", ")
      .append(":price")
      .append(");")
      .toString();

    PreparedStatement preparedStatement = session.prepare(cqlQuery1);
        
    return preparedStatement.bind(
      productId,
      product.getProductName(),
      product.getDescription(),
      product.getPrice());
}

Now, we’ll execute the BatchStatement using the same Product UUID:

现在,我们将使用相同的产品UID:执行BatchStatement

UUID productId = UUID.randomUUID();
        
BoundStatement productBoundStatement1 = this.getProductInsertStatement(product, productId, PRODUCT_BY_ID_TABLE_NAME);
BoundStatement productBoundStatement2 = this.getProductInsertStatement(product, productId, PRODUCT_BY_NAME_TABLE_NAME);
        
BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.LOGGED,
            productBoundStatement1,productBoundStatement2);

session.execute(batch);

This inserts the same product data into the product_by_id and product_by_name tables using LOGGED batch.

这是在product_by_idproduct_by_name表中插入相同的产品数据,使用LOGGEDbatch。

5. Conclusion

5.总结

In this article, we’ve learned about the Cassandra batch query and how to apply it in Cqlsh and Java using  BatchStatement.

在这篇文章中,我们已经了解了Cassandra批量查询以及如何在Cqlsh和Java中使用BatchStatement来应用它。

As always, the complete source code of the examples is available over on GitHub.

一如既往,这些示例的完整源代码可在GitHub上获得