Flink - Sink

Before we talk about sinks, a quick word on Flink's repartitioning operations for data transfer, which are likewise methods available on DataStream (a short sketch of their use follows the list).

  • shuffle: sets the partitioning of the DataStream so that output elements are redistributed uniformly at random to the next operation.
  • keyBy: creates a new KeyedStream that uses the provided key for partitioning.
  • global: sets the partitioning of the DataStream so that all output values go to the first instance of the next operator. Use this with care, as it can create a serious performance bottleneck in the application.
  • rebalance: sets the partitioning of the DataStream so that output elements are distributed evenly, round-robin, across the instances of the next operation.
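
As a minimal sketch of how these methods are called (reusing the data/temps.txt file from the examples below; the key selector is an assumption for illustration):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.readTextFile("data/temps.txt");

lines.shuffle().print("shuffle");                            // random, uniform redistribution
lines.keyBy(line -> line.split(",")[0]).print("keyBy");      // partition by key (here: the sensor id)
lines.global().print("global");                              // everything goes to the first downstream instance
lines.rebalance().print("rebalance");                        // round-robin redistribution

env.execute();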

Sink

Flink has no equivalent of Spark's foreach method for letting users iterate over the data themselves. All output to external systems has to go through a sink, and the final output step of a job is completed in a form like this:

stream.addSink(new MySink(xxx))

The official distribution ships sinks for a number of frameworks, and you can also implement your own sink (a sketch of a custom sink follows).
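
The article does not show what MySink looks like; a minimal sketch of a custom sink, assuming it extends RichSinkFunction and simply prints each record (class body and behavior are illustrative, not from the original):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical custom sink: writes every incoming record to standard output
public class MySink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // open connections / resources here (e.g. a JDBC connection)
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // called once per record
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release resources here
    }
}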

Sinks provided in Flink 1.10

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

Sinks provided by the third-party project Apache Bahir

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

Sink examples based on Flink 1.12.0

Variable values used in the examples

<flink.version>1.12.0</flink.version>		
<scala.binary.version>2.11</scala.binary.version>
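
All of the examples below read data/temps.txt (assumed to contain lines of the form "id,timestamp,temperature") and turn each line into a TempInfo object. The article never shows that class, so here is a minimal sketch of what it is assumed to look like, matching the getters used later:

// Assumed POJO: sensor id, event timestamp, temperature reading
public class TempInfo {
    private String id;
    private Long timeStamp;
    private Double temp;

    public TempInfo() {}

    public TempInfo(String id, Long timeStamp, Double temp) {
        this.id = id;
        this.timeStamp = timeStamp;
        this.temp = temp;
    }

    public String getId() { return id; }
    public Long getTimeStamp() { return timeStamp; }
    public Double getTemp() { return temp; }

    @Override
    public String toString() {
        return id + "," + timeStamp + "," + temp;
    }
}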

Kafka

flink - kafka dependency coordinates

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Example code -> based on Flink 1.12.0

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> filedata = env.readTextFile("data/temps.txt");

// Parse each line into a TempInfo and emit its string representation
DataStream<String> mapDataStream = filedata.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        String[] split = value.split(",");
        return new TempInfo(split[0], new Long(split[1]), new Double(split[2])).toString();
    }
});

// Write every record to the Kafka topic "topicName" on localhost:9092
mapDataStream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "topicName", new SimpleStringSchema()));

env.execute();
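
The broker-list constructor above is the simplest form. If more producer settings are needed, FlinkKafkaProducer can also be built from a Properties object; a sketch (the extra property values are assumptions, not from the original):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// any additional Kafka producer settings go into the same Properties object
props.setProperty("retries", "3");

mapDataStream.addSink(new FlinkKafkaProducer<String>(
        "topicName",                  // target topic
        new SimpleStringSchema(),     // how records are serialized
        props));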

Redis

flink - redis dependency coordinates

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.0</version>
</dependency>

Example code -> based on Flink 1.12.0

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> filedata = env.readTextFile("data/temps.txt");

// Parse each line into a TempInfo object
DataStream<TempInfo> dataStream = filedata.map(new MapFunction<String, TempInfo>() {
    @Override
    public TempInfo map(String value) throws Exception {
        String[] split = value.split(",");
        return new TempInfo(split[0], new Long(split[1]), new Double(split[2]));
    }
});

// Jedis connection-pool configuration for the local Redis instance
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost").setPort(6379).build();

dataStream.addSink(new RedisSink<>(config, new RedisMapper<TempInfo>() {

    // Define the Redis command used to store the data: save into a hash named "sensor_temp" via HSET
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
    }

    // The sensor id becomes the hash field
    @Override
    public String getKeyFromData(TempInfo tempInfo) {
        return tempInfo.getId();
    }

    // The temperature becomes the hash value
    @Override
    public String getValueFromData(TempInfo tempInfo) {
        return tempInfo.getTemp().toString();
    }
}));

env.execute();
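
The HSET description above stores everything in a single Redis hash called sensor_temp. If each sensor should instead be a plain key/value pair, the mapper can return a SET command, which needs no additional key; a sketch (the key prefix is an assumption):

// Alternative RedisMapper: store each sensor as a plain string key (SET id value)
new RedisMapper<TempInfo>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET does not use the additional key that HSET needs for the hash name
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(TempInfo tempInfo) {
        return "temp_" + tempInfo.getId();   // assumed key prefix
    }

    @Override
    public String getValueFromData(TempInfo tempInfo) {
        return tempInfo.getTemp().toString();
    }
};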

Elasticsearch

flink - Elasticsearch dependency coordinates

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Example code -> based on Flink 1.12.0

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> filedata = env.readTextFile("data/temps.txt");

// Parse each line into a TempInfo and emit its string representation
DataStream<String> dataStream = filedata.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        String[] split = value.split(",");
        return new TempInfo(split[0], new Long(split[1]), new Double(split[2])).toString();
    }
});

// Addresses of the Elasticsearch nodes
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// Build the esSinkBuilder
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                // The document to be written
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                // Create the index request sent to Elasticsearch
                // (the elasticsearch7 connector no longer needs an explicit type)
                return Requests.indexRequest()
                        .index("my-index")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                // Hand the request over to the indexer
                indexer.add(createIndexRequest(element));
            }
        }
);

// This instructs the sink to emit after every element; otherwise requests would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

dataStream.addSink(esSinkBuilder.build());

env.execute();

JDBC

flink - jdbc dependency coordinates (from Flink 1.11 onwards an official connector is provided by Flink)

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Example code -> based on Flink 1.12.0

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> filedata = env.readTextFile("data/temps.txt");

// Parse each line into a TempInfo object
DataStream<TempInfo> dataStream = filedata.map(new MapFunction<String, TempInfo>() {
    @Override
    public TempInfo map(String value) throws Exception {
        String[] split = value.split(",");
        return new TempInfo(split[0], new Long(split[1]), new Double(split[2]));
    }
});

dataStream.addSink(JdbcSink.sink(
        "insert into temps (id, timestamp, temp) values (?,?,?)",
        // Fill the PreparedStatement from the TempInfo fields
        (ps, t) -> {
            ps.setString(1, t.getId());
            ps.setLong(2, t.getTimeStamp());
            ps.setDouble(3, t.getTemp());
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/dbname")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()));

env.execute();
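
JdbcSink.sink above writes with the connector's default batching behavior. It also accepts a JdbcExecutionOptions argument to control batch size, flush interval and retries; a sketch with assumed values:

dataStream.addSink(JdbcSink.sink(
        "insert into temps (id, timestamp, temp) values (?,?,?)",
        (ps, t) -> {
            ps.setString(1, t.getId());
            ps.setLong(2, t.getTimeStamp());
            ps.setDouble(3, t.getTemp());
        },
        // Buffer up to 100 rows or flush every 200 ms, whichever comes first (values are assumptions)
        JdbcExecutionOptions.builder()
                .withBatchSize(100)
                .withBatchIntervalMs(200)
                .withMaxRetries(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/dbname")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()));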