Pulsar Admin API 操作 Source
程序员文章站
2024-03-11 17:33:01
...
序言
以监控 MySQL 数据为例,简单写了一个使用 Pulsar Admin API 来操作 Source 的 Demo。
一、POM 文件
添加以下依赖
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client-admin -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>2.8.0</version>
</dependency>
二、Demo
首先下载连接器包 pulsar-io-debezium-mysql-2.8.0.nar,并放到项目的 src/main/resources 目录下(与下方代码中 createSource 使用的本地路径一致)。
下载地址:https://archive.apache.org/dist/pulsar/pulsar-2.8.0/connectors/
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Demo of managing a Pulsar IO source (a Debezium MySQL connector) through the Pulsar Admin API:
 * create, query status, delete, start and stop.
 */
public class PulsarAdminDemo {
    private static final String serviceHttpUrl = "http://node1:8081,node2:8081,node3:8081/";
    private static final String tenant = "public";
    private static final String namespace = "default";
    private static final String sourceName = "debezium-mysql-source";

    /**
     * Builds an admin client against {@code serviceHttpUrl} and creates the demo source.
     * Any failure is printed to stderr; the admin client is always closed.
     */
    public static void main(String[] args) {
        // try-with-resources closes the admin client even when source creation fails.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build()) {
            Sources sourceAdmin = admin.sources();
            // Create the source.
            createPulsarSource(sourceAdmin);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates the Debezium MySQL source using the settings returned by
     * {@code Utils.getSourceConfigs()}.
     *
     * @param sourceAdmin admin handle used to issue the create request
     * @throws PulsarAdminException if the create request fails
     */
    public static void createPulsarSource(Sources sourceAdmin) throws PulsarAdminException {
        Map<String, Object> configs = Utils.getSourceConfigs();
        SourceConfig sourceConfig = SourceConfig.builder()
                .archive("pulsar-io-debezium-mysql-2.8.0.nar")
                .className("org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource")
                .name(sourceName)
                .topicName("debezium-mysql-topic")
                .tenant(tenant)
                .namespace(namespace)
                .parallelism(1)
                .configs(configs)
                .build();
        // createSource(SourceConfig sourceConfig, String fileName):
        // fileName is the path of the connector archive on the local machine.
        sourceAdmin.createSource(sourceConfig, "src/main/resources/pulsar-io-debezium-mysql-2.8.0.nar");
    }

    /**
     * Reports whether the first instance of the given source is running.
     *
     * <p>Returns {@code false} when the status request fails with a
     * {@link PulsarAdminException} (the exception is printed), or when no instance
     * status is available. Other runtime exceptions are no longer masked by a
     * {@code return} inside {@code finally}; they propagate to the caller.
     *
     * @return {@code true} only if the first reported instance is marked as running
     */
    public static Boolean getPulsarSourceStatus(Sources sourceAdmin, String tenant, String namespace, String sourceName) {
        try {
            SourceStatus sourceStatus = sourceAdmin.getSourceStatus(tenant, namespace, sourceName);
            // Guard against a status without instances; get(0) on an empty list would throw.
            if (sourceStatus == null || sourceStatus.instances == null || sourceStatus.instances.isEmpty()) {
                return false;
            }
            SourceStatus.SourceInstanceStatus firstInstance = sourceStatus.instances.get(0);
            return firstInstance != null && firstInstance.status != null && firstInstance.status.running;
        } catch (PulsarAdminException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes the given source.
     *
     * @return {@code true} if the request completed, {@code false} if it raised a
     *         {@link PulsarAdminException} (which is printed)
     */
    public static Boolean deletePulsarSource(Sources sourceAdmin, String tenant, String namespace, String sourceName) {
        try {
            sourceAdmin.deleteSource(tenant, namespace, sourceName);
            return true;
        } catch (PulsarAdminException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Starts the given source.
     *
     * @return {@code true} if the request completed, {@code false} if it raised a
     *         {@link PulsarAdminException} (which is printed)
     */
    public static Boolean startPulsarSource(Sources sourceAdmin, String tenant, String namespace, String sourceName) {
        try {
            sourceAdmin.startSource(tenant, namespace, sourceName);
            return true;
        } catch (PulsarAdminException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Stops the given source.
     *
     * @return {@code true} if the request completed, {@code false} if it raised a
     *         {@link PulsarAdminException} (which is printed)
     */
    public static Boolean stopPulsarSource(Sources sourceAdmin, String tenant, String namespace, String sourceName) {
        try {
            sourceAdmin.stopSource(tenant, namespace, sourceName);
            return true;
        } catch (PulsarAdminException e) {
            e.printStackTrace();
            return false;
        }
    }
}
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
/**
 * Helper that loads the source connector settings from {@code configs.properties}
 * on the classpath.
 */
public class Utils {
    /** Classpath name of the properties file holding the source connector settings. */
    private static final String CONFIG_RESOURCE = "configs.properties";

    /**
     * Reads {@code configs.properties} from the classpath and returns its entries as a map.
     *
     * <p>This is best-effort: if the resource is missing or cannot be read, the problem is
     * reported on stderr and an empty map is returned. The result is never {@code null}.
     *
     * @return a new mutable map of property name to property value
     */
    public static Map<String, Object> getSourceConfigs() {
        Properties properties = new Properties();
        // try-with-resources closes the stream; a null resource is skipped by the language.
        try (InputStream in = Utils.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                // getResourceAsStream returns null for a missing resource; loading null would throw.
                System.err.println("Resource not found on classpath: " + CONFIG_RESOURCE);
                return new HashMap<>();
            }
            properties.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return new HashMap<>();
        }
        return setSourceConfigs(properties);
    }

    /**
     * Copies every entry of {@code properties} into a new map keyed by the entry key's
     * string form; values are stored as-is.
     */
    private static Map<String, Object> setSourceConfigs(Properties properties) {
        Map<String, Object> map = new HashMap<>();
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue());
        }
        return map;
    }
}
# resources 下的 configs.properties 文件
database.hostname=192.168.2.174
database.port=3306
database.serverTimezone=GMT+8
database.user=root
database.password=root123
database.server.id=174
database.server.name=dbserver1
database.whitelist=test
table.whitelist=test.t_user
database.history=org.apache.pulsar.io.debezium.PulsarDatabaseHistory
database.history.pulsar.topic=my-history-topic
database.history.pulsar.service.url=pulsar://node1:6650,node2:6650,node3:6650
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
pulsar.service.url=pulsar://node1:6650,node2:6650,node3:6650
offset.storage.topic=offset-topic
如有问题,欢迎一起交流讨论。