How MLSQL Stack Makes Stream Debugging Easier

程序员文章站 2023-11-28 10:12:10

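Preface

A colleague who was evaluating MLSQL Stack's streaming support said that debugging streaming jobs is actually quite hard. After some hands-on experimentation, we settled on three things we wanted:

  • Inspect the latest N records in Kafka at any time
  • Print debugging results (the sink output) in the web console
  • Have the streaming job infer the JSON schema automatically (Spark does not currently do this for streams)

With these three in place, debugging really did become much simpler.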

Walkthrough

First, I created a script, kaf_write.mlsql, that makes it easy to write data into Kafka:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"datatype":"a group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}
''';
load jsonstr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

Every time I run it, the data is written to Kafka.
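The sketch below is a rough Python equivalent of `select to_json(struct(*)) as value from table1`. It shows what actually lands in the topic: each row becomes one compact JSON string in a single `value` column, and the Kafka sink writes that column. It rests on two assumptions. Keys are sorted to mimic Spark's JSON schema inference, which orders columns alphabetically. The helper name `to_kafka_values` is hypothetical.

```python
import json

# Same shape as the `abc` variable in kaf_write.mlsql: one "a group" row
# followed by ten identical "b group" rows.
ABC = "\n".join(
    ['{ "x": 100, "y": 200, "z": 200 ,"datatype":"a group"}']
    + ['{ "x": 120, "y": 100, "z": 260 ,"datatype":"b group"}'] * 10
)


def to_kafka_values(text: str) -> list[str]:
    """Hypothetical helper: turn JSON lines into the strings stored in Kafka's `value`."""
    values = []
    for line in text.strip().splitlines():
        row = json.loads(line)  # like `load jsonstr.`abc``: one row per line
        # Like to_json(struct(*)): compact JSON. Keys are sorted because Spark's
        # JSON inference orders columns alphabetically (assumption for this sketch).
        values.append(json.dumps(row, separators=(",", ":"), sort_keys=True))
    return values


if __name__ == "__main__":
    for value in to_kafka_values(ABC)[:2]:
        print(value)
```

Running it should print `{"datatype":"a group","x":100,"y":200,"z":200}` followed by the first "b group" row.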

Once the data is written, I want to check that it really arrived and see what it looks like:

!kafkatool sampledata 10 records from "127.0.0.1:9092" wow;

This samples 10 records from the Kafka cluster at 127.0.0.1:9092, topic wow. The result:

[Screenshot: output of the sampledata command]
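The idea of "the latest N records" can be expressed as simple offset arithmetic. The sketch below illustrates one plausible approach; it is not necessarily how `!kafkatool sampledata` is implemented.

For each partition, read from `max(earliest, end - n)` up to the end offset, then keep the n newest records overall. In practice the offsets would come from a Kafka client, for example kafka-python's `KafkaConsumer.beginning_offsets()` and `end_offsets()`. Here they are plain dicts, and `latest_ranges` and `newest` are hypothetical names.

```python
def latest_ranges(partitions: dict[int, tuple[int, int]], n: int) -> dict[int, tuple[int, int]]:
    """For each partition, the [start, end) offset range holding its last n records.

    `partitions` maps partition id -> (earliest offset, end offset), where the
    end offset is the offset the next record will receive (exclusive).
    """
    ranges = {}
    for pid, (earliest, end) in partitions.items():
        # Never start before the earliest retained offset.
        ranges[pid] = (max(earliest, end - n), end)
    return ranges


def newest(records: list[dict], n: int) -> list[dict]:
    """Keep the n most recent records across all partitions, newest first."""
    return sorted(records, key=lambda r: r["timestamp"], reverse=True)[:n]
```

Reading up to n records from every partition is deliberate. In the worst case, all of the newest n records sit in a single partition, so taking fewer per partition could miss some. `newest` then trims the combined candidates back to n.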

Nothing looks wrong. Next, I wrote a very simple streaming program:

-- the stream name; must be unique.
set streamname="streamexample";

-- use kafkatool to infer schema from kafka
!kafkatool registerschema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print in webconsole instead of terminal console.
save append table21 
as webconsole.`` 
options mode="append"
and duration="15"
and checkpointlocation="/tmp/s-cpl4";

Running it gives:

[Screenshot: stream output in the web console]

We can now watch the results arrive in the console in real time.
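The `!kafkatool registerschema 2 records ...` step samples a couple of messages and derives a schema from them. Presumably this is so that the stream's JSON `value` can be parsed into columns.

The merging logic below is a simplified sketch of that kind of inference, not MLSQL's actual code:

  • It maps JSON values to Spark-style type names.
  • It widens long to double.
  • It falls back to string on any other conflict.
  • It does not recurse into nested objects.

`infer_schema`, `json_type` and `widen` are hypothetical helpers.

```python
import json


def json_type(value) -> str:
    """Map one JSON value to a Spark-style type name (simplified)."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check before int: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    return "struct"  # nested objects are not inspected further in this sketch


def widen(a: str, b: str) -> str:
    """Merge the types seen for the same field in two records."""
    if a == b:
        return a
    if "null" in (a, b):
        return b if a == "null" else a
    if {a, b} == {"long", "double"}:
        return "double"
    return "string"  # any other conflict falls back to string


def infer_schema(samples: list[str]) -> dict[str, str]:
    """Infer field -> type from sampled JSON strings (e.g. Kafka `value`s)."""
    schema: dict[str, str] = {}
    for text in samples:
        for field, value in json.loads(text).items():
            t = json_type(value)
            schema[field] = widen(schema[field], t) if field in schema else t
    return dict(sorted(schema.items()))  # alphabetical, like Spark's inference
```

With two sampled messages from the wow topic, this yields `{"datatype": "string", "x": "long", "y": "long", "z": "long"}`. Sampling more records, such as 10 instead of 2, makes it more likely that a field's looser type (a double or a null) shows up before the schema is fixed.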

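Extras

MLSQL Stack also offers two particularly handy streaming features:

  • You can attach an HTTP callback to stream events.
  • You can post-process the stream's output with batch SQL and then persist it.

See the following script: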

-- the stream name; must be unique.
set streamname="streamexample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestamptype":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestamptype":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestamptype":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestamptype":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestamptype":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestamptype":0}
''';

-- load data as table
load jsonstr.`data` as datasource;

-- convert table as stream source
load mockstream.`datasource` options 
stepsizerange="0-3"
as newkafkatable1;

-- cast value to a string (a real Kafka source delivers it as binary)
select cast(value as string) as k from newkafkatable1
as table21;


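-- POST stream events (started, progress, terminated) to an HTTP endpoint.
!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- run the batch SQL in `code` on the stream output (exposed as table jack) and save it as parquet.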


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourcetable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointlocation="/tmp/cpl15";
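The small Python simulation below helps reason about what the `custom` sink does. It rests on assumptions, not on MLSQL's source:

  • `stepsizerange="0-3"` makes `mockstream` emit a random 0 to 3 rows per micro-batch.
  • The `code` block runs once per micro-batch, with that batch registered as the table named by `sourcetable` (`jack`). This is conceptually similar to `foreachBatch` in Spark Structured Streaming.

The mock records are trimmed to the fields the query uses, and a Python list stands in for the parquet path /tmp/jack. `mock_stream` and `custom_sink` are hypothetical names.

```python
import random

# Trimmed-down version of the `data` variable: six records, offsets 0..5.
DATA = [{"key": "yes", "value": "no", "topic": "test", "offset": i} for i in range(6)]


def mock_stream(rows, step_range=(0, 3), seed=None):
    """Yield micro-batches whose sizes are drawn uniformly from step_range (inclusive)."""
    low, high = step_range
    if high < 1:
        raise ValueError("step_range must allow at least one row per batch")
    rng = random.Random(seed)
    pos = 0
    while pos < len(rows):
        size = rng.randint(low, high)  # an empty batch is allowed, as with "0-3"
        yield rows[pos:pos + size]
        pos += size


def custom_sink(batches, output):
    """Apply the script's SQL to each micro-batch and append the result to `output`."""
    for batch in batches:
        # select cast(value as string) as k from newkafkatable1 as table21;
        jack = [{"k": str(row["value"])} for row in batch]
        # select count(*) as c from jack as newjack;
        newjack = {"c": len(jack)}
        # save append newjack as parquet (a list stands in for /tmp/jack)
        output.append(newjack)


if __name__ == "__main__":
    sink = []
    custom_sink(mock_stream(DATA, seed=42), sink)
    print(sink)
```

The per-batch counts vary from run to run, but they always add up to the six mocked records. This is a quick sanity check you could also run against the real /tmp/jack output.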
