Reading Pulsar catalog metadata from Java with pulsar-flink-connector: a code walkthrough
程序员文章站
2022-06-25 10:01:07
Introduction
This article uses pulsar-flink-connector to read the metadata of the namespaces and topics in Apache Pulsar.
GitHub repository for pulsar-flink-connector:
Maven
<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
    <version>2.7.3</version>
</dependency>

<!-- jar repositories -->
<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>
Code
Use PulsarMetadataReader to fetch the metadata
package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Test.
 *
 * @author levi
 * @version 1.0
 **/
public class Test {

    public static void main(String[] args) {
        final ClientConfigurationData configurationData = new ClientConfigurationData();
        configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
        // your Pulsar token
        final AuthenticationToken token = new AuthenticationToken(
                "eyjxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
        configurationData.setAuthentication(token);

        // The reader is closed automatically by try-with-resources.
        try (final PulsarMetadataReader reader = new PulsarMetadataReader(
                "http://127.0.0.1:8443", configurationData, "", new HashMap<>(), -1, -1)) {
            // List the namespaces
            final List<String> namespaces = reader.listNamespaces();
            System.out.println("namespaces: " + namespaces.toString());
            for (final String namespace : namespaces) {
                // List the topics in this namespace
                final List<String> topics = reader.getTopics(namespace);
                System.out.println("topic: " + topics.toString());
                for (String topic : topics) {
                    // Fetch the SchemaInfo that describes the topic's fields
                    final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                    final String name = schemaInfo.getName();
                    System.out.println("schemaName:" + name); // the topic name
                    final SchemaType type = schemaInfo.getType();
                    System.out.println("schemaType:" + type.toString()); // e.g. JSON
                    final Map<String, String> properties = schemaInfo.getProperties();
                    System.out.println(properties);
                    final String schemaDefinition = schemaInfo.getSchemaDefinition();
                    System.out.println(schemaDefinition); // field info
                }
            }
        } catch (IOException | PulsarAdminException e) {
            e.printStackTrace();
        }
    }
}
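The topic names printed by the program above are plain strings. As a minimal sketch of how they could be post-processed, the helper below splits a fully qualified Pulsar topic name of the form domain://tenant/namespace/topic into its parts using only the JDK. The class TopicNameDemo, the TopicParts holder, and the sample topic names are hypothetical and are not part of pulsar-flink-connector. The sketch also assumes the reader returns names in this three-part form, which is worth checking against the actual output.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits fully qualified Pulsar topic names into parts. */
public class TopicNameDemo {

    /** Immutable holder for the parts of a topic name. */
    static final class TopicParts {
        final String domain;    // "persistent" or "non-persistent"
        final String tenant;
        final String namespace; // namespace without the tenant prefix
        final String topic;     // local topic name

        TopicParts(String domain, String tenant, String namespace, String topic) {
            this.domain = domain;
            this.tenant = tenant;
            this.namespace = namespace;
            this.topic = topic;
        }

        /** Returns the namespace in "tenant/namespace" form. */
        String fullNamespace() {
            return tenant + "/" + namespace;
        }
    }

    /** Parses names shaped like "persistent://tenant/namespace/topic". */
    static TopicParts parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing domain in: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        // A limit of 3 keeps any further '/' inside the local topic name.
        String[] rest = fullName.substring(sep + 3).split("/", 3);
        if (rest.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + fullName);
        }
        for (String part : rest) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty name component in: " + fullName);
            }
        }
        return new TopicParts(domain, rest[0], rest[1], rest[2]);
    }

    public static void main(String[] args) {
        // Sample names for illustration only.
        List<String> topics = Arrays.asList(
                "persistent://public/default/orders",
                "non-persistent://public/default/metrics");
        for (String name : topics) {
            TopicParts p = parse(name);
            System.out.println(p.fullNamespace() + " -> " + p.topic + " (" + p.domain + ")");
        }
    }
}
```

In the loop over topics, a call such as TopicNameDemo.parse(topic) could replace the plain print when the tenant, namespace, and local topic name are needed separately, for example to group schemas by namespace.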