Issue: Incomplete and duplicated topic restore from S3 to AWS MSK using Lenses S3 Source Connector

I am backing up all topics from an AWS MSK cluster to Amazon S3 using the Lenses Kafka Connect AWS S3 connector (kafka-connect-aws-s3-8.1.33.zip).

Backup configuration

I use the following S3 Sink Connector configuration to back up all topics:

connector_configuration_backup = {
  "connector.class"              = "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "key.converter.schemas.enable" = "false",
  "connect.s3.kcql"              = "INSERT INTO ${module.msk_backup_s3.id}:backup SELECT * FROM `*` STOREAS `JSON` PROPERTIES ('store.envelope'=true, 'flush.interval'=300, 'flush.count'=10000)",
  "connect.s3.aws.region"        = data.aws_region.current.region,
  "tasks.max"                    = 2,
  "topics.regex"                 = "^(?!(__amazon_msk_|__consumer_offsets)).*",
  "schema.enable"                = "false",
  "errors.log.enable"            = "true",
  "errors.log.include.messages"  = "true",
  "value.converter"              = "org.apache.kafka.connect.storage.StringConverter",
  "key.converter"                = "org.apache.kafka.connect.storage.StringConverter",
  "connect.s3.vhost.bucket"      = "false",
}
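As a sanity check on the backup scope, the `topics.regex` above uses a negative lookahead to exclude MSK-internal topics. A quick, standalone sketch (topic names below are just illustrative) of which names it matches:

```python
import re

# Same pattern as the sink's topics.regex: match any topic name that does
# NOT start with __amazon_msk_ or __consumer_offsets.
pattern = re.compile(r"^(?!(__amazon_msk_|__consumer_offsets)).*")

topics = [
    "test-event",          # user topic     -> included in backup
    "__consumer_offsets",  # internal topic -> excluded
    "__amazon_msk_canary", # internal topic -> excluded
]

for t in topics:
    print(t, "->", bool(pattern.match(t)))
# test-event -> True
# __consumer_offsets -> False
# __amazon_msk_canary -> False
```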

Restore configuration

After recreating the topics in the MSK cluster, I restore the data from S3 using the following S3 Source Connector configuration:

connector_configuration_restore = {
  "connector.class"                                   = "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
  "key.converter.schemas.enable"                      = "false",
  "connect.s3.kcql"                                   = "INSERT INTO backup-test-event SELECT * FROM test-msk-backup:backup/backup-test-event STOREAS `JSON` PROPERTIES ('store.envelope'=true);
                                                         INSERT INTO test-event SELECT * FROM test-msk-backup:backup/test-event STOREAS `JSON` PROPERTIES ('store.envelope'=true);
                                                         INSERT INTO test1-event SELECT * FROM test-msk-backup:backup/test1-event STOREAS `JSON` PROPERTIES ('store.envelope'=true);
                                                         INSERT INTO test2-event SELECT * FROM test-msk-backup:backup/test2-event STOREAS `JSON` PROPERTIES ('store.envelope'=true);
                                                         INSERT INTO test3-event SELECT * FROM test-msk-backup:backup/test3-event STOREAS `JSON` PROPERTIES ('store.envelope'=true);
                                                         INSERT INTO test4-event SELECT * FROM test-msk-backup:backup/test4-event STOREAS `JSON` PROPERTIES ('store.envelope'=true)",
  "connect.s3.aws.region"                             = data.aws_region.current.name,
  "tasks.max"                                         = 10,
  "topics.regex"                                      = ".*",
  "schema.enable"                                     = "false",
  "errors.log.enable"                                 = "true",
  "errors.log.include.messages"                       = "true",
  "value.converter"                                   = "org.apache.kafka.connect.storage.StringConverter",
  "key.converter"                                     = "org.apache.kafka.connect.storage.StringConverter",
  "connect.s3.vhost.bucket"                           = "false",
  "connect.s3.source.empty.results.backoff.max.delay" = 5000,
  "connect.s3.pool.max.connections"                   = 50,
  "connect.s3.source.partition.search.continuous"     = "false",
  "connect.s3.ordering.type"                          = "AlphaNumeric",
  "errors.tolerance"                                  = "all",
  "connect.s3.source.partition.search.recurse.levels" = "1",
  "offset.flush.interval.ms"                          = "5000",
  "offset.flush.timeout.ms"                           = "10000"
}

Observed behavior

When the restore connector is configured to restore multiple topics in a single connector, some topics are not restored at all.
When the restore connector is configured to restore only a single topic, the restore works as expected.
Additionally, for some topics, the restored data contains duplicate messages.

Expected behavior

All topics defined in the restore connector should be restored consistently.
Messages should be restored exactly once, without duplication.

Hello, welcome to our community!

Although I don’t have a definitive answer, my first instinct would be to increase the number of tasks and the memory available to Kafka Connect.

Resource exhaustion could be the root cause here. Perhaps there are not enough tasks to handle all the partitions, or not enough memory to process all topics and partitions at once. If memory runs short, the connector can hit out-of-memory errors, which lead to restarted or killed tasks, loss of recent source offsets, and, inevitably, duplicate messages.
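On the duplicates specifically: Kafka Connect source connectors are at-least-once, so some duplication after a task restart is expected. Since the backup was written with `'store.envelope'=true`, the original partition and offset should be preserved in each record, and a downstream consumer could deduplicate on them. A hypothetical sketch (the `metadata.partition`/`metadata.offset` field names are assumed from the envelope format, not verified here):

```python
# Deduplicate restored records by their original (partition, offset) pair,
# which the envelope format is assumed to carry under "metadata".
seen: set[tuple[int, int]] = set()

def is_duplicate(envelope: dict) -> bool:
    """Return True if this (partition, offset) pair was already seen."""
    key = (envelope["metadata"]["partition"], envelope["metadata"]["offset"])
    if key in seen:
        return True
    seen.add(key)
    return False

records = [
    {"metadata": {"partition": 0, "offset": 10}, "value": "a"},
    {"metadata": {"partition": 0, "offset": 10}, "value": "a"},  # duplicate
    {"metadata": {"partition": 0, "offset": 11}, "value": "b"},
]
print([is_duplicate(r) for r in records])  # -> [False, True, False]
```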

If you could share logs (even better if the level is set to DEBUG), we can look for hints there too.

At the moment, the source connector only works for restoring one topic at a time.
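Given that limitation, a workaround is to deploy one source connector per topic instead of one multi-topic KCQL statement. A hedged sketch (the helper function is hypothetical; bucket and prefix names are taken from the restore config above, and the generated map would still need the region, converter, and error-handling settings from that config):

```python
import json

# Bucket and prefix as used in the restore KCQL above.
BUCKET = "test-msk-backup"
PREFIX = "backup"

def restore_config(topic: str) -> dict:
    """Build a minimal single-topic S3 Source Connector config (sketch)."""
    kcql = (
        f"INSERT INTO {topic} SELECT * FROM {BUCKET}:{PREFIX}/{topic} "
        "STOREAS `JSON` PROPERTIES ('store.envelope'=true)"
    )
    return {
        "connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
        "connect.s3.kcql": kcql,
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    }

# One config per topic, e.g. to POST to the Connect REST API or feed into
# Terraform, one connector per topic.
for topic in ["test-event", "test1-event"]:
    print(json.dumps(restore_config(topic), indent=2))
```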
