You set up Kafka and everything works. Now it’s time to make it REALLY work. Kafka is built to handle huge loads of data, but good luck with that on a single thread. So let’s make it multithreaded!

As before, the code is based on spring-kafka version 2.1.0.RELEASE.

We will build on the code from the previous post, where we created an ordinary consumer backed by a ConcurrentMessageListenerContainer. It was Concurrent because that was the easy option. Concurrency for free is pretty cool, right?

Let’s see how it works. Start by adding a log message to the consumer:

System.out.println(
  "I received message on thread: " + Thread.currentThread().getName());

Let’s run the test. Our message consumer prints:

I received message on thread: -C-1  

Cool story bro. It’s time for a new test which sends more messages - make it 10:

IntStream.range(0, 10)
    .forEach(i -> sendMessageToKafka("message", CONSUMER_TOPIC));

We are using ConcurrentMessageListenerContainer so there should be many different threads:

I received message on thread: -C-1  
I received message on thread: -C-1  
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1

Disappointing… What’s the point of Concurrent if it uses one thread? Of course we forgot about proper configuration. By default ConcurrentMessageListenerContainer uses just one thread. Ok, make it 10 then.

container.setConcurrency(10);

Logs after the change:

I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1

Ok, ConcurrentMessageListenerContainer is definitely broken. We set it to use 10 different threads and it still uses the same thread.

These are the moments you feel so desperate you actually start reading (java)docs carefully. And javadoc for ConcurrentMessageListenerContainer#setConcurrency says:

The maximum number of concurrent KafkaMessageListenerContainer running. Messages from within the same partition will be processed sequentially.
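One way to read that sentence is with a rough model in plain Java. This is only a sketch: the class and method names are made up for illustration, and the round-robin spreading is an assumption (Kafka's real partition assignment is configurable and more involved). The one rule it keeps is that every partition has exactly one owner.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper, not spring-kafka API: hands out partition numbers
    // to `concurrency` consumers round-robin, one owner per partition.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> owners = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            owners.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owners.get(p % concurrency).add(p);
        }
        return owners;
    }

    // Consumers that own at least one partition, i.e. the only ones
    // that can ever receive a message in this model.
    static long busyConsumers(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream()
                .filter(owned -> !owned.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        System.out.println("1 partition, concurrency 10: " + busyConsumers(1, 10));
        System.out.println("10 partitions, concurrency 10: " + busyConsumers(10, 10));
    }
}
```

In this model, raising concurrency beyond the number of partitions only adds consumers with nothing to read.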

Ooohhhhh. So one topic partition means one thread. We need more than one partition. The number of partitions is set when creating KafkaEmbedded, and indeed at some point it might have been set like this:

new KafkaEmbedded(1, false, 1, CONSUMER_TOPIC, PRODUCER_TOPIC);

The second ‘1’ is the number of partitions, so go ahead and make it 10. Yeah, that’s definitely the reason it didn’t work. Run the test again aaaaaand:

I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1
I received message on thread: -C-1

WTF? This should work like a charm. Maybe thread names are duplicated? Let’s print each thread’s hashcode just to be sure:

System.out.println(
  "I received message on thread: " + Thread.currentThread().getName() +
  " hashcode: " + Thread.currentThread().hashCode());

I received message on thread: -C-1 hashcode: 719080551
I received message on thread: -C-1 hashcode: 74028355
I received message on thread: -C-1 hashcode: 76919768
I received message on thread: -C-1 hashcode: 2001838603
I received message on thread: -C-1 hashcode: 1068336330
I received message on thread: -C-1 hashcode: 74028355
I received message on thread: -C-1 hashcode: 1068336330
I received message on thread: -C-1 hashcode: 74028355
I received message on thread: -C-1 hashcode: 1068336330

That’s a relief… to some extent. So these really are several different threads, they just all have the same name?
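Nothing in Java forbids that, by the way: thread names do not have to be unique. A minimal sketch in plain Java (no Kafka involved, class and method names are made up for illustration) that starts a few threads with the very same name and counts how many distinct Thread objects actually ran:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SameNameThreadsSketch {

    // Starts `count` threads that all share `name` and returns how many
    // distinct Thread objects executed the task.
    static int distinctThreadsNamed(String name, int count) {
        // Thread does not override equals/hashCode, so this set works by identity.
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            threads[i] = new Thread(() -> seen.add(Thread.currentThread()), name);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads named -C-1: " + distinctThreadsNamed("-C-1", 5));
    }
}
```

So identical names in the log say nothing about how many threads are really doing the work.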

At that point I just tried setting the bean name using AbstractMessageListenerContainer#setBeanName:

container.setBeanName("NAME");

And behold! The logs produced by Kafka immediately show many more distinct threads. And the most interesting part:

I received message on thread: NAME-6-C-1
I received message on thread: NAME-2-C-1
I received message on thread: NAME-7-C-1
I received message on thread: NAME-0-C-1
I received message on thread: NAME-0-C-1
I received message on thread: NAME-2-C-1
I received message on thread: NAME-1-C-1
I received message on thread: NAME-1-C-1
I received message on thread: NAME-1-C-1

Finally!

FINALLY! I have no idea why the Spring Kafka authors decided to include the thread number ONLY if beanName is set. The important thing is, it works.
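For the record, here is a tiny plain-Java model of the naming that would produce exactly the logs above. It is inferred from the output only, not copied from the spring-kafka sources, and the class and method names are hypothetical.

```java
public class ThreadNameSketch {

    // Hypothetical model of the observed naming, not spring-kafka code:
    // the child index only shows up when a bean name is present.
    static String threadName(String beanName, int childIndex) {
        String prefix = (beanName == null) ? "" : beanName + "-" + childIndex;
        return prefix + "-C-1";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(threadName(null, i) + "  vs  " + threadName("NAME", i));
        }
    }
}
```

Without a bean name every child ends up as -C-1, which is why ten threads looked like one.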

That’s it for consuming messages. What about producing them back to Kafka? It’s much simpler, but also - it depends. In most cases you probably don’t need to do much. KafkaTemplate - which sends messages - is a wrapper around KafkaProducer and it is thread safe. Here is the javadoc for KafkaProducer:

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

So if you are already working on different threads - as you would be when receiving messages over HTTP - you can just send the message to Kafka from the current thread. Problem solved.
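The pattern itself looks roughly like the sketch below. To keep it runnable without a broker, it uses a made-up CountingSender class as a stand-in for a thread-safe sender such as KafkaTemplate, and a thread pool as a stand-in for HTTP request threads. Both are assumptions for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedSenderSketch {

    // Hypothetical stand-in for a thread-safe sender; it only counts calls.
    static class CountingSender {
        private final AtomicInteger sent = new AtomicInteger();

        void send(String topic, String message) {
            sent.incrementAndGet();
        }

        int sentCount() {
            return sent.get();
        }
    }

    // Every worker thread sends through the SAME sender instance.
    static int sendFromManyThreads(int threads, int messagesPerThread) {
        CountingSender sender = new CountingSender();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    sender.send("topic", "message");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sender.sentCount();
    }

    public static void main(String[] args) {
        System.out.println("messages sent: " + sendFromManyThreads(4, 100));
    }
}
```

The point is the single shared instance: no sender per thread, no extra locking around send.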

You can check out both the multithreaded producer (the HTTP endpoint case described above) and the multithreaded consumer in my sample code on GitHub: https://github.com/douevencode/spring-kafka-multithreading-sample