프로그래밍/Java

reactor-kafka version up 이슈 (No subscriptions have been created)

seungdols 2023. 4. 11. 18:09

이관 받은 코드에서 reactor-kafka 버전을 1.2.5.RELEASE 버전을 쓰고 있었는데, 버전 업그레이드를 하면서 겪은 이슈는 다음과 같았다.

2023-04-10 18:17:36.601 ERROR 1 --- [ard.processor-1] r.k.r.internals.ConsumerEventLoop : Unexpected exception  



java.lang.IllegalStateException: No subscriptions have been created  

at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:515)  

at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:241)  

at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)  

at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)  

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)  

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)  

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)  

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

왜 구독하지 못하는 것일까? 궁금한 부분이었는데, 특정 부분에서 버전에 따라 토픽 정보가 설정 되고, 1.3.2 버전에서는 설정이 안된다.

        final ReceiverOptions<String, V> options = ReceiverOptions.create(props);
        options.subscription(Collections.singleton(topicName));

위와 같은 코드였는데, 처음에 문제의 원인은 Collections.singletonList로 바꿔보자 였는데, 동일하게 안됐다.
Reactor의 개념이 불변이라, 설정값 또한 인스턴스가 새로 생성 되기 때문으로 보인다.

Collections.singleton or Collections.singletonList를 쓰는 것은 해당 이슈와 무관하다.

추측한 부분과 새로운 버전의 차이를 알고보니 원인은 이렇다.
1.2.7 이하에는 ReceiverOptions 인터페이스를 구현하는 구현체는 총 두개다.

  • MutableReceiverOptions
    • Configuration properties for Reactive Kafka KafkaReceiver and its underlying KafkaConsumer.
    • deprecated in favor of ImmutableReceiverOptions and will be deleted in 3.x version
  • ImmutableReceiverOptions

결론적으로 1.3.0 이후부터 MutableReceiverOptions 해당 구현체가 삭제 됐다.
as-is

        final ReceiverOptions<String, V> options = ReceiverOptions.create(props);
        options.subscription(Collections.singleton(topicName));

to-be

        ReceiverOptions<String, V> receiverOptions = ReceiverOptions.create(props);
        ReceiverOptions<String, V> options = receiverOptions.subscription(Collections.singleton(topicName));
        options.withValueDeserializer(valueDeserializer);

이렇게 코드를 변경 해주면 정상적으로 잘 된다.

반응형