Using InfluxDB with Java – 在Java中使用InfluxDB

最后修改: 2017年 12月 28日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

InfluxDB is a high-performance store for time-series data. It supports insertion and real-time querying of data via a SQL-like query language.

InfluxDB是一个用于时间序列数据的高性能存储。它支持通过类似于SQL的查询语言插入和实时查询数据。

In this introductory article, we’ll demonstrate how to connect to an InfluxDb server, create a database, write time-series information, and then query the database.

在这篇介绍性文章中,我们将演示如何连接到InfluxDb服务器,创建一个数据库,写入时间序列信息,然后查询数据库。

2. Setup

2.设置

To connect to the database, we’ll need to add an entry to our pom.xml file:

为了连接到数据库,我们需要在我们的pom.xml文件中添加一个条目。

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>

The latest version of this dependency can be found on Maven Central.

该依赖的最新版本可以在Maven Central上找到。

We’ll also need an InfluxDB instance. Instructions for downloading and installing a database can be found on the InfluxData website.

我们还需要一个InfluxDB实例。关于下载和安装数据库的说明可以在InfluxData网站上找到。

3. Connecting to a Server

3.连接到服务器

3.1. Creating a Connection

3.1.创建一个连接

Creating a database connection requires passing a URL String and user credentials to a connection factory:

创建数据库连接需要向连接工厂传递一个URLString和用户凭证。

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. Verifying the Connection

3.2.验证连接

Communications with the database are performed over a RESTful API, so they aren’t persistent.

与数据库的通信是通过RESTful API进行的,所以它们不是持久的。

The API offers a dedicated “ping” service to confirm that the connection is functional. If the connection is good, the response contains a database version. If not, it contains “unknown”.

该API提供了一个专门的 “ping “服务,以确认连接是否正常。如果连接是好的,响应包含一个数据库版本。如果不是,它就包含“未知”.

So after creating a connection, we can verify it by doing:

因此,在创建一个连接后,我们可以通过以下方式来验证它。

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

3.3. Creating a Database

3.3.创建一个数据库

Creating an InfluxDB database is similar to creating a database on most platforms. But we need to create at least one retention policy before using it.

创建InfluxDB数据库与在大多数平台上创建数据库相似。但在使用之前,我们需要至少创建一个保留策略。

A retention policy tells the database how long a piece of data should be stored. Time series, such as CPU or memory statistics, tend to accumulate in large datasets.

保留策略告诉数据库一个数据应该被存储多长时间。时间序列,如CPU或内存统计数据,往往会在大型数据集中积累。

A typical strategy for controlling the size of time series databases is downsampling. “Raw” data is stored at a high rate, summarized, and then removed after a short time.

控制时间序列数据库规模的一个典型策略是下采样。”原始 “数据以较高的速度存储,并进行汇总,然后在短时间内删除。

Retention policies simplify this by associating a piece of data with an expiration time. InfluxData has an in-depth explanation on their site.

保留策略通过将一段数据与过期时间相关联来简化这一过程。 InfluxData在其网站上有一个深入的解释

After creating the database, we’ll add a single policy named defaultPolicy. It will simply retain data for 30 days:

创建数据库后,我们将添加一个名为defaultPolicy.的单一策略,它将简单地保留数据30天。

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

To create a retention policy, we’ll need a name, the database, an interval, a replication factor (which should be 1 for a single-instance database), and a boolean indicating it’s a default policy.

要创建一个保留策略,我们需要一个名称、数据库、间隔、复制因子(对于单实例数据库来说应该是1)和一个boolean表示它是一个默认策略。

3.4. Setting a Logging Level

3.4.设置日志级别

Internally, InfluxDB API uses Retrofit and exposes an interface to Retrofit’s logging facility, via a logging interceptor.

在内部,InfluxDB API使用Retrofit,并通过logging拦截器,向Retrofit的日志设施提供了一个接口。

So, we can set the logging level using:

因此,我们可以用以下方法设置日志级别。

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

And now we can see messages when we open a connection and ping it:

现在,当我们打开一个连接并ping它时,我们可以看到信息。

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

The available levels are BASIC, FULL, HEADERS, and NONE.

可用的级别是BASICFULLHEADERS,和NONE。

4. Adding and Retrieving Data

4.添加和检索数据

4.1. Points

4.1.积分

So now we’re ready to start inserting and retrieving data.

所以现在我们已经准备好开始插入和检索数据了。

The basic unit of information in InfluxDB is a Point, which is essentially a timestamp and a key-value map.

InfluxDB的基本信息单位是一个Point,,它本质上是一个时间戳和一个键值映射。

Let’s have a look at a point holding memory utilization data:

让我们来看看一个持有内存利用率数据的点。

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

We’ve created an entry that contains three Longs as memory statistics, a hostname, and a timestamp.

我们已经创建了一个条目,其中包含三个Long作为内存统计,一个主机名和一个时间戳。

Let’s see how to add this to the database.

让我们看看如何将其添加到数据库中。

4.2. Writing Batches

4.2.写作批次

Time series data tends to consist of many small points, and writing those records one at a time would be very inefficient. The preferred method is to collect records into batches.

时间序列数据往往由许多小点组成,而一次一次地写入这些记录将是非常低效的。首选方法是将记录收集成批。

The InfluxDB API provides a BatchPoint object:

InfluxDB的API提供了一个BatchPoint对象。

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

We create a BatchPoint and then add Points to it. We set the timestamp for our second entry to 100 milliseconds in the past since the timestamps are a primary index. If we send two points with the same timestamp, only one will be kept.

我们创建一个BatchPoint,然后向其添加Points。我们将第二个条目的时间戳设置为过去的100毫秒,因为时间戳是一个主要索引。如果我们发送两个具有相同时间戳的点,只有一个会被保留。

Note that we must associate BatchPoints with a database and a retention policy.

请注意,我们必须将BatchPoints与一个数据库和一个保留策略联系起来。

4.3. Writing One at a Time

4.3.一次写一个

Batching may be impractical for some use-cases.

分批处理对于某些使用情况来说可能是不切实际的。

Let’s enable batch mode with a single call to an InfluxDB connection:

让我们通过对InfluxDB连接的一次调用来启用批处理模式。

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

We enabled batching of 100 for insertion into the server or sending what it has every 200 milliseconds.

我们启用了100个的批处理,以便插入服务器或每200毫秒发送它的东西。

With batch mode enabled, we can still write one at a time. However, some additional setup is required:

启用批处理模式后,我们仍然可以一次写一个。然而,需要一些额外的设置。

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

Moreover, now we can write individuals points, and they are being collected in batches by a background thread:

此外,现在我们可以写个人的点,而且这些点是由后台线程分批收集的。

influxDB.write(point);

Before we enqueue individual points, we need to set a database (similar to the use command in SQL) and set a default retention policy. Therefore, if we wish to take advantage of downsampling with multiple retention policies, creating batches is the way to go.

在我们对单个点进行排队之前,我们需要设置一个数据库(类似于SQL中的use命令)并设置一个默认的保留策略。因此,如果我们希望利用具有多个保留策略的下采样,创建批次是一种方式。

Batch mode utilizes a separate thread pool. So it’s a good idea to disable it when it’s no longer needed:

批处理模式利用了一个单独的线程池。因此,当不再需要时,禁用它是一个好主意:

influxDB.disableBatch();

Closing the connection will also shut down the thread pool:

关闭连接也将关闭线程池。

influxDB.close();

4.4. Mapping Query Results

4.4.映射查询结果

Queries return a QueryResult, which we can map to POJOs.

查询会返回一个查询结果,我们可以将其映射到POJO。

Before we look at the query syntax, let’s create a class to hold our memory statistics:

在我们看查询语法之前,让我们创建一个类来保存我们的内存统计数据。

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;
}

The class is annotated with @Measurement(name = “memory”), corresponding to the Point.measurement(“memory”) we used to create our Points.

该类被注释为@Measurement(name = “memory”),对应于我们用来创建PointsPoint.measurement(“memory”)

For each field in our QueryResult, we add the @Column(name = “XXX”) annotation with the name of the corresponding field.

对于我们的QueryResult中的每个字段,我们添加@Column(name = “XXX”)注解,并加上相应字段的名称。

QueryResults are mapped to POJOs with an InfluxDBResultMapper.

QueryResults通过InfluxDBResultMapper映射到POJO。

4.5. Querying InfluxDB

4.5.查询InfluxDB

So let’s use our POJO with the points we added to the database in our two-point batch:

因此,让我们用我们的POJO与我们在两点批处理中添加到数据库的点一起使用。

QueryResult queryResult = connection
  .performQuery("Select * from memory", "baeldung");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

The query illustrates how our measurement named memory is stored as a table of Points that we can select from.

该查询说明了我们名为memory的测量是如何被存储为一个Points表的,我们可以从中选择

InfluxDBResultMapper accepts a reference to MemoryPoint.class with the QueryResult and returns a list of points.

InfluxDBResultMapper接受一个对MemoryPoint.class的引用与QueryResult并返回一个点的列表。

After we map the results, we verify that we received two by checking the length of the List we received from the query. Then we look at the first entry in the list and see the free memory size of the second point we inserted. The default ordering of query results from InfluxDB is ascending by timestamp.

在我们映射结果后,我们通过检查我们从查询中收到的List的长度来验证我们是否收到了两个。然后我们看一下列表中的第一个条目,看看我们插入的第二个点的可用内存大小。InfluxDB的查询结果的默认排序是按时间戳升序排列。

Let’s change that:

让我们改变这种状况。

queryResult = connection.performQuery(
  "Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

Adding order by time desc reverses the order of our results.

添加按时间排序,可以颠倒我们的结果顺序。

InfluxDB queries look very similar to SQL. There is an extensive reference guide on their site.

InfluxDB查询看起来与SQL非常相似。在他们的网站上有一个广泛的参考指南

5. Conclusion

5.结论

We’ve connected to an InfluxDB server, created a database with a retention policy, and then inserted and retrieved data from the server.

我们已经连接到一个InfluxDB服务器,创建了一个具有保留策略的数据库,然后从服务器上插入和检索数据。

The full source code of the examples is over on GitHub.

示例的完整源代码在GitHub上