上篇说到kafka在window环境下的搭建之后,这篇我们就开始尝试写一个简单的producer和consumer来测试了
依次启动bin/windows下的zookeeper-start.bat和kafak-start.bat(这2个.bat是我自己为了方便启动而写的)。下面就开始测试了:kafak jar包版本:kafak_2.9.2-0.8.1.jar
Produce端:
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducerTest { String topic = "test"; public static void main(String[] args) { Properties props = new Properties(); // props.put("zookeeper.connect", "10.16.0.200:2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async");//默认是sync props.put("compression.codec", "1"); props.put("metadata.broker.list", "127.0.0.1:9092"); ProducerConfig config = new ProducerConfig(props); Producer<String, Object> producer = new Producer<String, Object>(config); KeyedMessage<String, Object> message = new KeyedMessage<String, Object>("test", "hello world"); producer.send(message); }
其中ProducerConfig是Producer端的属性配置类,更多属性可参见kakfa的jar包反编译后的kafka.producer.ProducerConfig类,该类的属性定义里面有很多Producer端的必须或可选属性。上述代码中的属性不过
是必须要配置的属性而已。
注意kafak的jar包里会有2个Producer类,我们必须引用的是kafka.javaapi.producer包下面的才行。
KeyedMessage即表明消息。它的构造函数有以下几个:
public KeyedMessage(String topic, K key, Object partKey, V message) { Product.class.$init$(this); if (topic == null) throw new IllegalArgumentException("Topic cannot be null."); } public KeyedMessage(String topic, V message) { this(topic, null, null, message); } public KeyedMessage(String topic, K key, V message) { this(topic, key, key, message); }
第一个参数是消息的topic,第二个参数是消息的内容
metadata.broker.list为Producer端配置的用于指定元数据节点的属性,节点与节点之间用,隔开。关于集群式节点的配置这里不再祥述。
Consumer端:
public class KafkaConsumerTest { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zookeeper.connect", "127.0.0.1:2181"); props.put("zookeeper.connectiontimeout.ms", "1000000"); props.put("group.id", "test_group"); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("test", new Integer(4)); //key--topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); StringBuffer sb = new StringBuffer(); while(it.hasNext()) { try { String msg = new String(it.next().message(), "utf-8").trim(); System.out.println("receive:" + msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
while(it.hasNext())循环里可以进一步改进成这样:
while (it.hasNext()) {
String msg= "";
byte[] packs= it.next().message();
InputStream is = new ByteArrayInputStream(bPack);
//如果阻塞了,也就是该流不可以被读取,那么ready()就返回false.
BufferedReader ioBuffRead = new BufferedReader(new InputStreamReader(is, charset));
while(ioBuffRead.ready()){
msg= ioBuffRead.readLine();
//接下来的代码略。。
}
}
同样注意kafak的jar包里会有2个Consumer类,我们必须引用的是kafka.javaapi.consumer包下面的才行;而关于Consumer端具体的属性配置的也可在ConsumerConfig类的属性定义下面找到。
Producer端的代码比较简单。我们需要好好理解的是Consumer端的代码——
1)ConsumerConnector类是消费端根据消费配置的连接类
2)topicCountMap为·topic的map,key为topic,value为该topic消息流的分区数
3)consumerMap为消费端的map,key为topic,value为该topic对应的消息队列,表现为一个List集合,头元素即为该队列的头部元素,就是为什么consumerMap.get("test").get(0);
的原因,该集合的大小是动态的,因为队列中有元素不停地进出;
4)ConsumerIterator为该topic消息流的迭代器,用于迭代从而取出里面的消息。
运行之后,会发现在Consumer端打印:hello world。你也可以在控制台运行命令查看该topic的消息。
至此,kafka的一个入门级生产-消费测试就完毕了。
在成功启动windows下的kafka或Linux版本的kafka之后,运行该简单的demo示例,你很可能会出现如下异常:
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:29)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
解决这个异常也是比较诡异,多亏F哥的帮助。这是由于你缺少必要的地址的dns解析,修改C盘下的hosts文件,加上:
服务器ip地址 服务器域名
即可。其中很有可能你访问windows本地的kafka也会出现该异常,则加上127.0.0.1 localhost即可
注:1个topic对应1个offset(消费内容的位置); 1个group.id可对应多个topic
最后再次衷心的感谢F哥的帮助!!!
相关推荐
入门 参考文件 有关更多参考,请考虑以下部分: 导游 以下指南说明了如何具体使用某些功能: 请按照以下步骤操作: 首先,您必须从官方网站下载Apache Kafka。 请注意! 您必须下载它的二进制版本。 如果您的...
Kafka Streams入门–针对Kafka主题构建流分析Java应用程序-https: NodeJS –随机延迟将消息发布到Apache Kafka主题以基于CSV文件中的记录生成示例事件-https: Apache Kafka研讨会–演示和动手实验入门-https: ...
kafka_2.10-0.8.2.1 动物园管理员 3.4.6 Java 1.8 集群运行步骤 卡夫卡集群 纱线簇 samza 部署/运行/测试 纱线簇 服务器 jps yarn1.alibaba.net 6817 名称节点7172 DFSZK故障转移控制器7404 资源管理器 yarn2....
为了开始使用Apache Kafka,请从找到快速入门手册。 完成这些操作后,我们就可以开始一个一个地理解Apache Kafka的概念了。 首先,Apache Kafka是一个分布式排队系统,这意味着所产生的消息将被发送出去,并进入...
您可以找到的其中包含一些示例以及Fluent Kafka Streams测试如何工作的详细说明。 入门 您可以通过Maven Central添加Fluent Kafka Streams测试。 Gradle compile group : ' ...
您将在此处找到有关如何构建连接器以及如何运行Kafka和Zeebe的信息,以快速入门: 连接器 该插件带有两个连接器,即源连接器和接收器连接器。 源连接器激活Zeebe作业,将其发布为Kafka记录,并在将它们提交给Kafka...
使用HiveMQ,Apache Kafka和TensorFLow从100000个IoT设备进行大规模流式机器学习如果您只是想开始并在几分钟内快速启动演示,请转到入门以设置基础结构(在GCP上)并运行演示。 您还可以通过现场演示观看20分钟的...
入门学习示例 项目名 所属组件 介绍 MapReduce MapReduce 实验 - 计算气温 最大/最小/平均 值 HDFS HDFS Java API 增删查改 HDFS Timer 定时将日志文件备份到 HDFS 中去,copyFromLocalFile MapReduce MapReduce ...
旨在为用户提供一个快速入门的环境和示例,让用户快速了解 PyFlink 的特性。 Playgrounds 设置环境与 docker-compose 并集成了 PyFlink、Kafka、Python 以方便体验。 当前的 Playgrounds 示例基于最新的 PyFlink ...
入门: 在 src/main/java/com/hortonworks/streaming/impl 下查看 在topologies目录下我们可以看到Storm拓扑(基类和实现类) 在螺栓下,您可以看到 HDFS(带旋转)、hive 和 solr 的螺栓 StormKafka方案在kafka...
该存储库包含一个示例应用程序,该应用程序演示了如何使用CQRS架构样式来实现基于事件的系统。 该解决方案使用Apache Kafka,我们可以轻松地将其集成到基于Spring Boot的应用程序中,该应用程序使用 (2.6.5),...
Java生态系统Java生态系统索引去做模块生态利率限制者通用限流的实现生态算法整合数据结构和算法的实现,工作和学习中用到的算法也会放在这里 Java项目Sprint Boot通过示例学习Spring Boot。 Spring Boot教程,技术...
MapR Streams入门MapR Streams是MapR融合数据平台的核心组件。 它是用于存储和访问实时数据流的分布式框架。... 此项目中提供了以下示例,以显示通过MapR流流式传输JSON和Java对象的常见模式: 流纯文本消息使用A
独立快速入门注意:您必须安装才能运行该示例。配置基础首先,您需要为连接器指定配置设置。 这些可以在kcbq-connector/quickstart/properties/connector.properties文件中找到。 查找本节: ######################...
入门 使用HDP 2.2.4,或在pom中相应地调整版本字符串。 在kafka客户端上,运行安装脚本以创建syslog_events主题。 配置flume以将syslog事件推送到该主题。 从资源创建config.properties的副本,并将主机指向群集...
《Hadoop大数据挖掘从入门到进阶实战》 内容简介 ...最后一章对Hadoop进行了拓展,剖析了Kafka消息系统并介绍了笔者的开源监控系统Kafka Eagle。 本书结构清晰、案例丰富、通俗易懂、实用性强。特别适合
使用 Java / Scala 运行 spark Jobs 单元测试 Spark 1.0 版本前的应用程序迁移 下一步 Spark Streaming Spark Streaming 概述 一个简单的示例 基本概念 依赖 初始化 StreamingContext Discretized ...
先决条件您需要什么来运行该软件: Java 8+总览示例应用程序基于提供信用卡的简单域。 有两个用例: 可以从卡中提款(提款命令) 可以读取卡中的取款清单(查询) 重要的是: After a successful Withdraw command, ...
它可以从批处理数据源(例如Hadoop HDFS,Amazon S3,Azure ADLS,Google Cloud Storage)以及流数据源(例如Apache Kafka)中提取。 Pinot由LinkedIn和Uber的工程师打造,旨在无上限地扩展和扩展。 根据群集的...