kafka的listeners配置错误导致主线程阻塞
问题背景 我们在用kafka的时候,偶尔会遇到这样这样一个问题。
我们写的kafka的客户端程序,在启动的时候,会无缘无故的 卡住(阻塞)
如下图所示:
这时程序会长时间阻塞在这里,无法继续进行后续操作。
问题排查 因为日志没有任何报错信息,但是又可以肯定当前项目并没有完全启动成功。感觉像是程序当中有个地方卡到了。通过 VisualVM
工具dump 线程相关的信息,很快发现了问题所在。原来卡在了consumer初始化的地方。
一下是我这边的处理方式,大家可以参考下。如果有更好的方式欢迎大家相互交流。
以下方法是在初始化Consumer的时候进行处理的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public KafkaConsumerImpl init () { if (group == null || group.isEmpty()) { throw new RuntimeException ("phoenix.mq.group is empty" ); } Properties props = new Properties (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, namesrvAddr); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" ); consumer = new KafkaConsumer <>(props); topicPartition = new TopicPartition (this .topic, this .partitionId); this .partitions = Collections.singletonList(topicPartition); Callable<Boolean> call = new Callable <Boolean>() { boolean res = false ; int tryTimes = 3 ; @Override public Boolean call () throws Exception { while (tryTimes-- > 0 ) { try { consumer.assign(partitions); nextBeginOffset = consumer.position(topicPartition); res = true ; break ; } catch (Exception e) { if (e instanceof InterruptedException) { break ; } LOG.error(e.getMessage(), e); LOG.error(" ==> error when trying to fetch metadata for kafka. topic<{}>, partition<{}>" , topic, partitionId); } try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } return res; } }; FutureTask<Boolean> task = new FutureTask <>(call); new Thread (task).start(); boolean isOk = false ; try { isOk = task.get(10000 , TimeUnit.MILLISECONDS); } catch (Exception e) { LOG.error("Get task result timeout" , e); } task.cancel(true ); if (isOk) { LOG.info(" ==> init kafka consumer succeed: servers<{}>, topic<{}>, partition<{}>, nextBeginOffset<{}>" , namesrvAddr, topic, partitionId, nextBeginOffset); } else { throw new RuntimeException (String.format( " ==> init kafka consumer failed. please check the conf (listeners or advertised.listeners or ...) and try to ping the host name in the conf value" )); } return this ; }
利用 FutureTask 的特性,定义一个定时任务, 在初始化Consumer的时候,尝试去连接kafka,如果配置的kafka的地址有误,或者配置出错在这里可以通过抛出错误体现出来。
最后通过task.get()
方法返回的结果来判断 Consumer 是否成功初始化。