Dynamically sink the data to multiple kafka topics using mqtt source connector

I am new to this tool. I have a requirement in which I need to dynamically sink the data to multiple kafka topics using mqtt source connector. For that, I have used the kafka CRD like

spec:
  class: io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
  config:
    tasks.max: 1
    connect.mqtt.clean: "true"
    connect.mqtt.hosts: "tcp://rabbitmq-cluster:1883"
    connect.mqtt.converter.throw.on.error: "true"
    connect.mqtt.log.message: "true"
    connect.mqtt.client.id: "lenses-client-connector"
    connect.mqtt.timeout: "1"
    connect.mqtt.keep.alive: "2"
    connect.mqtt.username: "<rabbitmq-client-id>"
    connect.mqtt.password: "<rabbitmq-client-secret>"
    connect.mqtt.service.quality: "1"
    connect.key.converter: "org.apache.kafka.connect.storage.StringConverter"
    connect.value.converter: "org.apache.kafka.connect.converters.StringConverter"
    connect.mqtt.kcql: "INSERT INTO <kafka-topic> SELECT * FROM `<queue-name>` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`"

I have check on the ChatGPT it suggests that we can use "INSERT INTO kafka_${1} SELECT * FROM mqtt/topic/+/device/+/+" but i further check with the lenses stream reactor documentation, I have found no way like this.

Can someone suggest how can I achieve my usecase?

Thanks

You use the dynamic target functionality. This with route to a topic based on a field value. You can use SMTs to add or modify fields.

MQTT | Lenses Docs

Thank you for your response. But, in my use case, we don’t receive the kafka topic name in the field, but we’ll receive it in the mqtt topic name itself. So, are there any way to do this. Also, if you have some documents regarding SMTs because I have tried to find them but got no documentations.
Thanks.

Ah, sorry I thought it was the sink.

For the source you can use the wildcard subscription and set the KCQL target to $. I will update the docs but the test case his here. stream-reactor/kafka-connect-mqtt/src/it/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManagerTest.scala at master · lensesio/stream-reactor

Thank you for your response, It will dynamically route to a kafka topic same as mqtt topic. But I need to extract only one string from my mqtt pattern.
For example:
MQTT Topic Pattern - mqtt/topic/+/device/+/+
MQTT Topic Example 1 - mqtt/topic/A/device/B/C
MQTT Topic Example 2 - mqtt/topic/X/device/Y/Z

The kafka topic needs to create like - kafka_A, kafka_X

Can you help me out with this?

Have you tried an SMT, for source connectors they sit between the connector and kafka and can manipulate the records including the topic name.

Single Message Transforms | Redpanda Cloud