欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Apache Kafka应用程序中的机器学习和实时分析(第一部分)

程序员文章站 2022-04-08 08:45:17
...

在优锐课的java学习分享中,这篇文章探讨了构建ML基础结构的特定部分:在Kafka应用程序中部署分析模型以进行实时预测。

Apache Kafka与机器学习(ML)之间的关系是一个有趣的话题,我在如何使用Apache Kafka在生产中构建和部署可伸缩机器学习以及使用Apache Kafka推动尖端机器学习方面写了很多。

这篇文章介绍了构建机器学习基础架构的特定部分:在Kafka应用程序中部署分析模型以进行实时预测。

模型训练和模型部署可以是两个单独的过程。但是,你还可以使用许多相同的步骤进行集成和数据预处理,因为你经常需要对模型训练和模型推理执行相同的数据集成,过滤,扩充和聚合。

我们将讨论和比较两种不同的模型部署选项:具有远程过程调用(RPC)的模型服务器以及将模型本地嵌入到Kafka客户端应用程序中。我们的示例专门使用TensorFlow,但基本原理也适用于其他机器学习/深度学习框架或产品,例如H2O.ai,Deeplearning4j,Google的云机器学习引擎和SAS。

TensorFlow —一个用于机器学习/深度学习的开源库
TensorFlow是一个用于高性能数值计算的开源软件库。 其灵活的体系结构允许在从台式机到服务器集群到移动和边缘设备的各种平台(CPU,GPU,TPU等)上轻松部署计算。 它最初由Google的AI组织中的Google Brain团队的研究人员和工程师开发,为机器学习和深度学习提供了强大的支持,并在许多领域中得到使用。 TensorFlow是一个完整的生态系统,而不仅仅是一个组件。

鉴于此博文专注于模型服务,我们主要对SavedModel对象感兴趣,该对象存储了经过训练的模型,并且TensorFlow Serving作为模型服务器:

SavedModel本质上是一个二进制文件,已使用协议缓冲区(Protobuf)进行了序列化。 用C,Python,Java等生成的类可以加载,保存和访问数据。 文件格式是人类可读的TextFormat(.pbtxt)或压缩的二进制协议缓冲区(.pb)。 图对象是TensorFlow中计算的基础。 权重保存在单独的检查点文件中。

由于我们专注于TensorFlow模型的部署,因此预先训练模型的方式并不重要。 你可以利用Cloud ML Engine及其Google Cloud Platform(GCP)生态系统等云服务和集成管道,也可以构建自己的管道进行模型训练。 Kafka不仅可以在模型部署中发挥重要作用,而且在数据集成,预处理和监视中也可以发挥关键作用。

使用模型服务器和RPC进行流处理

模型服务器可以自我管理,也可以由分析供应商或云提供商托管。 模型服务器不仅为模型推理部署和缓存模型,而且还提供附加功能,例如版本控制或A / B测试。 从应用程序到模型服务器的通信通常是通过RPC通过HTTP或gRPC完成的。 对于每一个事件,Kafka应用程序与模型服务器之间都会进行这种请求-响应通信。

有许多型号服务器可用。 你可以从Seldon Server,PredictionIO和Hydrosphere.io等开源模型服务器中进行选择,也可以利用H2O.ai,DataRobot,IBM或SAS等分析供应商的模型服务器。

本文使用TensorFlow Serving,即TensorFlow的模型服务器。 它可以是自托管的,也可以使用Cloud ML Engine服务。 TensorFlow Serving具有以下特征:

包含gRPC和HTTP端点
执行模型版本控制而无需更改任何客户端代码
计划将单个推理请求分组为批,以便联合执行
优化推理时间以最小化延迟
支持许多可服务项(可服务项是模型或用于提供与模型一起提供的数据的任务):
TensorFlow模型
嵌入
词汇查询表
功能转换
非基于TensorFlow的模型
能够canarying和A / B测试的

以下是Kafka应用程序与模型服务器之间的通信方式:

The process for implementing a Kafka application is straightforward. 实施Kafka应用程序的过程非常简单。 以下是Kafka Streams应用程序和RPC到TensorFlow服务的代码段:

1.导入Kafka和TensorFlow服务API:
Java

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;

2.配置Kafka Streams应用程序:
Java

// Configure Kafka Streams Application
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique
// in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

3.执行RPC进行TensorFlow服务(如果RPC失败,则捕获异常)
Java

KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> {

            System.out.println("Image path: " + value);


            imagePath = value;


            TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);


            System.out.println("Image = " + imagePath);

            InputStream jpegStream;

            try {

                jpegStream = new FileInputStream(imagePath);

                

                // Prediction of the TensorFlow Image Recognition model:

                List<Map.Entry<String, Double>> list = recogniser.recognise(jpegStream);

                String prediction = list.toString();

                System.out.println("Prediction: " + prediction);

                recogniser.close();

                jpegStream.close();

   

                                 return prediction;

            } catch (Exception e) {

                e.printStackTrace();

      

                                 return Collections.emptyList().toString();

            }


        });

4.启动Kafka应用程序:
Java

// Start Kafka Streams Application to process new incoming images from the Input Topic
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();

文章写到这里,下次再更新第二章节。

喜欢这篇文章的可以点个赞,欢迎大家留言评论,记得关注我,每天持续更新技术干货、职场趣事、海量面试资料等等
如果你对java技术很感兴趣也可以加入我的java学习群 V–(ddmsiqi)来交流学习,里面都是同行,验证【CSDN2】有资源共享。
不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代
Apache Kafka应用程序中的机器学习和实时分析(第一部分)