I am new to this tool. I have a requirement in which I need to dynamically sink the data to multiple kafka topics using mqtt source connector. For that, I have used the kafka CRD like
spec:
class: io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
config:
tasks.max: 1
connect.mqtt.clean: "true"
connect.mqtt.hosts: "tcp://rabbitmq-cluster:1883"
connect.mqtt.converter.throw.on.error: "true"
connect.mqtt.log.message: "true"
connect.mqtt.client.id: "lenses-client-connector"
connect.mqtt.timeout: "1"
connect.mqtt.keep.alive: "2"
connect.mqtt.username: "<rabbitmq-client-id>"
connect.mqtt.password: "<rabbitmq-client-secret>"
connect.mqtt.service.quality: "1"
connect.key.converter: "org.apache.kafka.connect.storage.StringConverter"
connect.value.converter: "org.apache.kafka.connect.converters.StringConverter"
connect.mqtt.kcql: "INSERT INTO <kafka-topic> SELECT * FROM `<queue-name>` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`"
I have check on the ChatGPT it suggests that we can use "INSERT INTO kafka_${1} SELECT * FROM mqtt/topic/+/device/+/+"
but i further check with the lenses stream reactor documentation, I have found no way like this.
Can someone suggest how can I achieve my usecase?
Thanks