I’m working on a POC where in I should be able to push the messages to kafka topic on updatation of cassandra table. I’ve eabled cdc on perticular table using alter table.
I’m using Cassandra 3.10, AWS MSK.
I’ve downloaded source connector from Releases · lensesio/stream-reactor · GitHub
File name- kafka-connect-cassandra-8.1.33.zip
I’ve successfully created msk connector using below conector json which is in running state-
{
"connector.class": "io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"connect.cassandra.import.mode": "timestamp",
"connect.cassandra.key.space": "ncl",
"connect.cassandra.initial.offset": "2025-05-01 00:00:00.0000000Z",
"tasks.max": "1",
"connect.cassandra.kcql": "INSERT INTO test_topic SELECT event_data, event_ts FROM pack_events IGNORE event_ts PK event_ts WITHUNWRAP",
"bootstrap.servers": "b-2,b-1",
"connect.cassandra.password": "pwd",
"connect.progress.enabled": "true",
"connect.cassandra.import.poll.interval": "1000",
"connect.cassandra.username": "user",
"connect.cassandra.error.policy": "THROW",
"key.converter.schemas.enable": "false",
"connect.cassandra.contact.points": "host",
"connect.lenses.log.error.policy": "THROW",
"connect.cassandra.port": "9042",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
On table updation (as per below update sql) I’m not able to get the kafka message in desigred format and also exact operation like update/insert. What could be the reason? Is it cassandra version?
UPDATE pack_events SET event_data = 'Updated event data 6' WHERE event_id = 'event123' AND event_ts = '2024-07-28 10:00:00';