
Section 17: DataStream partition (Scala)


Previous: Section 16: DataStream operators (Scala)


1. Basic operations for storing data with a Redis sink
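Before compiling, the project needs the Flink Redis connector on the classpath. The sketch below shows one way to declare it in sbt; the artifact coordinates and versions are assumptions (the connector shown is the Apache Bahir release), so match them to your own Flink and Scala setup.

// build.sbt (sketch; coordinates and versions are assumptions, adjust to your environment)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.1" % "provided",
  // Redis sink connector published by Apache Bahir
  "org.apache.bahir" %% "flink-connector-redis" % "1.0"
)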

(1) Code implementation:

package xuwei.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}


object StreamingDataToRedisScala {
  def main(args: Array[String]): Unit = {
    //socket port to read from
    val port = 9000

    //get the Flink streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //connect to the socket and read the input data
    val text = env.socketTextStream("flink102", port, '\n')
    //note: this implicit-conversion import is required, otherwise the map call below fails to compile
    import org.apache.flink.api.scala._

    //map every input line to a fixed key so all values land in the same Redis list
    val word_Data = text.map(line => ("1_word_scala", line))

    //Jedis pool configuration pointing at the Redis server
    val config = new FlinkJedisPoolConfig.Builder().setHost("flink102").setPort(6379).build()

    //build the Redis sink with a custom mapper and attach it to the stream
    val redisSink = new RedisSink[(String, String)](config, new MyRedisMapper)

    word_Data.addSink(redisSink)

    //execute the job
    env.execute("Socket window count")
  }
  class MyRedisMapper extends RedisMapper[(String, String)] {
    //use LPUSH so every record is prepended to a Redis list
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }

    //the first tuple element is the Redis key
    override def getKeyFromData(t: (String, String)): String = {
      t._1
    }

    //the second tuple element is the value stored under that key
    override def getValueFromData(t: (String, String)): String = {
      t._2
    }
  }

}
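For comparison, the RedisCommandDescription decides which Redis command the sink issues, so the same tuples could go into a hash instead of a list. The sketch below (same imports as above) is a variant using HSET; the hash name words_hash is made up for illustration.

//sketch: writing the tuples into a Redis hash instead of a list
class MyHashRedisMapper extends RedisMapper[(String, String)] {
  //HSET takes an additional key: the name of the hash to write into ("words_hash" is hypothetical)
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "words_hash")
  }

  //the first tuple element becomes the field inside the hash
  override def getKeyFromData(t: (String, String)): String = t._1

  //the second tuple element becomes that field's value
  override def getValueFromData(t: (String, String)): String = t._2
}

With this mapper the data would be inspected with hgetall words_hash in redis-cli rather than lrange.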

(2) Start the virtual machine and run nc -l 9000

[root@flink102 ~]# nc -l 9000

(3) Start the Redis service and open the Redis client

//start the Redis service
[root@flink102 ~]# service redisd start
Starting Redis server...
5067:C 09 Mar 11:44:10.848 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
5067:C 09 Mar 11:44:10.848 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=5067, just started
5067:C 09 Mar 11:44:10.848 # Configuration loaded
[root@flink102 ~]# 

//open the Redis client
[root@flink102 src]# redis-cli 
127.0.0.1:6379> 

(4) Run the program from IDEA
(5) In the nc -l 9000 session on the virtual machine, enter some data

[root@flink102 ~]# nc -l 9000

//input data
flink
hadoop

(6) Check the data in Redis

127.0.0.1:6379> keys *
1) "1_word_scala"  //数据已经进来了
2) "1_words"   
127.0.0.1:6379> 

//list the stored values (LPUSH prepends, so the newest value comes first)
127.0.0.1:6379> lrange 1_word_scala 0 -1
1) "hadoop"
2) "flink"

//monitor commands as they arrive
127.0.0.1:6379> monitor
OK


//send more data
[root@flink102 ~]# nc -l 9000
123
kill
kafka

//commands received, as shown by MONITOR
1583754814.495235 [0 192.168.219.1:56879] "LPUSH" "1_word_scala" "123"
1583754820.331190 [0 192.168.219.1:56881] "LPUSH" "1_word_scala" "kill"
1583754823.049288 [0 192.168.219.1:56791] "LPUSH" "1_word_scala" "kafka"
