Post pic - get it? Because he is Franz Kafka!

Lame jokes aside, we’ve already talked about Kafka testing and why I don’t like annotations. So let’s combine those two things and talk about how to make “Spring for Apache Kafka” work without annotations.

First of all, why don’t I just use the plain Kafka library for Java? Because it’s a bit too “raw”. The samples for consuming messages use an infinite while(true) loop, which seems a bit weird (or maybe I haven’t seen enough production code that does that?).
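
For context, here is a rough sketch of the shape such a hand-written loop tends to take once you want to be able to stop it. It deliberately uses no Kafka classes: the `poller` supplier is a hypothetical stand-in for a blocking call such as `KafkaConsumer.poll(...)`, and the class name `PollLoop` is made up for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Shape of a hand-written consume loop, with no Kafka dependency.
// 'poller' is a hypothetical stand-in for a blocking call such as KafkaConsumer.poll(...).
final class PollLoop implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Supplier<List<String>> poller;
    private final Consumer<String> handler;

    PollLoop(Supplier<List<String>> poller, Consumer<String> handler) {
        this.poller = poller;
        this.handler = handler;
    }

    @Override
    public void run() {
        // Same idea as while(true), but another thread can end it via stop().
        while (running.get()) {
            for (String message : poller.get()) {
                handler.accept(message);
            }
        }
    }

    void stop() {
        running.set(false);
    }

    public static void main(String[] args) {
        // Two fake batches, then the stand-in poller asks the loop to stop.
        Iterator<List<String>> batches = List.of(List.of("a", "b"), List.of("c")).iterator();
        PollLoop[] loop = new PollLoop[1];
        loop[0] = new PollLoop(
                () -> {
                    if (batches.hasNext()) {
                        return batches.next();
                    }
                    loop[0].stop();
                    return List.of();
                },
                message -> System.out.println("got " + message));
        loop[0].run();
    }
}
```

With the real client you would additionally have to interrupt a blocked poll (the plain consumer offers `wakeup()` for that) and close the consumer afterwards. That is the kind of plumbing I would rather leave to a library.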

Spring-kafka, like most Spring-related libraries, likes annotations. Fortunately, the docs cover both approaches, plain Java code and annotations, so it’s not that bad.

Also, I went for “Spring for Apache Kafka” in the hope of easier configuration. And while I do complain about EmbeddedKafka, setting up a consumer and a producer was a fairly painless process. The docs are pretty good. Maybe they could be structured differently (the order of the sections is a bit weird), but that’s just my personal opinion.

Let’s get to the code. How do I consume a message from Kafka?

The code is based on spring-kafka version 2.1.0.RELEASE.

// Create the configuration for your consumer. This is the most basic configuration;
// you probably want to add something more, like 'auto.offset.reset',
// see: https://kafka.apache.org/documentation/#consumerconfigs
Map<String, Object> consumerConfig = ImmutableMap.of(
        BOOTSTRAP_SERVERS_CONFIG, "localhost:50523",
        GROUP_ID_CONFIG, "groupId"
);

// Create a DefaultKafkaConsumerFactory, which adds information about how to parse the key and value.
// You could also set the deserializers in the config, but this is the compile-safe approach.
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
        new DefaultKafkaConsumerFactory<>(
                consumerConfig,
                new StringDeserializer(),
                new StringDeserializer());

// You also need ContainerProperties, which hold the topic and what to do with messages.
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(
        (MessageListener<String, String>) record -> consumedMessages.add(record.value()));

// blend container with full consumer configuration and you get...
// MessageListenerContainer!
ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(
                kafkaConsumerFactory,
                containerProperties);

// ... which you can finally start()
container.start();

Not that bad, right? You create the consumer configuration and a DefaultKafkaConsumerFactory, where you configure everything: addresses, parsing etc. Then you create ContainerProperties, which specify the Kafka topic and what to do with each message.
In short: the consumer factory says HOW and ContainerProperties says WHAT.
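
The config constants used above are just plain strings, so the "add something more" part can be sketched without any Kafka classes on the classpath. This is a minimal sketch, assuming the documented consumer property names `bootstrap.servers`, `group.id` and `auto.offset.reset`. The class and method names are made up for illustration, and `earliest` is only one possible choice (it makes a group with no committed offset start from the beginning of the topic).

```java
import java.util.Map;

// Hypothetical helper, not part of spring-kafka or the Kafka client.
final class ConsumerSettings {

    // Builds the same kind of map as 'consumerConfig' above, plus one extra property.
    static Map<String, Object> consumerConfig(String bootstrapServers, String groupId) {
        return Map.of(
                "bootstrap.servers", bootstrapServers, // value of BOOTSTRAP_SERVERS_CONFIG
                "group.id", groupId,                   // value of GROUP_ID_CONFIG
                "auto.offset.reset", "earliest");      // assumption: read from the start when no offset is stored
    }

    public static void main(String[] args) {
        // Map.of gives no ordering guarantee, so the printed order may vary.
        System.out.println(consumerConfig("localhost:50523", "groupId"));
    }
}
```

The resulting map can be passed to DefaultKafkaConsumerFactory in place of consumerConfig. Using the constants instead of string literals is still the safer option against typos.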

Consuming is done, so how do we produce messages? It’s even easier.

// Again, this is the minimum required config; you should check
// https://kafka.apache.org/documentation/#producerconfigs
Map<String, Object> producerConfig = Map.of(
        BOOTSTRAP_SERVERS_CONFIG, brokerAddress
);

// Create a KafkaTemplate from a DefaultKafkaProducerFactory,
// which specifies the serializers.
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(
        new DefaultKafkaProducerFactory<>(
                producerConfig,
                new StringSerializer(),
                new StringSerializer()));

kafkaTemplate.send(topic, message);

There is really not much to add. The code is pretty simple and we didn’t use a single annotation. Great success!

You can see the sample code on my GitHub: https://github.com/douevencode/spring-kafka-sample

(The code there might be structured differently, because the layout used here is more readable in a blog post but might not be the best choice for normal code.)