kafka消费者持久化问题

如果kafka消费者没有关闭并且没有持续的调用poll,则会导致再使用这个消费者的时候会卡死。

但是持续调用poll就等于一直在消费,不能做到按需消费。也就是说要消费的时候重新创建一个消费者,不消费的时候一定要把消费者连接关闭,否则会有卡死消费现象产生。

经过一番代码研究,发现kafka消费者一个蛋疼的问题。kafka的消费者与coordinator的通信并不是自动线程实现的,而是在用户的每一次poll中实现的。(大坑!!!具体可以搜索kafka consumer rebalance)

既然找到原因了,那就写个线程把coordinator的通信单独抽出来定时实现。

可是可是。。源码里面出现了这么一个

private final ConsumerCoordinator coordinator;

WTF。

好吧。只能用黑科技了。继承KafkaConsumer写一个子类。

public class DcConsumer extends KafkaConsumer{
	protected  ConsumerCoordinator DcCoordinator;
	public DcConsumer(Map configs) {
		super(configs);
		// TODO Auto-generated constructor stub
	}
	
	public DcConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) {
		super(configs, keyDeserializer, valueDeserializer);
		// TODO Auto-generated constructor stub
	}

	public DcConsumer(Properties properties){
		super(properties);
		// TODO Auto-generated constructor stub
	}

	public DcConsumer(Properties properties, Deserializer keyDeserializer, Deserializer valueDeserializer) {
		super(properties, keyDeserializer, valueDeserializer);
		// TODO Auto-generated constructor stub
	}
	
	public void coordinatorPoll() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, ClassNotFoundException{
		if(this.DcCoordinator == null){
			Class classType = Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer");
			Field[] fields = classType.getDeclaredFields();
			Field field = classType.getDeclaredField("coordinator");
			field.setAccessible(true);
			this.DcCoordinator = (ConsumerCoordinator) field.get(this);
		}
		long now = System.currentTimeMillis();
		this.DcCoordinator.poll(now);
	}
}

然后,定时调用coordinatorPoll。这样就实现了消费者存活而且能按需消费了。


发表评论