2

I am trying to consume a JSON message in Kafka Streams using the Kafka Connect JSON API. I searched on Google but could not find any substantial information on how to read JSON messages with the Streams API.

Therefore, with my limited knowledge, I have tried the method below.

package com.kafka.api.serializers.json;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConsumerUtilities {

    private static ObjectMapper om = new ObjectMapper();

    public static Properties getProperties() {

        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonDeserializer");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    public static void printStreamData() {
        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, JsonNode> kStream = builder.stream("test");
        kStream.foreach(new ForeachAction<String, JsonNode>() {
            @Override
            public void apply(String key, JsonNode value) {
                try {
                    System.out.println(key + " : " + om.treeToValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

package com.kafka.api.serializers.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerUtilities {

    /** Jackson mapper used to convert a Person into a JsonNode tree. */
    private static final ObjectMapper om = new ObjectMapper();

    /**
     * Creates a producer that writes String keys and JsonNode values to localhost:9092.
     *
     * <p>The key serializer must match the declared key type {@code String}; a
     * ByteArraySerializer here would fail to serialize any non-null String key. Values are
     * written with the Kafka Connect JsonSerializer.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonSerializer");

        return new KafkaProducer<String, JsonNode>(configProperties);
    }

    /**
     * Wraps a Person as a keyless JSON record addressed to the {@code "test"} topic.
     *
     * @param person object to convert into a JsonNode with Jackson
     * @return a record with a null key and the person's JSON tree as value
     */
    public ProducerRecord<String, JsonNode> createRecord(Person person) {
        JsonNode jsonNode = om.valueToTree(person);
        return new ProducerRecord<String, JsonNode>("test", jsonNode);
    }

}

When I execute the code, I get the exception below:

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR.
Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    ... 3 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more

I am looking for some guidance on how to solve this issue.

Following Matthias's suggestion, I created a custom serializer and deserializer:

package com.kafka.api.utilities;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.kafka.api.models.Person;
import com.kafka.api.serdes.JsonDeserializer;
import com.kafka.api.serdes.JsonSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;

public class ConsumerUtilities {

    /**
     * Builds the Kafka Streams configuration for this application.
     *
     * <p>No default key/value serdes are set: the stream built in printStreamData() passes its
     * serdes explicitly.
     *
     * @return configuration for a {@link KafkaStreams} instance
     */
    public static Properties getProperties() {
        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    /**
     * Creates a fresh topology builder.
     *
     * @return a new, empty {@link KStreamBuilder}
     */
    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    /**
     * Subscribes to the {@code "test"} topic and prints every record as {@code key : person}.
     *
     * <p>Keys are read as Strings; values are bound to {@link Person} by the JSON serde. Every
     * value on the topic must be JSON that maps onto Person: a non-JSON payload (such as a plain
     * string) makes JsonDeserializer throw a SerializationException, which Kafka Streams reports
     * as "Failed to deserialize value for record" before moving to the ERROR state.
     */
    public static void printStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);

        KStreamBuilder builder = getStreamingConsumer();
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {
            @Override
            public void apply(String key, Person value) {
                // String concatenation prints "null" instead of throwing when the deserializer
                // returned null for an empty or missing payload.
                System.out.println(key + " : " + value);
            }
        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

package com.kafka.api.serdes;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /**
     * No-arg constructor so Kafka can instantiate this class reflectively. The target type must
     * then be supplied through {@link #configure(Map, boolean)} under the {@code "type"} key.
     */
    public JsonDeserializer() {
        this(null);
    }

    /**
     * Creates a deserializer that binds JSON payloads to the given type.
     *
     * @param type target class; may be null if it will be supplied via configure()
     */
    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /**
     * Takes the target type from {@code map.get("type")} when none was given to the constructor;
     * a type passed to the constructor always wins.
     */
    @SuppressWarnings("unchecked") // the "type" entry is expected to hold a Class<T>
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    /**
     * Parses the payload as JSON into an instance of the target type.
     *
     * @param topic topic the record came from (used only in error messages)
     * @param bytes raw record payload
     * @return the bound object, or null for a null or empty payload
     * @throws SerializationException if the payload cannot be read as the target type
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        try {
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(
                    "Error deserializing JSON from topic " + topic + " into " + type, e);
        }
    }

    protected Class<T> getType() {
        return type;
    }

}

package com.kafka.api.serdes;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // No configuration is read.
    }

    /**
     * Serializes the object to JSON bytes with Jackson.
     *
     * @param topic topic the record is written to (used only in error messages)
     * @param data object to serialize
     * @return JSON bytes, or null when data is null (a null payload rather than the literal
     *     text {@code null})
     * @throws SerializationException if Jackson cannot serialize the object; the Jackson error
     *     is kept as the cause
     */
    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(
                    "Error serializing JSON message for topic " + topic, e);
        }
    }

}

Exception: after running the streaming application, I get the exception below. I am confused.

[Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b] State transition from RUNNING to ERROR.
Exception in thread "Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=test, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
 at [Source: [B@5ee179dc; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
 at [Source: [B@5ee179dc; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:43)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

1 Answer 1

4

The Streams API needs to both read and write data, and thus it uses the abstraction of a Serde, which wraps a serializer and a deserializer together. This is essentially what the exception says:

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde

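Thus, you need to wrap JsonSerializer and JsonDeserializer into a JsonSerde and use this JsonSerde in StreamsConfig (or pass it directly when creating the stream, e.g. `builder.stream(Serdes.String(), jsonSerde, "test")`).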

The simplest way to do this is to use the Serdes.serdeFrom(...) method (note: Serdes, plural). Alternatively, you can implement the Serde interface (note: Serde, singular) directly. The Serdes class contains examples of how to implement the Serde interface.

Sign up to request clarification or add additional context in comments.

4 Comments

Following your suggestion, I created a serializer and deserializer and wrapped them, but I am getting the exception above. Please help.
The exception comes from your deserializer, so I guess you need to debug this part yourself -- it's not a Streams issue. You can see from the stack trace that your deserializer is called. Seems like a schema issue? (The parser error `Unrecognized token 'hello'` means the record at offset 0 is plain text, not JSON.)
This answer is not helping; please add code showing how you used JsonSerde, or any example.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.