Kafka MQTT source connector: 1:1 mapping for wildcard topics

I'm using the MQTT source connector to move data into Kafka topics, and I'm looking for a way to auto-create Kafka topics with the same names as the MQTT topics.

The connector would use a wildcard subscription, something like SELECT * FROM AUTO/+/+ or AUTO/#, and I somehow want to create multiple Kafka topics with the same names as the MQTT topics. I tried the SQL processor: insert everything into one topic, then group the messages by a field and insert only the grouped messages into a different topic. But I can't find a way to use the topic name from the field, or any way to iterate over the grouped-by values.

Here's a Stack Overflow question referencing something similar:

My connector config:
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.username=user
name=src-connector
connect.mqtt.kcql=INSERT INTO SametopicnameasmqttTOPIC SELECT * FROM AUTO/+/+
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
connect.mqtt.service.quality=1
connect.mqtt.password=******
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://MQTTHOST:1883

Hi Thilak,

At the moment there is no such support.

There are two approaches you can take, and both involve writing code. The first is to use the connector's support for its own converters (see MQTT to Kafka open source connector | Lenses.io Documentation and the WITHConverter), which lets you create the output Connect framework record yourself.

The second is to write your own SMT (Single Message Transform) for source records: read the SourceRecord.sourcePartition map and look up the key named topic. This might be less complex to write and deploy than the first option.
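As a rough illustration of the second option, here is a minimal sketch of the lookup such an SMT would perform. It is deliberately free of Kafka Connect dependencies so it can be run on its own. The class name MqttTopicResolver and the fallback behaviour are assumptions made for this example, and the key name "topic" is taken from the description above, so check it against the connector version you run.

```java
import java.util.Map;

/**
 * Hypothetical helper (not part of the connector): picks the target topic
 * for a record from its source partition map.
 */
final class MqttTopicResolver {

    // Key name as described in this thread; verify it for your connector version.
    static final String TOPIC_KEY = "topic";

    private MqttTopicResolver() {}

    /**
     * Returns the MQTT topic stored in the source partition, or fallbackTopic
     * when the map is null, the key is absent, or the value is blank.
     * The result is the raw MQTT name and may still contain characters
     * that Kafka does not accept (for example '/').
     */
    static String resolve(Map<String, ?> sourcePartition, String fallbackTopic) {
        if (sourcePartition == null) {
            return fallbackTopic;
        }
        Object mqttTopic = sourcePartition.get(TOPIC_KEY);
        if (mqttTopic == null) {
            return fallbackTopic;
        }
        String name = mqttTopic.toString().trim();
        return name.isEmpty() ? fallbackTopic : name;
    }
}
```

Inside a Transformation<SourceRecord>, apply(record) could call resolve(record.sourcePartition(), record.topic()) and pass the result as the topic argument of record.newRecord(...), leaving key, value, schemas and timestamp unchanged. Falling back to the configured topic keeps records flowing if the key is ever missing.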

Regards,
Stefan

One thing to keep in mind is that MQTT topic/queue names might not satisfy the Kafka topic name restrictions (for example, Kafka does not allow the / that MQTT uses as a level separator), and you should account for that in your implementation.
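To make that concrete, here is a minimal sketch of one possible name mapping. It assumes Kafka's usual rules: only letters, digits, '.', '_' and '-' are legal, names are at most 249 characters, and "." and ".." are not allowed. The class name KafkaTopicNames and the choice of '.' as the replacement for '/' are assumptions for illustration, not something the connector provides.

```java
/** Hypothetical helper: derives a legal Kafka topic name from an MQTT topic. */
final class KafkaTopicNames {

    // Kafka rejects topic names longer than 249 characters.
    static final int MAX_LENGTH = 249;

    private KafkaTopicNames() {}

    static String fromMqtt(String mqttTopic) {
        if (mqttTopic == null) {
            throw new IllegalArgumentException("mqttTopic must not be null");
        }
        // MQTT level separator becomes '.', e.g. AUTO/a/b -> AUTO.a.b
        String name = mqttTopic.replace('/', '.');
        // Anything else outside [A-Za-z0-9._-] becomes '_'
        name = name.replaceAll("[^A-Za-z0-9._-]", "_");
        // Empty names and the reserved names "." and ".." cannot be used
        if (name.isEmpty() || name.equals(".") || name.equals("..")) {
            throw new IllegalArgumentException(
                "cannot derive a Kafka topic name from: " + mqttTopic);
        }
        // Truncate over-long names to the maximum length
        return name.length() > MAX_LENGTH ? name.substring(0, MAX_LENGTH) : name;
    }
}
```

Note that this mapping is lossy: the MQTT topics a/b and a.b both end up as a.b, and truncation can also merge long names. If that matters for your topic layout, pick a different replacement scheme. Kafka also warns about mixing '.' and '_' in topic names because the two can collide in metric names.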