1

I have a wso2 stream processor code where i am receiving input from kafka as text format because debezium mongodb connector gives out a lengthy output and it's not a valid json to send to WSO2. So, i am receiving it as text and plans to use regex to get only the part of payload i need. I am not able to figure out how to convert this text format to json so that i can send it to another kafka stream and do custom json mapping on this.

Another approach i am trying: I am also trying custom mapping for text:

@source(type='kafka',
        topic.list='demo',
        partition.no.list='0',
        threading.option='single.thread',
        group.id="group",
        bootstrap.servers='localhost:9092',
 @map(type='text',fail.on.missing.attribute='false', regex.A='(.*)',
                @attributes(id = 'A[0]')))
define stream transactionstream1(id string);

Sample Json:

{"_id":{"$oid":"342fs"},"code":"ffssefse","name":"test1","desc":"description1","transRefId":"esfef3423des","amount":1000,"currency":"INR","requestId":null,"redeemedCashcode":null,"sender":{"id":"5d9c0dedcf71a09664922042","name":"tat","phone":"3242324"}}

Regex to get code attribute from above json is : (?:"code":")(.*?)(?:") but wso2 regex.A is not accepting this.

Thats why currently i am getting entire data in text format and trying to convert to json

Is there some other approach to this? or some other convention of writing regex in wso2 stream processor?

I need to get individual attributes in custom mapping from text from the above json in wso2sp.

1 Answer 1

1

Since your regex consist of ", you need to escape that using 3 double quotes as follows,

 regex.A = """(?:"code":")(.*?)(?:")""",

Sign up to request clarification or add additional context in comments.

8 Comments

Thanks It worked. I was struggling since yesterday to get this working in wso2. I have one more question. Can we use regex in wso2 to replace " / " with " " ? @Niveathika
Yes, you can use str:replaceAll() /str:replaceFirst() as per your need, siddhi-io.github.io/siddhi-execution-string/api/5.0.9/…
I have a few more questions. I am trying to get kafka input as text format, where i filter out a part of it with regex and then send it to another stream where i want to do custom mapping. I am not able to figure out how to do custom mapping in another stream here? @Niveathika
Mappers are only at the receiving/publishing parts i.e mappers can only be coupled with sources/sinks. After you extract part of it to a stream attribute, you can only use siddhi extensions to manipulate such str:.., json:..
If i send this event to another kafka topic and receive it in a new stream then it's possible? or i will have to do all the tasks at my initial source stream where i receive this text?
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.