欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Flink]测试用的fake温度传感器

程序员文章站 2022-11-07 20:55:00
Flink-测试用的fake温度传感器 Flink中,测试时,会用到自定义的source。 下为一例。。 该例使用温度传感器的格式演示fake日志数据源。 代码用Scala写的。 传感器... 传感器 - 样例类 SensorReads.scala: ​x 1 package sr 2 ​ 3 /* ......

 

flink-测试用的fake温度传感器

 

flink中,测试时,会用到自定义的source。

下为一例。。 该例使用温度传感器的格式演示fake日志数据源。

代码用scala写的。

 

传感器...

 

  • 传感器 - 样例类

    sensorreads.scala

     
     
     
    x
     
     
     
     
    1
    package sr
    2
    3
    /**
    4
     * 
    5
     */
    6
    case class sensorreads(id:string,
    7
                           timestap:long,
    8
                           tempture:double)
     
     

 

  • 传感器 - 数据源模拟

    snsorsrc_4096t.scala

     
     
     
    46
     
     
     
     
    1
    package sr
    2
    3
    import org.apache.flink.streaming.api.functions.source.sourcefunction
    4
    import scala.util.random
    5
    6
    /**
    7
     * period, is 4096 millis.
    8
     */
    9
    case class snsorsrc_4096t extends sourcefunction[sensorreads] {
    10
    
    
    11
        var isinrunning: boolean = true
    12
    
    
    13
        ////
    14
        override def run(sourcecontext: sourcefunction.sourcecontext[
    15
                sensorreads]): unit = {
    16
    
    
    17
    
    
    18
            val rand: random = new random
    19
    
    
    20
            var tptnow4 =
    21
                (1 to 4).map(
    22
                    "snsor_" + _.tostring -> (23 + 16 * rand.nextgaussian))
    23
    24
    
    
    25
    
    
    26
            while (isinrunning) {
    27
                tptnow4 = tptnow4.map(
    28
                    t => t._1 -> (t._2 + rand.nextgaussian))
    29
    
    
    30
    
    
    31
                val timestampnow: long = system.currenttimemillis
    32
    
    
    33
                tptnow4.foreach{
    34
                    t =>
    35
                        sourcecontext.collect( // o.u.t
    36
                            sensorreads(t._1, timestampnow, t._2) )
    37
                    thread.sleep(512)  }
    38
                //not set, is stm
    39
    
    
    40
                thread.sleep(2048)  }
    41
    
    
    42
        }
    43
    
    
    44
        override def cancel(): unit = isinrunning = false
    45
    
    
    46
    }
     
     

 

 

测试

 

snsrsrcaappli.scala

 
 
 
13
 
 
 
 
1
package applis
2
3
import org.apache.flink.streaming.api.scala._
4
import sr._
5
6
object snsrsrcaappli extends app{
7
    val env = streamexecutionenvironment.getexecutionenvironment
8

9
    env.addsource(snsorsrc_4096t() )
10
                    .print("aaa")
11

12
    env.execute()
13
}
 
 

 

数据源模拟用case-class,此处使用则可以不写new。

 

输出

 

idea控制台上run:

 
 
 
17
 
 
 
 
1
log4j:warn no appenders could be found for logger (org.apache.flink.api.scala.closurecleaner$).
2
log4j:warn please initialize the log4j system properly.
3
log4j:warn see http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
aaa:3> sensorreads(snsor_1,1573556705508,30.383394411578916)
5
aaa:4> sensorreads(snsor_2,1573556705508,21.397405872448672)
6
aaa:5> sensorreads(snsor_3,1573556705508,20.598086139457727)
7
aaa:6> sensorreads(snsor_4,1573556705508,18.30066983735531)
8
aaa:7> sensorreads(snsor_1,1573556709627,30.120955223032546)
9
aaa:8> sensorreads(snsor_2,1573556709627,22.38746867201145)
10
aaa:1> sensorreads(snsor_3,1573556709627,20.45357507067989)
11
aaa:2> sensorreads(snsor_4,1573556709627,17.18467261133715)
12
aaa:3> sensorreads(snsor_1,1573556713729,31.686487593592904)
13
aaa:4> sensorreads(snsor_2,1573556713729,20.67106361911623)
14
aaa:5> sensorreads(snsor_3,1573556713729,21.27724215221553)
15
aaa:6> sensorreads(snsor_4,1573556713729,16.84273306583804)
16
17
process finished with exit code -1
 
 

 

...

如果snsorsrc_4096t.scala中,「当前温度」.foreach这样写:

 
 
 
5
 
 
 
 
1
tptnow4.foreach{
2
    t =>
3
        sourcecontext.collect( // o.u.t
4
            sensorreads(t._1, system.currenttimemillis, t._2) )
5
    thread.sleep(512)  }
 
 

 

那么结果就会是:

 
 
 
25
 
 
 
 
1
log4j:warn no appenders could be found for logger (org.apache.flink.api.scala.closurecleaner$).
2
log4j:warn please initialize the log4j system properly.
3
log4j:warn see http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
aaa:5> sensorreads(snsor_1,1573561932216,20.427373784204445)
5
aaa:6> sensorreads(snsor_2,1573561932739,19.043151948599565)
6
aaa:7> sensorreads(snsor_3,1573561933251,16.506314894849734)
7
aaa:8> sensorreads(snsor_4,1573561933764,42.18791135873409)
8
aaa:1> sensorreads(snsor_1,1573561936326,20.216273863226476)
9
aaa:2> sensorreads(snsor_2,1573561936838,19.77488458362011)
10
aaa:3> sensorreads(snsor_3,1573561937351,17.49661332626548)
11
aaa:4> sensorreads(snsor_4,1573561937864,42.37076203420432)
12
aaa:5> sensorreads(snsor_1,1573561940425,19.582646754534526)
13
aaa:6> sensorreads(snsor_2,1573561940938,18.148182987020572)
14
aaa:7> sensorreads(snsor_3,1573561941451,17.028248074961432)
15
aaa:8> sensorreads(snsor_4,1573561941963,42.969281620777075)
16
aaa:1> sensorreads(snsor_1,1573561944525,20.659855873131406)
17
aaa:2> sensorreads(snsor_2,1573561945038,19.437515708059177)
18
aaa:3> sensorreads(snsor_3,1573561945550,18.336847248220565)
19
aaa:4> sensorreads(snsor_4,1573561946063,43.58727112744526)
20
aaa:5> sensorreads(snsor_1,1573561948625,19.317498008380674)
21
aaa:6> sensorreads(snsor_2,1573561949137,21.86602577501872)
22
aaa:7> sensorreads(snsor_3,1573561949650,19.109322091177216)
23
aaa:8> sensorreads(snsor_4,1573561950163,43.48043890977487)
24
25
process finished with exit code -1
 
 

这样一来事件时间就都不一样了。 可根据需要模拟的情况改动....