Section 17: DataStream partition (Scala)
2022-06-16 16:38:14
Previous: Section 16: DataStream operators (Scala)
1. Basic usage of a Redis sink for storing stream data
(1) Full code:
package xuwei.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object StreamingDataToRedisScala {
  def main(args: Array[String]): Unit = {
    // socket port to read from
    val port = 9000
    // get the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // connect to the socket and read input lines
    val text = env.socketTextStream("flink102", port, '\n')
    // Note: this implicit import is required; without it the map call below fails to compile
    import org.apache.flink.api.scala._
    val word_Data = text.map(line => ("1_word_scala", line))
    val config = new FlinkJedisPoolConfig.Builder().setHost("flink102").setPort(6379).build()
    val redisSink = new RedisSink[Tuple2[String, String]](config, new MyRedisMapper)
    word_Data.addSink(redisSink)
    // run the job
    env.execute("Socket window count")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    // every element is written with the LPUSH command
    override def getCommandDescription = new RedisCommandDescription(RedisCommand.LPUSH)
    // the first tuple field is used as the Redis key
    override def getKeyFromData(t: (String, String)): String = t._1
    // the second tuple field is the value pushed onto the list
    override def getValueFromData(t: (String, String)): String = t._2
  }
}
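The heart of the sink is the RedisMapper: it turns each stream element into a (key, value) pair for the chosen Redis command. That extraction logic can be sketched in plain Scala without Flink or Redis; the object and method name `MapperSketch`/`toKeyValue` and the sample inputs below are made up for illustration:

```scala
// Standalone sketch of the mapping logic in StreamingDataToRedisScala,
// runnable without Flink or Redis. Names and inputs are illustrative only.
object MapperSketch {
  // Mirrors: text.map(line => ("1_word_scala", line))
  def toKeyValue(line: String): (String, String) = ("1_word_scala", line)

  // Mirrors MyRedisMapper: key from _1, value from _2
  def getKeyFromData(t: (String, String)): String = t._1
  def getValueFromData(t: (String, String)): String = t._2

  def main(args: Array[String]): Unit = {
    val input = List("flink", "hadoop")
    for (p <- input.map(toKeyValue))
      println(s"LPUSH ${getKeyFromData(p)} ${getValueFromData(p)}")
  }
}
```

Because every line is mapped to the same key, all input words accumulate under the single Redis list "1_word_scala".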
(2) Start the VM and run nc -l 9000:
[root@flink102 ~]# nc -l 9000
(3) Start the Redis server and open the Redis client
//start the Redis server
[root@flink102 ~]# service redisd start
Starting Redis server...
5067:C 09 Mar 11:44:10.848 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
5067:C 09 Mar 11:44:10.848 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=5067, just started
5067:C 09 Mar 11:44:10.848 # Configuration loaded
[root@flink102 ~]#
//open the Redis client
[root@flink102 src]# redis-cli
127.0.0.1:6379>
(4) Run the program from IDEA
(5) In the VM, type some data into the nc -l 9000 session:
[root@flink102 ~]# nc -l 9000
//input data
flink
hadoop
(6) Check the data in Redis:
127.0.0.1:6379> keys *
1) "1_word_scala"    //the data has arrived
2) "1_words"
127.0.0.1:6379>
//view the list contents
127.0.0.1:6379> lrange 1_word_scala 0 -1
1) "hadoop"
2) "flink"
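The order above is not a mistake: LPUSH pushes each new element onto the head of the list, so the word typed last ("hadoop") comes out first in lrange. A minimal pure-Scala sketch of this behaviour, using a plain List to stand in for the Redis list (the object name `LpushOrder` is invented for illustration):

```scala
// Simulating Redis LPUSH with a Scala List: each push prepends,
// so the newest element ends up at index 0 (what LRANGE key 0 -1 shows first).
object LpushOrder {
  def lpushAll(words: Seq[String]): List[String] =
    words.foldLeft(List.empty[String])((list, w) => w :: list)

  def main(args: Array[String]): Unit = {
    // words in the order they were typed into nc
    println(lpushAll(List("flink", "hadoop"))) // prints List(hadoop, flink)
  }
}
```

If insertion order mattered for the application, RPUSH (append to the tail) would preserve it instead.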
//monitor incoming commands in real time
127.0.0.1:6379> monitor
OK
//send data (in the nc session)
[root@flink102 ~]# nc -l 9000
123
kill
kafka
//commands observed by monitor
1583754814.495235 [0 192.168.219.1:56879] "LPUSH" "1_word_scala" "123"
1583754820.331190 [0 192.168.219.1:56881] "LPUSH" "1_word_scala" "kill"
1583754823.049288 [0 192.168.219.1:56791] "LPUSH" "1_word_scala" "kafka"