Kafka - Consumer API Analysis
Published: 2019-06-28



1. Overview

In Kafka, two consumer APIs are officially provided: a high-level consumer API and a low-level consumer API. An earlier article introduced the high-level consumer API; this post introduces the other, low-level one.

2. Content

Having used Kafka's high-level consumer API, we know it is a highly abstracted API that is simple and convenient to use, but for certain special requirements we may need the second, lower-level API. So first we need to know what the low-level consumer API can do for us:

  • Read a message multiple times
  • Consume only a subset of the messages within a partition during processing
  • Add a transaction management mechanism to ensure a message is processed exactly once

Of course, using it also has some drawbacks:

  • You must track the offset value in your own program
  • You must find the lead broker of the specified topic partition
  • You must handle broker changes yourself

The general steps for using this API are as follows (a minimal sketch of the first two steps follows this list):

  • From all brokers in the Active state, find which one is the lead broker of the specified topic partition
  • Find all the replica brokers of the specified topic partition
  • Build the request
  • Send the request and fetch the data
  • Handle lead broker changes
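As a preview of the first two steps, the snippet below is a minimal, self-contained sketch against the Kafka 0.8 javaapi: it asks one seed broker for topic metadata and prints the leader and replica hosts of a partition. The broker address, port, topic name, and partition number here are placeholder assumptions; the remaining steps (building fetch requests, handling leader changes) are covered by the full listing in section 3.3.

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderLookupSketch {
    public static void main(String[] args) {
        // Placeholder seed broker, port, topic and partition -- adjust to your cluster
        SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            TopicMetadataRequest req = new TopicMetadataRequest(Collections.singletonList("test-topic"));
            TopicMetadataResponse resp = consumer.send(req);
            for (TopicMetadata topicMeta : resp.topicsMetadata()) {
                for (PartitionMetadata partMeta : topicMeta.partitionsMetadata()) {
                    if (partMeta.partitionId() == 0 && partMeta.leader() != null) {
                        // Step 1: the lead broker of this partition
                        System.out.println("Leader: " + partMeta.leader().host());
                        // Step 2: all replica brokers of this partition
                        for (kafka.cluster.Broker replica : partMeta.replicas()) {
                            System.out.println("Replica: " + replica.host());
                        }
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}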

3. Code Implementation

3.1 Java Project

If you implement this code as a plain Java project, you need to add the related dependency JAR files, including:

  • scala-xml_${version}-${version}.jar
  • scala-library-${version}.jar
  • metrics-core-${version}.jar
  • kafka-clients-${version}.jar
  • kafka_${version}-${version}.jar

For a plain Java project, you have to pick out and add these JARs yourself to ensure the code runs smoothly.

3.2 Maven Project

For a Maven project, simply add the corresponding dependency information to the pom.xml file and let Maven manage the dependency JARs. The content is as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
This <dependency> element goes inside the <dependencies> section of pom.xml; with that, the dependency JARs for the Maven project are in place. The zookeeper and log4j artifacts are excluded, presumably to avoid clashing with the versions already used by the project.

3.3 Implementation

Using the low-level consumer API, the implementation code is as follows:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleKafkaConsumer {
    private static Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleKafkaConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public static void main(String[] args) {
        SimpleKafkaConsumer example = new SimpleKafkaConsumer();
        // SystemConfig is the author's own properties helper (see the sketch after this listing)
        // Max read number
        long maxReads = SystemConfig.getIntProperty("kafka.read.max");
        // To subscribe to the topic
        String topic = SystemConfig.getProperty("kafka.topic");
        // Find partition
        int partition = SystemConfig.getIntProperty("kafka.partition");
        // Broker node's ip
        List<String> seeds = new ArrayList<String>();
        String[] hosts = SystemConfig.getPropertyArray("kafka.server.host", ",");
        for (String host : hosts) {
            seeds.add(host);
        }
        int port = SystemConfig.getIntProperty("kafka.server.port");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            log.error("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)
            throws Exception {
        // Get point topic partition's meta
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            log.info("[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            log.info("[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
                clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                log.info("[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:" + leadBroker
                        + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
                            clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    log.info("[SimpleKafkaConsumer.run()] - Found an old offset: " + currentOffset + " Expecting: "
                            + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); // message handling entry point
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
            String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            log.info("[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find the next leader broker after the old leader fails.
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
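The SystemConfig class used in main() is the author's own configuration helper; its source is not shown in the original post. Below is a minimal sketch of what such a helper might look like, assuming the settings live in a kafka.properties file on the classpath. The class name and method signatures follow the usage above, but the file name, keys, and everything else here are assumptions.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Minimal properties helper (an assumption; not shown in the original post).
// It expects a kafka.properties file on the classpath, for example:
//   kafka.read.max=100
//   kafka.topic=test-topic
//   kafka.partition=0
//   kafka.server.host=broker1,broker2
//   kafka.server.port=9092
public class SystemConfig {
    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = SystemConfig.class.getClassLoader().getResourceAsStream("kafka.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getProperty(String key) {
        return PROPS.getProperty(key);
    }

    public static int getIntProperty(String key) {
        return Integer.parseInt(PROPS.getProperty(key));
    }

    public static String[] getPropertyArray(String key, String separator) {
        return PROPS.getProperty(key).split(separator);
    }
}

With a helper like this, the brokers, topic, and partition the consumer connects to can be changed without recompiling the code.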

 

4. Summary

When using Kafka's low-level consumer API, be clear about the business scenario at hand; in general, the high-level consumer API is still the recommended choice unless you have special requirements. Also, pay attention to handling lead broker changes and managing offsets during use.

Reposted from: https://www.cnblogs.com/XQiu/p/5241464.html
