Flink Learning Notes - Simple Examples of Common Sources and Sinks

Flink 1.10.0 ships with a number of built-in connectors; the sections below demonstrate the most commonly used sources and sinks with short Scala examples.

Maven dependencies for Flink

<flink.version>1.10.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
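
None of the example methods below call env.execute() themselves; they all take a StreamExecutionEnvironment and are meant to be invoked from a small driver. A minimal sketch of such a driver (object and job names are illustrative, not part of the original examples):

import org.apache.flink.streaming.api.scala._

object SourceSinkExamples {
  def main(args: Array[String]): Unit = {
    // the package-object import above also provides the implicit TypeInformation the examples rely on
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)             // keep the demo output in a single task
    collectionSource(env)             // or any other example method from this post
    env.execute("source-sink-examples")
  }
}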

Collection Source

def collectionSource(env: StreamExecutionEnvironment): Unit = {
    // word count over a fixed in-memory collection
    val collection_stream = env.fromCollection(List("zhang li zhang", "wang zhang li"))
    val stream = collection_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
}

Text File Source And Sink

def textFileSourceSink(env: StreamExecutionEnvironment):Unit = {
    val file_stream:DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
    val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
    stream.writeAsText("G:\\workspace\\flink\\src\\main\\resources\\r1")
    stream.writeAsCsv("G:\\workspace\\flink\\src\\main\\resources\\r2")
}
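
writeAsText and writeAsCsv are convenient for debugging, but they are deprecated in favor of StreamingFileSink, the fault-tolerant file sink in Flink 1.10. A minimal row-format sketch (output path illustrative; checkpointing must be enabled so that in-progress part files get finalized):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// writes each (word, count) tuple as one text line under the target directory
stream.addSink(
  StreamingFileSink
    .forRowFormat(new Path("G:\\workspace\\flink\\src\\main\\resources\\r3"),
                  new SimpleStringEncoder[(String, Int)]("UTF-8"))
    .build()
)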

Socket Source And Sink

def socketSourceSink(env: StreamExecutionEnvironment):Unit = {
    val socket_stream:DataStream[String] = env.socketTextStream("192.168.0.1", 7000)
    val stream = socket_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
    stream.writeToSocket(hostname = "192.168.0.1", port = 7001, new SerializationSchema[(String, Int)] {
      // serializes each (word, count) tuple with Java's ObjectOutputStream;
      // the receiving side has to read the bytes back the same way
      override def serialize(t: (String, Int)): Array[Byte] = {
        var oos: ObjectOutputStream = null
        var baos: ByteArrayOutputStream = null
        try {
          baos = new ByteArrayOutputStream()
          oos = new ObjectOutputStream(baos)
          oos.writeBytes(t._1)
          oos.writeInt(t._2)
          oos.flush()  // flush the ObjectOutputStream buffer before extracting the bytes
          return baos.toByteArray
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (oos != null) oos.close()  // close the wrapper before the underlying stream
          if (baos != null) baos.close()
        }
        null
      }
    })
}
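
For a quick local test, the socket source can be fed from a plain TCP server such as "nc -lk 7000" on the source host; writeToSocket also connects as a client, so a listener (for example "nc -lk 7001") has to be running on the target host before the job starts.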

Kafka Source And Sink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
def kafkaSourceSink(env: StreamExecutionEnvironment):Unit = {
    // consumer configuration: bootstrap.servers, group.id and auto.offset.reset are the relevant entries;
    // the key/value deserializer settings are not used by FlinkKafkaConsumer (decoding is done by the
    // DeserializationSchema below), and zookeeper.connect is ignored by the universal Kafka connector
    val properties1 = new Properties()
    properties1.setProperty("bootstrap.servers", "192.168.0.1:9092")
    properties1.setProperty("zookeeper.connect", "192.168.0.1:2181/kafka")
    properties1.setProperty("group.id", "tmp-1")
    properties1.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties1.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties1.setProperty("auto.offset.reset", "latest")
    // read from topic tmp1 and run the same word count as in the other examples
    val kafka_stream = env.addSource(new FlinkKafkaConsumer[String]("tmp1", new SimpleStringSchema(), properties1))
    val stream = kafka_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).map(_.toString())
    stream.print("kafka consumer")

    val properties2 = new Properties()
    properties2.put("bootstrap.servers", "192.168.0.1:9092")
    // sink variant 1: plain producer, values serialized with SimpleStringSchema
    stream.addSink(new FlinkKafkaProducer[String]("tmp2", new SimpleStringSchema(), properties2))

    // sink variant 2: custom KafkaSerializationSchema with exactly-once (transactional) delivery;
    // in practice only one of the two sinks would be added to the stream
    stream.addSink(new FlinkKafkaProducer[String]("tmp2", new KafkaSerializationSchema[String] {
      override def serialize(value: String, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        var oos: ObjectOutputStream = null
        var baos: ByteArrayOutputStream = null
        try {
          baos = new ByteArrayOutputStream()
          oos = new ObjectOutputStream(baos)
          oos.writeBytes(value)
          oos.flush()  // flush the ObjectOutputStream buffer before extracting the bytes
          return new ProducerRecord[Array[Byte], Array[Byte]]("tmp2", baos.toByteArray)
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (oos != null) oos.close()
          if (baos != null) baos.close()
        }
        null
      }
    }, properties2, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
}
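
With Semantic.EXACTLY_ONCE the producer writes through Kafka transactions, and FlinkKafkaProducer's default transaction timeout (1 hour) is larger than the broker's default transaction.max.timeout.ms (15 minutes), so the job fails at startup unless one of the two is adjusted. One option is to lower the producer-side timeout on properties2 before the sink is created (value illustrative):

properties2.setProperty("transaction.timeout.ms", "600000")  // 10 minutes; must not exceed the broker's transaction.max.timeout.ms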

Redis Sink

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
def redisSink(env: StreamExecutionEnvironment):Unit = {
    val jedisPoolConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.0.1").setPort(6379).build()
    val file_stream:DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
    val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
    stream.addSink(new RedisSink[(String, Int)](jedisPoolConfig, new RedisMapper[(String, Int)] {
      // writes every (word, count) pair into the Redis hash "tmp1" via HSET,
      // with the word as the hash field and the count as the value
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "tmp1")

      override def getKeyFromData(t: (String, Int)): String = t._1

      override def getValueFromData(t: (String, Int)): String = t._2.toString

    }))
}


def redisClusterSink(env: StreamExecutionEnvironment):Unit = {
    val nodes = new util.HashSet[InetSocketAddress]()
    nodes.add(new InetSocketAddress("192.168.0.1", 6373))
    nodes.add(new InetSocketAddress("192.168.0.1", 6374))
    nodes.add(new InetSocketAddress("192.168.0.1", 6375))
    nodes.add(new InetSocketAddress("192.168.0.1", 6376))
    nodes.add(new InetSocketAddress("192.168.0.1", 6377))
    nodes.add(new InetSocketAddress("192.168.0.1", 6378))
    val jedisClusterConfig: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build()
    val file_stream:DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
    val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
    stream.addSink(new RedisSink[(String, Int)](jedisClusterConfig, new RedisMapper[(String, Int)] {
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "tmp1")

      override def getKeyFromData(t: (String, Int)): String = t._1

      override def getValueFromData(t: (String, Int)): String = t._2.toString

    }))
}

Elasticsearch Sink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.12</artifactId>
    <version>1.8.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
def elasticsearch5Sink(env: StreamExecutionEnvironment): Unit = {
    val file_stream:DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\users.txt")
    file_stream.print()

    // cluster.name must match the name configured on the Elasticsearch 5.x cluster;
    // bulk.flush.max.actions = 1 flushes after every single record, which is only sensible for a demo
    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "my-application")
    config.put("bulk.flush.max.actions", "1")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 9300))

    val elasticsearchSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // each input line is expected in the form "id,name,phone,age"
          val datas: Array[String] = element.split(",")
          val data = new util.HashMap[String, String]()
          data.put("id", datas(0))
          data.put("name", datas(1))
          data.put("phone", datas(2))
          data.put("age", datas(3))
          // Elasticsearch 5.x still requires a mapping type on the request (the name chosen here is arbitrary);
          // type needs backticks because it is a Scala keyword
          val indexRequest = Requests.indexRequest().index("user").`type`("user").source(data)
          requestIndexer.add(indexRequest)
        }
      },
      new ActionRequestFailureHandler() {
        @throws(classOf[Throwable])
        override def onFailure(actionRequest: ActionRequest, throwable: Throwable, restStatusCode: Int,
                               requestIndexer: RequestIndexer): Unit = {
          if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent()) {
            // bulk queue full: re-queue the request so it is retried
            requestIndexer.add(actionRequest)
          } else if (ExceptionUtils.findThrowable(throwable, classOf[ElasticsearchParseException]).isPresent()) {
            // malformed document: drop it and continue
          } else {
            // any other failure fails the sink (and the job)
            throw throwable
          }
        }
      }
    )

    file_stream.addSink(elasticsearchSink)
}


def elasticsearch7Sink(env: StreamExecutionEnvironment): Unit = {
    val file_stream:DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\users.txt")
    file_stream.print()
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("192.168.0.1", 9200, "http"))
    val elasticsearchSinkBuilder = new ElasticsearchSink.Builder[String](httpHosts,
      new ElasticsearchSinkFunction[String] {
        override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // each input line is expected in the form "id,name,phone,age"
          val datas: Array[String] = element.split(",")
          val data = new util.HashMap[String, String]()
          data.put("id", datas(0))
          data.put("name", datas(1))
          data.put("phone", datas(2))
          data.put("age", datas(3))
          val indexRequest = Requests.indexRequest().index("user").source(data)
          requestIndexer.add(indexRequest)
        }
      })
    elasticsearchSinkBuilder.setBulkFlushMaxActions(1)
    elasticsearchSinkBuilder.setRestClientFactory(new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        /**
        restClientBuilder.setPathPrefix()
        restClientBuilder.setDefaultHeaders()
        restClientBuilder.setRequestConfigCallback()
        restClientBuilder.setHttpClientConfigCallback()
        restClientBuilder.setFailureListener()
        **/
      }
    })
    elasticsearchSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
      @throws(classOf[Throwable])
      override def onFailure(actionRequest: ActionRequest, throwable: Throwable, restStatusCode: Int,
                             requestIndexer: RequestIndexer): Unit = {
        if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent()) {
          // bulk queue full: re-queue the request so it is retried
          requestIndexer.add(actionRequest)
        } else if (ExceptionUtils.findThrowable(throwable, classOf[ElasticsearchParseException]).isPresent()) {
          // malformed document: drop it and continue
        } else {
          // any other failure fails the sink (and the job)
          throw throwable
        }
      }
    })

    file_stream.addSink(elasticsearchSinkBuilder.build())
}

Custom Source

class CustomSource() extends SourceFunction[String] {

    // @volatile so the flag set by cancel() (called from a different thread) is seen by run()
    @volatile var runFlag: Boolean = true

    val values: Array[String] = Array[String]("zhang", "wang", "li", "zhao", "qi", "chu", "yan", "han")

    // emit a random name every two seconds until cancel() flips runFlag
    override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
      val random = new Random()
      while (runFlag) {
        sourceContext.collect(values(random.nextInt(values.length)))
        Thread.sleep(2000)
      }
    }

    override def cancel(): Unit = {
      runFlag = false
    }

  }

def customSource(env: StreamExecutionEnvironment):Unit = {
    val custom_stream = env.addSource(new CustomSource())
    val stream = custom_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    stream.print()
}
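
A plain SourceFunction always runs with parallelism 1; to emit data from several parallel subtasks, the class would extend ParallelSourceFunction or RichParallelSourceFunction instead.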

Custom Sink: JdbcSink

class JdbcSink() extends RichSinkFunction[String] {
    
    var connection: Connection = _
    var insertStmt: PreparedStatement = _

    // open() runs once per sink subtask: create the JDBC connection and prepare the insert statement here
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      connection = DriverManager.getConnection("jdbc:mysql://192.168.0.1:3306/test", "root", "")
      insertStmt = connection.prepareStatement("INSERT INTO T_USER(ID,NAME,PHONE) VALUES (?,?,?)")
    }

    // invoke() runs once per record; each record is expected as "id,name,phone"
    override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
      val datas: Array[String] = value.split(",")
      insertStmt.setInt(1, datas(0).toInt)
      insertStmt.setString(2, datas(1))
      insertStmt.setString(3, datas(2))
      insertStmt.execute()
      print(insertStmt.getUpdateCount)
    }

    override def close(): Unit = {
      insertStmt.close()
      connection.close()
    }

  }

def jdbcSink(env: StreamExecutionEnvironment): Unit = {
    val collection_stream = env.fromCollection(List("21,zhangsan1,13100000001", "22,zhangsan2,13100000002"))
    collection_stream.addSink(new JdbcSink())
}
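
The JDBC sink also needs the MySQL driver on the classpath, which none of the Flink artifacts pull in; a suitable dependency would be (version illustrative):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>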