
I need to read JSON data from my Kafka topic and, using Kafka 0.11, write Java code that streams the JSON data in that topic. My input is JSON data containing an array of dictionaries (JSON objects).

My requirement is to get the value of the "text" key from each dictionary in the array and send all of those tweet texts to another topic using Kafka Streams.

This is the code I have written so far. Please help me parse the data.

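Java code for streaming:

// JsonSerializer/JsonDeserializer are assumed to be the JsonNode-based classes from Kafka's connect-json module
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

// read the "Persons" topic with String keys and JsonNode values
KStream<String, JsonNode> personstwitter = builder.stream(Serdes.String(), jsonSerde, "Persons");

personstwitter.to(Serdes.String(), jsonSerde, "Persons-output");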
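A minimal sketch of how the "text" values could be pulled out and forwarded, assuming Kafka Streams 0.11 (the KStreamBuilder API used above), the JsonNode-based JsonSerializer/JsonDeserializer from Kafka's connect-json module, and that each record value is a JSON array of objects such as [{"text": "..."}, ...]. The class and method names (TweetTextTopology, textsOf, build) are hypothetical. flatMapValues turns one array record into zero or more output records, one per tweet text, keeping the original key:

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class: builds a 0.11-style topology that forwards tweet texts.
final class TweetTextTopology {

    // Collects the "text" value of every object in a JSON array. Non-array input,
    // and elements without a textual "text" field, contribute nothing.
    static List<String> textsOf(JsonNode value) {
        List<String> texts = new ArrayList<>();
        if (value != null && value.isArray()) {
            for (JsonNode element : value) {
                JsonNode text = element.get("text");
                if (text != null && text.isTextual()) {
                    texts.add(text.asText());
                }
            }
        }
        return texts;
    }

    static KStreamBuilder build(String inputTopic, String outputTopic) {
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, JsonNode> tweets = builder.stream(Serdes.String(), jsonSerde, inputTopic);

        // One input record (a JSON array) becomes one output record per "text" value.
        KStream<String, String> texts = tweets.flatMapValues(TweetTextTopology::textsOf);

        // The texts are plain strings now, so write them with a String serde.
        texts.to(Serdes.String(), Serdes.String(), outputTopic);
        return builder;
    }
}
```

The returned builder would then be passed to new KafkaStreams(builder, props) and started, e.g. TweetTextTopology.build("Persons", "Persons-output").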

Answer


I would suggest the following to get more control over the JSON data:

  1. Write a Serializer and a Deserializer.
  2. Create a POJO based on the structure of the JSON. A POJO is the best way to have more control over the data.
  3. Map the data to the POJO to access the fields you need.
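Applied to the data in the question, steps 2 and 3 might look like the sketch below. It assumes Jackson 2 (jackson-databind) and that each message is a JSON array of objects with a "text" key; the TweetMapping class, the Tweet POJO and the texts helper are hypothetical names. @JsonIgnoreProperties(ignoreUnknown = true) lets the POJO declare only the field you need and skip every other key:

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class TweetMapping {

    // Hypothetical POJO for one element of the array: only "text" is mapped,
    // any other key in the JSON object is ignored instead of causing an error.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static final class Tweet {
        private String text;

        public String getText() { return text; }
        public void setText(String text) { this.text = text; }
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Maps a JSON array of objects onto List<Tweet>, then keeps the non-null texts.
    static List<String> texts(String jsonArray) {
        try {
            List<Tweet> tweets = MAPPER.readValue(jsonArray, new TypeReference<List<Tweet>>() {});
            List<String> result = new ArrayList<>();
            for (Tweet tweet : tweets) {
                if (tweet.getText() != null) {
                    result.add(tweet.getText());
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, texts("[{\"text\":\"hello\",\"lang\":\"en\"},{\"text\":\"world\"}]") yields a list containing "hello" and "world".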

POJO:

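import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonRootName;
import java.io.Serializable;

@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;
    private String name;
    private String personalID;
    private String country;
    private String occupation;

    // no-arg constructor for frameworks that instantiate the class reflectively
    public Person() {
    }

    // Jackson uses this constructor, binding each JSON property to a parameter
    @JsonCreator
    public Person(@JsonProperty("name") String name, @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country, @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    // getters are used by Jackson when serializing, and getCountry() by the consumer below
    public String getName() { return name; }
    public String getPersonalID() { return personalID; }
    public String getCountry() { return country; }
    public String getOccupation() { return occupation; }

    // setters stripped
}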

Serializer:

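import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null; // a null value is sent as a null byte array
        }
        try {
            // writeValueAsBytes produces UTF-8 encoded JSON
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}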

Deserializer:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by Kafka when the class is configured by name;
     * the target type is then read from the "type" entry in configure().
     */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        if (type == null) {
            type = (Class<T>) configs.get("type");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            // bind the JSON bytes to an instance of the target POJO type
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }

    protected Class<T> getType() {
        return type;
    }
}
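If you prefer a single object to pass to builder.stream(...), the serializer and deserializer above can be folded into one class that implements Serde, Serializer and Deserializer at once. This is a sketch assuming Jackson 2 and the Kafka 0.11 client interfaces, where configure and close have no default implementation and must be written out; JsonPojoSerde is a hypothetical name, not a class shipped with Kafka:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Hypothetical all-in-one JSON Serde: one instance acts as the Serde, its Serializer
// and its Deserializer, for a POJO type fixed at construction time.
final class JsonPojoSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    public JsonPojoSerde(Class<T> type) {
        this.type = type;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure: the target type comes from the constructor
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            return mapper.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```

With the Person POJO, usage would be along the lines of builder.stream(Serdes.String(), new JsonPojoSerde<>(Person.class), "test").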

Consumer:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ConsumerUtilities {

    public static Properties getProperties() {
        Properties configs = new Properties();
        // application.id should contain only letters, digits, '.', '_' and '-' (no spaces)
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-test-application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);
        KStreamBuilder builder = getStreamingConsumer();

        // values from the "test" topic are deserialized directly into Person objects
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach((key, person) -> System.out.println(person.getCountry()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }
}
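The getStreamData method above starts the KafkaStreams instance but never closes it. A common pattern, sketched here assuming the same Kafka 0.11 API (the KafkaStreams(TopologyBuilder, Properties) constructor, which accepts a KStreamBuilder), is to close it from a JVM shutdown hook and block the main thread until then; StreamsRunner and its methods are hypothetical names:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

// Hypothetical runner: starts a topology and closes it cleanly on JVM shutdown.
final class StreamsRunner {

    static Properties props(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // application.id is also used as the consumer group id and as a prefix for
        // internal topic names, so it should contain only letters, digits, '.', '_' and '-'.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    // Starts the streams instance and blocks until the JVM begins shutting down.
    static void runUntilShutdown(KStreamBuilder builder, Properties props) throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();   // commit progress and release resources
            latch.countDown(); // let the main thread return
        }));
        streams.start();
        latch.await();
    }
}
```

A call such as StreamsRunner.runUntilShutdown(builder, StreamsRunner.props("kafka-test-application", "localhost:9092")) would replace the last two lines of getStreamData.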

Producer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerUtilities {

    public static Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // keys are Strings, so use StringSerializer
        // (ByteArraySerializer would throw a ClassCastException for a non-null String key)
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // the custom JsonSerializer shown above
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.kafka.api.serdes.JsonSerializer");
        return new KafkaProducer<>(configProperties);
    }

    public ProducerRecord<String, Person> createRecord(Person person) {
        return new ProducerRecord<>("test", person);
    }
}
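To test the streaming code with data shaped like the question's (a JSON array of objects with a "text" key), the JSON can also be produced as a plain string, because the JSON deserializers only see the UTF-8 bytes on the topic. This is a sketch assuming kafka-clients 0.11 and a reachable broker; SampleJsonProducer and its methods are hypothetical, and any payload you send is your own sample data:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical helper that writes raw JSON text to a topic.
final class SampleJsonProducer {

    static Properties props(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // StringSerializer encodes keys and values as UTF-8 bytes
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    // A record without a key; the value is the JSON document as text.
    static ProducerRecord<String, String> record(String topic, String json) {
        return new ProducerRecord<>(topic, json);
    }

    // Sends one record and waits for the broker's acknowledgement.
    static void sendOne(String bootstrapServers, String topic, String json) throws Exception {
        try (Producer<String, String> producer = new KafkaProducer<>(props(bootstrapServers))) {
            producer.send(record(topic, json)).get();
        }
    }
}
```

For example: SampleJsonProducer.sendOne("localhost:9092", "Persons", "[{\"text\":\"first tweet\"},{\"text\":\"second tweet\"}]").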

Comments:

Hi sir, thank you for your speedy reply.
But my JSON is not a string, it is a JSON array.
Then iterate through the array and operate on a string.
