kafka消费者持久化问题
如果kafka消费者没有关闭并且没有持续的调用poll,则会导致再使用这个消费者的时候会卡死。
但是持续调用poll就等于一直在消费,不能做到按需消费。也就是说要消费的时候重新创建一个消费者,不消费的时候一定要把消费者连接关闭,否则会有卡死消费现象产生。
经过一番代码研究,发现kafka消费者一个蛋疼的问题。kafka的消费者与coordinator的通信并不是自动线程实现的,而是在用户的每一次poll中实现的。(大坑!!!具体可以搜索kafka consumer rebalance)
既然找到原因了,那就写个线程把coordinator的通信单独抽出来定时实现。
可是可是。。源码里面出现了这么一个
private final ConsumerCoordinator coordinator;
WTF。
好吧。只能用黑科技了。继承KafkaConsumer写一个子类。
public class DcConsumerextends KafkaConsumer { protected ConsumerCoordinator DcCoordinator; public DcConsumer(Map configs) { super(configs); // TODO Auto-generated constructor stub } public DcConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) { super(configs, keyDeserializer, valueDeserializer); // TODO Auto-generated constructor stub } public DcConsumer(Properties properties){ super(properties); // TODO Auto-generated constructor stub } public DcConsumer(Properties properties, Deserializer keyDeserializer, Deserializer valueDeserializer) { super(properties, keyDeserializer, valueDeserializer); // TODO Auto-generated constructor stub } public void coordinatorPoll() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, ClassNotFoundException{ if(this.DcCoordinator == null){ Class> classType = Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"); Field[] fields = classType.getDeclaredFields(); Field field = classType.getDeclaredField("coordinator"); field.setAccessible(true); this.DcCoordinator = (ConsumerCoordinator) field.get(this); } long now = System.currentTimeMillis(); this.DcCoordinator.poll(now); } }
然后,定时调用coordinatorPoll。这样就实现了消费者存活而且能按需消费了。