Flink Sink
Flink does not offer a foreach-style method like Spark's that lets users iterate over elements themselves; all output to external systems has to go through a Sink. The final output step of a job is wired up roughly as shown below.
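As a minimal sketch (the sink class and the println body are illustrative, not from the original article), a custom sink extends RichSinkFunction and is attached to a stream with addSink:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// illustrative custom sink: invoked once per element of the stream
class MySink extends RichSinkFunction[String] {
  override def invoke(value: String): Unit = {
    // replace the println with the actual write to the external system
    println(value)
  }
}

dstream.addSink(new MySink)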
Kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
// obtain a Kafka producer sink for the target topic
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
// serialize each (channel, count) tuple as "channel:count" and write it to Kafka
sumDstream.map(chCount => chCount._1 + ":" + chCount._2).addSink(myKafkaProducer)
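The MyKafkaUtil helper is not shown above; a minimal sketch, assuming a SimpleStringSchema serializer and broker hosts matching the hadoop1–hadoop3 machines used elsewhere in this article, could look like this:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object MyKafkaUtil {
  // assumed broker list; adjust to the actual Kafka cluster
  val brokerList = "hadoop1:9092,hadoop2:9092,hadoop3:9092"

  def getProducer(topic: String): FlinkKafkaProducer011[String] =
    new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
}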
Redis
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object MyRedisUtil {
  // Jedis connection pool pointing at the Redis instance
  val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop1").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, String)] {
    // write every tuple into the Redis hash "channel_count" with HSET
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
      // new RedisCommandDescription(RedisCommand.SET)
    }

    // Tuple._2 is the value, Tuple._1 is the hash field
    override def getValueFromData(t: (String, String)): String = t._2
    override def getKeyFromData(t: (String, String)): String = t._1
  }
}
// the mapper expects (String, String), so convert the count to a String first
sumDstream.map(chCount => (chCount._1, chCount._2.toString)).addSink(MyRedisUtil.getRedisSink())
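With the HSET command description above, every tuple becomes a field/value pair inside the single Redis hash channel_count, so the results can be inspected with HGETALL channel_count in redis-cli; the commented-out SET variant would instead store each channel as a separate string key.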
Elasticsearch
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
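The utility below also parses each element with fastjson, so that dependency must be on the classpath as well (the version here is only illustrative):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>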
import java.util
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
object MyEsUtil {
  // Elasticsearch REST hosts
  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("hadoop1", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop2", 9200, "http"))
  httpHosts.add(new HttpHost("hadoop3", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("trying to save: " + element)
        // parse the JSON string and build an index request for the target index
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("saved 1 record")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
    // maximum number of buffered actions before a bulk flush
    sinkBuilder.setBulkFlushMaxActions(10)
    sinkBuilder.build()
  }
}
val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")
dstream.addSink(esSink)
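Note that the builder above buffers up to 10 actions before sending a bulk request, so with a low-volume test stream documents may not show up immediately; setting setBulkFlushMaxActions(1) flushes every element right away, after which the index can be checked with GET gmall0503_startup/_search.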