I am backing up all topics from an Amazon MSK cluster to Amazon S3 using the Lenses Kafka Connect AWS S3 connector, version 8.1.33 (kafka-connect-aws-s3-8.1.33.zip).
Backup configuration
I use the following S3 Sink Connector configuration to back up every topic except the internal ones whose names start with `__amazon_msk_` or `__consumer_offsets`:
connector_configuration_backup = {
  # --- Connector identity and parallelism ---
  "connector.class" = "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "tasks.max"       = 2,

  # --- Topic selection ---
  # Negative look-ahead: matches every topic whose name does not start with
  # "__amazon_msk_" or "__consumer_offsets".
  "topics.regex" = "^(?!(__amazon_msk_|__consumer_offsets)).*",

  # --- S3 target ---
  # Writes into the bucket created by module.msk_backup_s3 under the "backup"
  # prefix, as JSON with the envelope enabled. A flush is requested after
  # 10000 records or a flush interval of 300 (presumably seconds — confirm
  # against the connector documentation).
  "connect.s3.kcql"         = "INSERT INTO ${module.msk_backup_s3.id}:backup SELECT * FROM `*` STOREAS `JSON` PROPERTIES ('store.envelope'=true, 'flush.interval'=300, 'flush.count'=10000)",
  "connect.s3.aws.region"   = data.aws_region.current.region,
  "connect.s3.vhost.bucket" = "false",

  # --- Converters ---
  # Keys and values are both handled as plain strings.
  "key.converter"                = "org.apache.kafka.connect.storage.StringConverter",
  "value.converter"              = "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable" = "false",
  "schema.enable"                = "false",

  # --- Error reporting ---
  "errors.log.enable"           = "true",
  "errors.log.include.messages" = "true",
}
Restore configuration
After recreating the topics in the MSK cluster, I restore the data from S3 using the following S3 Source Connector configuration:
connector_configuration_restore = {
  # S3 source connector configuration that reads the JSON envelope backup
  # from the "test-msk-backup" bucket and writes it back into the recreated
  # topics.
  "connector.class" = "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
  "tasks.max"       = 10,

  # One KCQL statement per topic, each reading from "backup/<topic>" in the
  # bucket and inserting into the topic of the same name. The statements are
  # joined with ";" into a single-line value because an HCL quoted string
  # cannot contain literal newlines.
  "connect.s3.kcql" = join(";", [
    for topic in [
      "backup-test-event",
      "test-event",
      "test1-event",
      "test2-event",
      "test3-event",
      "test4-event",
    ] :
    "INSERT INTO ${topic} SELECT * FROM test-msk-backup:backup/${topic} STOREAS `JSON` PROPERTIES ('store.envelope'=true)"
  ]),

  # Uses the same data-source attribute as the backup configuration
  # ("region"), so both connectors resolve the region identically.
  "connect.s3.aws.region"   = data.aws_region.current.region,
  "connect.s3.vhost.bucket" = "false",

  # NOTE(review): topics.regex is a sink-connector selector in Kafka Connect;
  # presumably it has no effect on a source connector — confirm and drop it
  # if so.
  "topics.regex" = ".*",

  # Keys and values are both handled as plain strings.
  "key.converter"                = "org.apache.kafka.connect.storage.StringConverter",
  "value.converter"              = "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable" = "false",
  # NOTE(review): "schema.enable" is not one of the standard converter
  # settings (those are key/value.converter.schemas.enable) — confirm that
  # anything reads it.
  "schema.enable" = "false",

  # NOTE(review): with tolerance "all", Kafka Connect skips records that fail
  # instead of failing the task. When topics appear to be missing after a
  # restore, check the connector log for skipped records before assuming
  # nothing was read.
  "errors.tolerance"            = "all",
  "errors.log.enable"           = "true",
  "errors.log.include.messages" = "true",

  # S3 listing / polling behaviour of the source connector.
  "connect.s3.source.empty.results.backoff.max.delay" = 5000,
  "connect.s3.pool.max.connections"                   = 50,
  "connect.s3.source.partition.search.continuous"     = "false",
  "connect.s3.source.partition.search.recurse.levels" = "1",
  "connect.s3.ordering.type"                          = "AlphaNumeric",

  # NOTE(review): offset.flush.* are normally Kafka Connect worker-level
  # settings — confirm they take effect when supplied per connector.
  "offset.flush.interval.ms" = "5000",
  "offset.flush.timeout.ms"  = "10000"
}
Observed behavior
When the restore connector is configured to restore multiple topics in a single connector, some topics are not restored at all.
When the restore connector is configured to restore only a single topic, the restore works as expected.
Additionally, for some topics, the restored data contains duplicate messages.
Expected behavior
All topics defined in the restore connector should be restored consistently.
Messages should be restored exactly once, without duplication.