Using a Spring Cloud App Starter

Last modified: February 2, 2018

1. Introduction

In this article, we’ll demonstrate how to use the Spring Cloud App starters – which provide bootstrapped and ready-to-go applications – that can serve as starting points for future development.

Simply put, Task App Starters are dedicated for use-cases like database migration and distributed testing, and Stream App Starters provide integrations with external systems.

Overall, there are over 55 starters; check out the official documentation for the Stream App Starters and the Task App Starters for more information about these two.

Next, we’ll build a small distributed Twitter application that will stream Twitter posts into a Hadoop Distributed File System.

2. Getting Set Up

We’ll use the consumer-key and access-token to create a simple Twitter app.

Then, we’ll set up Hadoop so we can persist our Twitter Stream for future Big Data purposes.

Lastly, we have the option to either use the supplied Spring GitHub repositories to compile and assemble standalone components of the sources-processors-sinks architecture pattern using Maven, or combine sources, processors, and sinks through their Spring Stream binding interfaces.

We’ll take a look at both ways to do this.

It’s worth noting that all Stream App Starters were formerly collated into one large repo at github.com/spring-cloud/spring-cloud-stream-app-starters. Since then, each Starter has been simplified and isolated.

3. Twitter Credentials

First, let’s set up our Twitter Developer credentials. To get Twitter developer credentials, follow the steps to set up an app and create an access token from the official Twitter developer documentation.

Specifically, we’ll need:

  1. Consumer Key
  2. Consumer Key Secret
  3. Access Token Secret
  4. Access Token

Make sure to keep that window open or jot those down since we’ll be using those below!
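
Rather than retyping these values at every step, we can keep them in environment variables. As a sketch (the variable names assume Spring Boot's standard relaxed binding, which uppercases a property name, turns dots into underscores, and drops dashes, so that each variable maps onto the twitter.credentials.* properties we use later; the values shown are hypothetical placeholders):

```shell
# Hypothetical placeholder values; substitute your real Twitter credentials.
# Assuming relaxed binding, TWITTER_CREDENTIALS_CONSUMERKEY maps to
# twitter.credentials.consumer-key, and so on for the others.
export TWITTER_CREDENTIALS_CONSUMERKEY="my-consumer-key"
export TWITTER_CREDENTIALS_CONSUMERSECRET="my-consumer-secret"
export TWITTER_CREDENTIALS_ACCESSTOKEN="my-access-token"
export TWITTER_CREDENTIALS_ACCESSTOKENSECRET="my-access-token-secret"
```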

4. Installing Hadoop

Next, let’s install Hadoop! We can either follow the official documentation or simply leverage Docker:

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5. Compiling Our App Starters

To use freestanding, fully individual components, we can download and compile desired Spring Cloud Stream App Starters individually from their GitHub repositories.

5.1. Twitter Spring Cloud Stream App Starter

Let’s add the Twitter Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.twitterstream.source) to our project:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

Then, we run Maven:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root.

Then we can run that compiled .jar and pass in the relevant application properties like so:

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

We can also pass our credentials using the familiar Spring application.properties:

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

5.2. HDFS Spring Cloud Stream App Starter

5.2.HDFS Spring Cloud Stream App Starter

Now (with Hadoop already set up), let’s add the HDFS Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.hdfs.sink) dependency to our project.

First, clone the relevant repo:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

Then, run the Maven job:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root. We can then run that compiled .jar and pass in relevant application properties:

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

‘hdfs://127.0.0.1:50010/’ is the default for Hadoop, but your default HDFS port may vary depending on how you configured your instance.

We can see the list of data nodes (and their current ports) at ‘http://0.0.0.0:50070’, given the configuration we passed in previously.

We can also set this configuration in the familiar Spring application.properties before compilation, so we don't always have to pass it in via the CLI.

Let’s configure our application.properties to use the default Hadoop port:

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6. Using AggregateApplicationBuilder

Alternatively, we can combine our Spring Stream Source and Sink through the org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder into a simple Spring Boot application!

First, we’ll add the two Stream App Starters to our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

Then we’ll begin combining our two Stream App Starter dependencies by wrapping them into their respective sub-applications.

6.1. Building Our App Components

Our SourceApp specifies the Source to be transformed or consumed:

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}
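
To see what this source actually emits, here's a plain-Java sketch (stripped of the Spring wiring; the class name is hypothetical) of the same payload-building expression used in timerMessageSource():

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimerPayloadDemo {

    // The same expression timerMessageSource() returns: the current date/time
    // rendered with SimpleDateFormat's default, locale-dependent pattern.
    static String payload() {
        return new SimpleDateFormat().format(new Date());
    }

    public static void main(String[] args) {
        // Prints something like "1/15/18 4:38 AM" in an English locale.
        System.out.println(payload());
    }
}
```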

Note that we bind our SourceApp to org.springframework.cloud.stream.messaging.Source and inject the appropriate configuration class to pick up the needed settings from our environmental properties.

Next, we set up a simple org.springframework.cloud.stream.messaging.Processor binding:

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {

    private static final Logger log = LoggerFactory.getLogger(ProcessorApp.class);

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}
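
Stripped of the Spring wiring, the transformer is just a pass-through that announces its input and returns it unchanged. A minimal plain-Java sketch of that behavior (the class name is hypothetical, and we print instead of using a logger):

```java
public class PassThroughDemo {

    // Mirrors processMessage(): announce the payload, then return it unchanged.
    static String processMessage(String payload) {
        System.out.println("Payload received!");
        return payload;
    }

    public static void main(String[] args) {
        // The payload passes through unmodified.
        System.out.println(processMessage("hello"));
    }
}
```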

Then, we create our consumer (Sink):

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {

    private static final Logger log = LoggerFactory.getLogger(SinkApp.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

Here, we bind our SinkApp to org.springframework.cloud.stream.messaging.Sink and again inject the correct configuration class to use our specified Hadoop settings.

Lastly, we combine our SourceApp, ProcessorApp, and our SinkApp using the AggregateApplicationBuilder in our AggregateApp main method:

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

As with any Spring Boot application, we can inject specified settings as environmental properties through application.properties or programmatically.
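
For instance, a hypothetical sketch of that approach: the --fixedDelay=5000 and --debug=true arguments passed via .args() above could instead be supplied from a property source (assuming each CLI flag maps, via Spring Boot's relaxed binding, to a property of the same name):

```properties
# Hypothetical equivalents of the .args() values above.
fixedDelay=5000
debug=true
```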

Since we’re using the Spring Stream framework, we can also pass our arguments into the AggregateApplicationBuilder.

6.2. Running the Completed App

We can then compile and run our application using the following command line instructions:

$ mvn install
$ java -jar twitterhdfs.jar

Remember to keep each @SpringBootApplication class in a separate package (otherwise, several different binding exceptions will be thrown)! For more information about how to use the AggregateApplicationBuilder, have a look at the official docs.

After we compile and run our app we should see something like the following in our console (naturally the contents will vary by Tweet):

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

Those log lines demonstrate the correct operation of our Processor and Sink on receiving data from the Source! In this example, we haven't configured our HDFS Sink to do much; it simply logs each payload it receives.

7. Conclusion

In this tutorial, we’ve learned how to combine two awesome Spring Stream App Starters into one sweet Spring Boot example!

Here are some other great articles on Spring Boot Starters and how to create a custom starter!

As always, the code used in the article can be found over on GitHub.
