欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Apache Kafka应用程序中的机器学习和实时分析(第二部分)

程序员文章站 2022-04-08 08:45:29
...

在优锐课的java学习分享中,这篇文章探讨了构建ML基础结构的特定部分:在Kafka应用程序中部署分析模型以进行实时预测。

文章的第一部分在这啦,记得先看哦,不然看不懂哈,更有连贯性。

Apache Kafka应用程序中的机器学习和实时分析(第一部分)

嵌入式模型的流处理

除了使用模型服务器和RPC通信之外,你还可以将模型直接嵌入到Kafka应用程序中。 这可以是利用Kafka Streams或KSQL的Kafka本机流处理应用程序,也可以使用Java,Scala,Python或Go之类的Kafka客户端API。

在这种情况下,不依赖于外部模型服务器。 该模型已在应用程序中加载,例如使用Kafka Streams应用程序中的TensorFlow Java API:
同样,实现Kafka应用程序很简单。 以下是将TensorFlow模型嵌入到Kafka Streams应用程序中以进行实时预测的代码段:
1.导入Kafka和TensorFlow API:
Java

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;

2.从数据存储区(例如,Amazon S3链接)或内存(例如,从Kafka主题接收)中加载TensorFlow模型:
Java

// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp.toString());
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);

3.配置Kafka Streams应用程序:
Java

// Configure Kafka Streams Application
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
// Specify default (de)serializers for record keys and for record values
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

4.将TensorFlow模型应用于流数据:
Java

final KStream<String, String> inputEvents = builder.stream(inputTopic);
inputEvents.foreach((key, value) -> {
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
// Model inference in real time:
output = model.output(input);
prediction = output.toString();
});

5.启动Kafka应用程序:
Java

final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();

你甚至可以使用众所周知的测试库编写单元测试,如使用JUnit和Kafka Streams测试库的单元测试的示例所示。

以下是使用KSQL用户定义函数(UDF)进行模型部署的示例:

你所需要做的就是实现KSQL UDF Java接口,然后将UDF部署到KSQL服务器。 先前的博客文章中详细描述了如何构建自己的KSQL UDF。 使用这种方法,最终用户可以编写SQL查询以实时应用分析模型。

应将哪些模型直接嵌入到应用程序中?

并非每种模型都适合嵌入到应用程序中。 在决定嵌入是否有意义时应考虑的因素包括:
模型性能:越快越好
模型二进制格式:最好是编译的Java字节码
型号大小:较少的MB和较少的内存是首选
模型服务器功能:开箱即用,自己构建或不需要

用Python代码编写的模型比较慢,因为它是一种动态语言,必须在运行时解释许多变量和命令。

H2O Java类(例如决策树)执行速度非常快,通常只需几微秒。

小型TensorFlow Protobuf神经网络,只需几个MB或更少的负载即可快速完成。

拥有100 MB或更多内存的大型TensorFlow Protobuf神经网络需要大量内存,并且执行速度相对较慢。

基于标准的模型(例如,基于PMML或ONNX的XML / JSON)包括模型处理(例如数据预处理)以外的其他步骤。 使用这些标准通常会带来组织挑战和技术限制/约束,并且性能通常比TensorFlow的SavedModel等本地序列化模型差。

最终,是否应该将模型直接嵌入到你的应用程序中取决于模型本身,硬件基础结构以及项目的需求。

在Kafka应用程序中重建模型服务器的功能并不困难

将模型嵌入到应用程序中意味着你没有开箱即用的模型服务器功能。 你将必须自己实现它们。 问自己的第一个问题是:我需要模型服务器的功能吗? 我需要动态更新模型吗? 那版本控制呢? A / B测试? 金丝雀?
好消息是实现这些功能并不难。 根据你的要求和工具集,你可以:

启动新版本的应用程序(例如Kubernetes窗格)
通过Kafka主题发送和使用模型或权重
通过API(例如TensorFlow Java API)动态加载新版本
利用服务网格(例如Envoy,Linkerd或Istio)而不是用于A / B测试,绿色/蓝色部署,黑暗发射等的模型服务器。

让我们评估一下在Kafka应用程序中利用分析模型的两种方法之间的权衡。

模型服务器与嵌入模型的权衡

你可以将分析模型部署到模型服务器中并使用RPC通信,也可以将它们直接嵌入到应用程序中。 没有最佳选择,因为它取决于你的基础结构,要求和功能。

为什么将模型服务器和RPC与事件流应用程序一起使用?

与现有技术和组织流程的简单集成
更容易理解,如果你来自非事件流世界
以后可以迁移到真实流
内置模型管理功能,可用于不同模型,版本控制和A / B测试
内置监控

为什么将模型嵌入事件流应用程序?

借助本地推理实现更好的延迟,而无需进行远程呼叫
脱机推断(设备,边缘处理等)
Kafka Streams应用程序的可用性,可伸缩性和延迟/吞吐量与RPC接口的SLA之间没有耦合
无副作用(例如在发生故障的情况下)-卡夫卡处理涵盖了所有内容(例如仅发生一次)

两种选择各有优缺点,根据情况在不同情况下建议使用。

使用Kubernetes进行云原生模型部署

在云原生基础架构中,有可能获得两种方法的好处。 让我们使用Kubernetes作为我们的云原生环境,尽管其他云原生技术也可以提供类似的功能。

如果将分析模型嵌入到Kafka应用程序中,则将获得单独的pod的所有优点,该pod具有用于流处理和模型推理的容器。 对模型服务器没有外部依赖性。

在以下示例中,你可以使用嵌入式模型独立地扩展Kafka Streams应用程序,启动新版本,进行A / B测试或其他路由,并使用Envoy或Linkerd等云原生代理执行错误处理:

如果你仍然想获得模型服务器的优点和功能,则可以使用sidecar设计模式。 Kubernetes支持将具有特定任务的其他容器添加到你的Pod中。 在以下示例中,Kafka Streams应用程序部署在一个容器中,模型服务器作为Sidecar部署在同一吊舱内的另一个容器中。

这样就可以利用模型服务器的功能以及单个容器的健壮性和可扩展性。 它仍然具有在每个容器之间使用RPC的缺点。 通过将两个容器部署在同一容器中,可以最大程度地减少等待时间和潜在的错误。

边缘模型部署

模型并不总是部署在云或数据中心中。 在某些情况下,模型会部署在边缘。 边缘部署可能意味着:

边缘数据中心或边缘设备/机器
边缘有一个Kafka集群,一个Kafka经纪人或一个Kafka客户
功能强大的客户端(例如KSQL或Java)或轻量级客户端(例如C或JavaScript)
嵌入式或RPC模型推断
本地或远程培训
法律和合规性影响

对于某些电信提供商来说,边缘计算的定义是超低延迟,且端到端通信时间不到100毫秒。 这是通过诸如开源云基础架构软件堆栈StarlingX之类的框架实现的,该框架需要完整的OpenStack和Kubernetes集群以及对象存储。 对于其他人来说,edge表示你在其中部署非常小的轻量级C应用程序和模型的移动设备,轻量级板或传感器。

从卡夫卡的角度来看,有很多选择。 你可以使用librdkafka(本机Kafka C / C ++客户端库)完全构建轻量级的边缘应用程序,该库由Confluent完全支持。 还可以使用JavaScript并利用REST代理或WebSocket集成进行Kafka通信,将模型嵌入移动应用程序中。

Kafka的技术独立模型部署

在过程和技术上,模型部署可以与模型训练完全分开。部署基础架构可以处理不同的模型,甚至可以使用不同的机器学习框架训练模型。 Kafka还为构建机器学习监控提供了良好的基础,包括基础设施的技术监控和特定于模型的监控,例如性能或模型准确性。

无论你是否要使用Kafka实施所有功能(包括数据集成,预处理,模型部署和监视),或者是否仅使用Kafka客户端将模型嵌入到真实模型中,Kafka都是机器学习基础设施的理想选择和补充工具时间的Kafka客户端(与数据预处理和模型训练完全分开)。

对于模型部署,存在两种选择:模型服务器(RPC)和嵌入式模型。了解每种方法的利弊将有助于你为项目做出正确的决定。实际上,将分析模型嵌入到Kafka应用程序中很简单,而且非常有用。

喜欢这篇文章的可以点个赞,欢迎大家留言评论,记得关注我,每天持续更新技术干货、职场趣事、海量面试资料等等
如果你对java技术很感兴趣也可以加入我的java学习群 V–(ddmsiqi)来交流学习,里面都是同行,验证【CSDN2】有资源共享。
不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代
Apache Kafka应用程序中的机器学习和实时分析(第二部分)