The documentation for spring-kafka-test is not the best. It's not the worst either, but using the embedded Kafka broker (KafkaEmbedded) can be quite painful at times. More test samples would come in handy, because I wasted a lot of time on a simple test that should have just worked.

Quick note - I root for this project, because why not? Open-source FTW, and I appreciate all the hard work put into it. However, for a combination of Spring and Kafka, I expected a smaller learning curve.

I had been doing Kafka magic on the server side. Running a standalone instance of Kafka is pretty easy - https://kafka.apache.org/quickstart - it is explained well, and the basic stuff takes only a few simple steps. You are not lost, you are led carefully. Maybe the quickstart could be done better, because some things which IMHO should be very basic - like creating a topic - require long commands. But that is a minor issue; the most important thing is that it works!

Then I started to look into spring-kafka and tried to do something basic - let my consumer receive messages. It was not flawless, but after some time I had a Kafka consumer receiving messages sent from the command line. Great success!

Then - as the natural next step - I looked into spring-kafka-test. I wasn't really sure of my Kafka skills, so I started with some simple integration tests to see if my initial concept worked. The real business logic fit into a few lines, so writing the tests was not hard at all. But making them pass was a pain in the ass (rhyme intended).

Making it work at the beginning was not that bad. Making some tests pass was not that bad. But making all the tests pass seemed impossible. It turned out that the tests depended on each other. How? After spending a few days trying different approaches, I just gave up. I hate doing that, but there are more interesting things to do, and I finally just wanted to make it work.

Let's get to the code. I have a very basic model for a Consumer, which says which topic should be listened to and what should be done when a message arrives on that topic.

public static class Consumer {
    // Package-private fields so that KafkaController (below) can read them directly.
    final String topic;
    final MessageConsumer messageConsumer;

    public Consumer(String topic, MessageConsumer messageConsumer) {
        this.topic = topic;
        this.messageConsumer = messageConsumer;
    }
}

MessageConsumer specifies what to do with a message; it's a simple interface:

public interface MessageConsumer {
    void consume(String message);
}
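
Since it has a single abstract method, a lambda is enough. A trivial example (the println body is just an illustration, not code from my project):

MessageConsumer logToConsole = message -> System.out.println("Received: " + message);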

And the controller, which makes the consumers actually work with Kafka:

import java.util.HashMap;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
// In newer spring-kafka versions this class lives in org.springframework.kafka.listener.
import org.springframework.kafka.listener.config.ContainerProperties;

public class KafkaController {

    public KafkaController(String brokerAddress, Consumer... consumers) {
        for (Consumer consumer : consumers) {
            addTopicListener(brokerAddress, consumer.topic, consumer.messageConsumer);
        }
    }

    private void addTopicListener(String brokerAddress, String topic, MessageConsumer consumer) {
        HashMap<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        // A random group id means every listener (and every test run) starts as a brand-new consumer group.
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
                consumerConfig,
                new StringDeserializer(),
                new StringDeserializer());

        // The container polls the topic and hands each record to our MessageConsumer.
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener((MessageListener<String, String>) data ->
                consumer.consume(data.value()));
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(cf, containerProperties);
        container.start();
    }
}

BTW, with class names as long as ConcurrentMessageListenerContainer you really miss Kotlin's type inference…
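
For completeness, wiring it all up looks something like this (the broker address and the lambda body are placeholders, not values from my setup):

KafkaController controller = new KafkaController(
        "localhost:9092",
        new Consumer("fooTopic", message -> System.out.println("fooTopic got: " + message)));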

This code should be quite easy to understand. And it even works! But when I wanted to test it, I failed. So I kept simplifying my tests to get to the core of the problem. I finally ended up with this:

import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class KafkaControllerTest {

    private static final String FOO_TOPIC = "fooTopic";

    @Rule
    public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, FOO_TOPIC);

    MessageConsumer msgConsumerFoo = mock(MessageConsumer.class);

    @Before
    public void before() {
        assertTrue(embeddedKafka.getBrokerAddresses().length == 1);
        // The controller starts listening as soon as it is constructed.
        new KafkaController(
                embeddedKafka.getBrokerAddress(0).toString(),
                new Consumer(FOO_TOPIC, msgConsumerFoo));
    }

    @Test
    public void kafkaFirstTest() throws Exception {
        sendMessage(FOO_TOPIC, "foo message");

        verify(msgConsumerFoo).consume(any());
    }

    @Test
    public void kafkaSecondTest() throws Exception {
        sendMessage(FOO_TOPIC, "foo2 message");

        verify(msgConsumerFoo).consume(any());
    }

    void sendMessage(String topic, String notification) throws ExecutionException, InterruptedException {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        Producer<Integer, String> producer = new DefaultKafkaProducerFactory<Integer, String>(props).createProducer();
        producer.send(new ProducerRecord<>(topic, notification)).get();
        producer.close();
    }
}

You have one topic; for every test you create an embedded Kafka server and start a controller listening to that topic. There is nothing complicated about that. But here we are: this test suite is not passing, even though every test passes when run separately… The worst thing that can happen.

Solution

I tried sending messages in different ways (there are a few ways to do that), and I tried many different settings on KafkaEmbedded - still nothing. To make it short, the possible solutions are:

  1. Adding Thread.sleep() before and after the sendMessage() call:

    @Test
    public void kafkaSecondTest() throws Exception {
        Thread.sleep(1000);
        sendMessage(FOO_TOPIC, "foo2 message");
        Thread.sleep(1000);
        verify(msgConsumerFoo).consume(any());
    }

    Yes, this sucks.
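
    A slightly less ugly variant of the same idea (my suggestion, not something from the spring-kafka docs) is Mockito's timeout() verification mode, which polls for up to the given time instead of sleeping blindly. The first sleep is still needed, because it is what gives the consumer time to subscribe before the message is sent:

    @Test
    public void kafkaSecondTest() throws Exception {
        Thread.sleep(1000);
        sendMessage(FOO_TOPIC, "foo2 message");
        // Passes as soon as consume() is invoked, waits at most one second.
        verify(msgConsumerFoo, timeout(1000)).consume(any());
    }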

  2. Somehow getting a reference to your container (in my case a ConcurrentMessageListenerContainer) and calling ContainerTestUtils.waitForAssignment() with this container as an argument. It blocks until the container has actually been assigned its partitions, so you cannot send messages before the consumer is ready. See the sketch below.
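
    A minimal sketch, assuming the controller were refactored to expose its containers - spring-kafka gives you no such accessor, so getContainer() here is hypothetical:

    ConcurrentMessageListenerContainer<String, String> container =
            controller.getContainer(FOO_TOPIC); // hypothetical accessor, see note above
    // Blocks until the container has been assigned all partitions of its topic.
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());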
  3. While creating your Kafka consumer, set an appropriate value for the auto.offset.reset configuration, aka org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_CONFIG. Its doc is pretty descriptive, but read it carefully, because my colleague and I totally misunderstood it anyway:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

Initially I thought the earliest setting would be stupid - if a consumer breaks, do you really want to process all saved messages again? But then I read more into it: you will reprocess all saved messages only if no current offset is saved for your group. And committed offsets are kept on the broker side (for the modern Java consumer in the internal __consumer_offsets topic, not in ZooKeeper), so the situation when the current offset is not saved should be extremely rare - just restarting the consumer is not enough to trigger it.

Thus, this setting makes sense and - in my opinion - is the best solution to the problem. It is also exactly why the tests above fail: every controller starts with a fresh random group.id, so there is never a committed offset; with the default latest the consumer starts at the end of the topic and misses any message sent before its partitions were assigned, while earliest makes it go back and read everything. If you insist on having a different setting in your production code, just make auto.offset.reset configurable, that's all.
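
In the KafkaController above this is a one-line addition to the consumer config (surrounding lines shown for context):

consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
// A fresh random group never has a committed offset, so this makes it start from the beginning of the topic.
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");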

I hope this saves you as much time as it would have saved me.