Getting Started with Flink (Part 2): Using Kafka as Source and Sink
Installing and Using Kafka on a Mac
Kafka depends on ZooKeeper for cluster coordination, so ZooKeeper must be installed first. On a Mac, install Homebrew and then use it to install both packages:
brew install kafka
brew install zookeeper
Both packages are installed under /usr/local/Cellar, and Homebrew links their commands onto your PATH. Start ZooKeeper first:
zkServer start
With ZooKeeper running, the common Kafka commands are as follows.
Start the Kafka broker:
kafka-server-start /usr/local/etc/kafka/server.properties
Start a console producer:
kafka-console-producer --broker-list localhost:9092 --topic pktest
Start a console consumer:
kafka-console-consumer --bootstrap-server localhost:9092 --topic pktest
Create a topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest
List existing topics:
kafka-topics --list --zookeeper localhost:2181
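The Scala examples below also need the Flink Kafka connector on the classpath. A minimal sbt sketch (the version numbers are placeholders; match them to your Flink installation):

// build.sbt (hypothetical versions; align with your Flink release)
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.1"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"

FlinkKafkaConsumer and FlinkKafkaProducer used below come from the universal flink-connector-kafka artifact; older Flink releases ship version-specific connectors (e.g. flink-connector-kafka-0.11) instead.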
Using Kafka as a Source (Scala)
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object StreamingJob {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = "pktest"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "test")

    // consume the Kafka topic as a stream of strings
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
    data.print()

    env.execute("StreamingJob")
  }
}
This is the Kafka-as-source example: Flink consumes the topic, so anything you type in the console-producer window appears in the Flink job's console output.
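Since addSource returns an ordinary DataStream, the Kafka records can be transformed before printing. A minimal sketch, counting words over the same data stream from the example above (only the processing chain is new):

// word count over the Kafka stream; `data` is the stream from the example above
val counts = data
  .flatMap(_.toLowerCase.split("\\s+")) // split each record into words
  .filter(_.nonEmpty)                   // drop empty tokens
  .map(word => (word, 1))               // pair each word with a count of 1
  .keyBy(_._1)                          // partition by the word
  .sum(1)                               // keep a running count per word
counts.print()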
Using Kafka as a Sink
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaConnectorProducer {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read lines from a local socket (fed by `nc -lk 9999`)
    val data = env.socketTextStream("localhost", 9999)

    val topic = "pktest"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")

    // write each line to the Kafka topic as a plain string
    val kafkaSink = new FlinkKafkaProducer[String](topic, new SimpleStringSchema(), properties)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducer")
  }
}
This example uses nc -lk 9999 as the data source. Flink wraps each line and writes it to the corresponding topic: type some characters in the nc -lk 9999 window and press Enter, and the records appear in the Kafka console-consumer window.
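The two connectors can also be combined in a single job that reads from one topic, transforms the records, and writes to another. A minimal sketch, assuming the pktest topic from above plus a hypothetical output topic pkout (create it with the same kafka-topics command shown earlier):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object KafkaToKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "test")

    // consume from pktest, upper-case each record, produce to pkout
    env.addSource(new FlinkKafkaConsumer[String]("pktest", new SimpleStringSchema(), properties))
      .map(_.toUpperCase)
      .addSink(new FlinkKafkaProducer[String]("pkout", new SimpleStringSchema(), properties))

    env.execute("KafkaToKafka")
  }
}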