博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka Java API操作topic
阅读量:6942 次
发布时间:2019-06-27

本文共 1869 字,大约阅读时间需要 6 分钟。

Kafka官方提供了两个脚本来管理topic,包括topic的增删改查。其中kafka-topics.sh负责topic的创建与删除;kafka-configs.sh脚本负责topic的修改和查询,但很多用户都更加倾向于使用程序API的方式对topic进行操作。
 
上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对topic进行增删改查。
开始之前,需要明确的是,下面的代码需要引入kafka-core的依赖,以kafka 0.10.2版本为例:
Maven版本
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.0</version>
</dependency>
 
Gradle版本
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.2.0'
 
创建topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());// 创建一个单分区单副本名为t1的topicAdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);zkUtils.close();

删除topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());// 删除topic 't1'AdminUtils.deleteTopic(zkUtils, "t1");zkUtils.close();

比较遗憾地是,不管是创建topic还是删除topic,目前Kafka实现的方式都是后台异步操作的,而且没有提供任何回调机制或返回任何结果给用户,所以用户除了捕获异常以及查询topic状态之外似乎并没有特别好的办法可以检测操作是否成功。

查询topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());// 获取topic 'test'的topic属性属性Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");// 查询topic-level属性Iterator it = props.entrySet().iterator();while(it.hasNext()){    Map.Entry entry=(Map.Entry)it.next();    Object key = entry.getKey();    Object value = entry.getValue();    System.out.println(key + " = " + value);}zkUtils.close();

修改topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");// 增加topic级别属性props.put("min.cleanable.dirty.ratio", "0.3");// 删除topic级别属性props.remove("max.message.bytes");// 修改topic 'test'的属性AdminUtils.changeTopicConfig(zkUtils, "test", props);zkUtils.close();

转载地址:http://evinl.baihongyu.com/

你可能感兴趣的文章
python 转 exe -- py2exe库实录
查看>>
第 55 章 Cherokee
查看>>
iOS - Plist 数据解析
查看>>
sql 经常使用的语句(个人)
查看>>
日志管理之 Docker logs - 每天5分钟玩转 Docker 容器技术(87)
查看>>
查看Linux下的文件
查看>>
7mall:4种方法弥补店铺亮点不够多的产品
查看>>
SAP WM LRFMD中Variant参数的影响初探
查看>>
【Xamarin挖墙脚系列:多窗口之间的导航】
查看>>
JPA & Hibernate 注解
查看>>
android 读写sd卡的权限设置
查看>>
Android4: Write Storage权限问题
查看>>
9.9、Libgdx之软键盘
查看>>
LB 负载均衡的层次结构(转)
查看>>
JavaWeb-Servlet技术的监听器-解析与实例-网站在线用户信息与网页点击量
查看>>
【百度地图API】批量地址解析与批量反地址解析(带商圈数据)
查看>>
Flink内存管理源码解读之内存管理器
查看>>
libcurl,多线程,gzip,共享DNS
查看>>
如何通过SQL Server执行系统命令?
查看>>
Java——1个自动拆箱的例子
查看>>