You set up Kafka and everything works. Now it’s time to make it REALLY work. Kafka is built to handle huge loads of data, but good luck with that if you only have a single thread. So let’s make it multithreaded!
As before, the code is based on spring-kafka version 2.1.0.RELEASE.
We will build on the code from the previous post, where we created a regular consumer backed by a ConcurrentMessageListenerContainer. We picked the Concurrent variant simply because it was just as easy. Concurrency for free is pretty cool, right?
Let’s see how it works. Start by adding a log message to the consumer:
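The original listener snippet is not reproduced here. As a minimal sketch, assuming the consumer receives plain String payloads, the key point is that every log line includes the name of the thread that handled the message; in the real consumer this line would sit inside the @KafkaListener method. LoggingConsumer, describe and onMessage are hypothetical names, and java.util.logging stands in for whatever logger the project actually uses.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the Kafka listener: it only logs which thread handled a message.
final class LoggingConsumer {
    private static final Logger LOG = Logger.getLogger(LoggingConsumer.class.getName());

    // Builds the log line: the payload plus the name of the thread currently running this code.
    static String describe(String message) {
        return "Received '" + message + "' on thread " + Thread.currentThread().getName();
    }

    // What the listener method would do for every record it receives.
    static void onMessage(String message) {
        LOG.info(describe(message));
    }
}
```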
Let’s run the test. Our message consumer prints:
Cool story, bro. Time for a new test that sends more messages, say 10:
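To make the experiment concrete, here is a plain-Java model of this test (not spring-kafka code). It assumes the default setup: the topic has one partition, modelled as a queue, and the container runs one consumer thread that drains it in order. The test sends 10 messages and records which thread handled each one. TenMessagesModel and the thread name consumer-0 are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of the test: one partition (a queue) drained in order by one consumer thread.
final class TenMessagesModel {
    // Marker that tells the consumer thread to stop; it is not counted as a message.
    private static final String STOP = "__stop__";

    // Sends `count` messages and returns, in order, the name of the thread that handled each one.
    static List<String> sendAndConsume(int count) {
        BlockingQueue<String> partition = new LinkedBlockingQueue<>();
        List<String> handledBy = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                String message;
                while (!STOP.equals(message = partition.take())) {
                    // Same information as the listener's log line: which thread got the message.
                    handledBy.add(Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-0");
        consumer.start();

        try {
            for (int i = 0; i < count; i++) {
                partition.put("message-" + i); // what the test's send call would publish
            }
            partition.put(STOP);
            consumer.join(); // join() makes everything the consumer wrote visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    static Set<String> distinct(List<String> threadNames) {
        return new HashSet<>(threadNames);
    }
}
```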
We are using ConcurrentMessageListenerContainer, so there should be many different threads:
Disappointing… What’s the point of Concurrent if it uses only one thread? Of course, we forgot about proper configuration: by default, ConcurrentMessageListenerContainer uses just one thread (its concurrency defaults to 1). OK, let’s make it 10 then.
Logs after the change:
OK, ConcurrentMessageListenerContainer is definitely broken. We set it to use 10 threads and it still uses the same one.
These are the moments when you feel so desperate that you actually start reading the (java)docs carefully. And the javadoc for ConcurrentMessageListenerContainer#setConcurrency says:
The maximum number of concurrent KafkaMessageListenerContainers running. Messages from within the same partition will be processed sequentially.
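The second sentence is the important one. Within a consumer group, each partition is assigned to exactly one consumer, so the number of consumers that actually receive messages is capped by the number of partitions. The sketch below models that with a range-style split, roughly what Kafka’s default range assignor does for a single topic. It is a simplification for illustration, and PartitionAssignmentModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how one topic's partitions are split among the consumers of a group.
// Each partition goes to exactly one consumer, so one partition's messages are handled sequentially.
final class PartitionAssignmentModel {

    // Range-style split: consumer c gets a contiguous block of partitions,
    // and the first (partitions % consumers) consumers get one extra partition.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        int perConsumer = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int size = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int k = 0; k < size; k++) {
                owned.add(next++);
            }
            assignment.add(owned);
        }
        return assignment;
    }

    // Number of consumers (threads) that own at least one partition and therefore receive messages.
    static int busyConsumers(int partitions, int consumers) {
        int busy = 0;
        for (List<Integer> owned : assign(partitions, consumers)) {
            if (!owned.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }
}
```

With the single partition used in the test, busyConsumers(1, 10) returns 1: in this model, nine of the ten consumers started with a concurrency of 10 never get a partition. With 10 partitions, each of the 10 consumers gets exactly one.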
Ooohhhhh. So one topic partition means one thread, and we need more than one partition. Partitions are configured in the EmbeddedKafka setup, and indeed at some point it might have been set like this:
The second ‘1’ is the number of partitions, so go ahead and make it 10. Yeah, that’s definitely why it didn’t work. Run the test again aaaaaand:
WTF? This should work like a charm. Maybe the thread names are duplicated? Let’s print each thread’s hash code just to be sure:
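Identical names do not mean identical threads: Java does not require thread names to be unique, and any number of Thread objects can share one. The plain-Java sketch below starts several threads with the same name (kafka-consumer is a hypothetical name) and then counts distinct names and distinct thread objects. SameNameThreads is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Shows that distinct threads may share a name: a name is a label, not an identity.
final class SameNameThreads {

    // Starts `count` threads that all use the same name, waits for them to finish,
    // and returns the Thread objects so callers can compare names and identities.
    static List<Thread> startWithSameName(int count, String name) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> { /* the real consumer work would go here */ }, name);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return threads;
    }

    static int distinctNames(List<Thread> threads) {
        Set<String> names = new HashSet<>();
        for (Thread t : threads) {
            names.add(t.getName());
        }
        return names.size();
    }

    // Counts distinct Thread objects by reference, which is what identity hash codes approximate.
    static int distinctThreads(List<Thread> threads) {
        Set<Thread> identities = Collections.newSetFromMap(new IdentityHashMap<>());
        identities.addAll(threads);
        return identities.size();
    }
}
```

For five threads named kafka-consumer, distinctNames returns 1 while distinctThreads returns 5. Comparing references is a safer check than comparing hash codes, because identity hash codes are not guaranteed to be unique.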
That’s a relief… to some extent. Somehow these are all different threads, yet they have the same name?
At that point I just tried setting the bean name using AbstractMessageListenerContainer#setBeanName:
And behold! The logs produced by Kafka immediately show many more distinct threads. And the most interesting part:
FINALLY! I have no idea why the Spring Kafka authors decided to include the thread number ONLY when beanName is set. The important thing is, it works.
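One plausible explanation, based on our reading of the spring-kafka 2.1 sources and worth verifying against the version you use: each child container created by ConcurrentMessageListenerContainer starts its consumer thread from its own executor, that executor’s thread-name prefix is derived from the child’s bean name, and a child only gets a bean name (parent name plus index) when the parent has one. The plain-Java model below encodes those assumptions. The "-C-" suffix and the per-executor counter starting at 1 are part of the assumption, not an API, and ThreadNamingModel and myContainer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not spring-kafka code) of how consumer thread names are built:
// every child container has its own executor whose name prefix comes from the child's bean name,
// and every executor numbers its threads starting from 1.
final class ThreadNamingModel {

    // Name of the first (and only) consumer thread of child container `index`.
    static String consumerThreadName(String parentBeanName, int index) {
        // Assumption: a child container is only named when the parent has a bean name.
        String childBeanName = parentBeanName == null ? null : parentBeanName + "-" + index;
        String prefix = (childBeanName == null ? "" : childBeanName) + "-C-";
        int firstThreadOfThisExecutor = 1; // each child has its own counter
        return prefix + firstThreadOfThisExecutor;
    }

    static List<String> consumerThreadNames(String parentBeanName, int concurrency) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            names.add(consumerThreadName(parentBeanName, i));
        }
        return names;
    }
}
```

Under these assumptions, consumerThreadNames(null, 3) yields three identical names, which matches the "different threads, same name" symptom, while consumerThreadNames("myContainer", 3) yields myContainer-0-C-1, myContainer-1-C-1 and myContainer-2-C-1.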
That covers consuming messages. What about producing them back to Kafka? It’s much simpler, but also: it depends.
In most cases you probably don’t need to do much. KafkaTemplate, which sends the messages, wraps KafkaProducer, and KafkaProducer is thread safe. Here is the javadoc for KafkaProducer:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
So if you are already working on different threads, as you would be when receiving messages over HTTP, you can simply send the message to Kafka from the current thread. Problem solved.
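A plain-Java sketch of that pattern, with a hypothetical SharedSender standing in for the single shared KafkaTemplate: a thread pool plays the role of the HTTP server’s request threads, and each request sends its message from its own thread through the same shared instance. A ConcurrentLinkedQueue replaces the real network send, so this only illustrates sharing one thread-safe sender, not Kafka delivery. RequestThreadsDemo is also a hypothetical name.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one shared, thread-safe producer (the role KafkaTemplate plays).
final class SharedSender {
    // Thread-safe collection standing in for the records handed to Kafka.
    private final ConcurrentLinkedQueue<String> sent = new ConcurrentLinkedQueue<>();

    // Safe to call from many threads at once; no extra locking is needed by callers.
    void send(String message) {
        sent.add(message);
    }

    int sentCount() {
        return sent.size();
    }
}

final class RequestThreadsDemo {
    // Simulates `requests` HTTP requests served by a pool of `poolSize` threads;
    // each request thread sends its message directly through the same shared sender.
    static int handleRequests(int requests, int poolSize) {
        SharedSender sender = new SharedSender();
        ExecutorService requestThreads = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < requests; i++) {
            String payload = "request-" + i;
            requestThreads.execute(() -> sender.send(payload));
        }
        requestThreads.shutdown();
        try {
            if (!requestThreads.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("request threads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sender.sentCount();
    }
}
```

handleRequests(100, 8) returns 100: every request thread used the same sender and no message was lost, without creating a producer per thread or handing messages off to a dedicated sending thread.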
You can find both the multithreaded producer (the HTTP endpoint case described above) and the multithreaded consumer in my sample code on GitHub: https://github.com/douevencode/spring-kafka-multithreading-sample