Back up and restore Kafka Topic without conversion

Hi folks,

  • We have a centralised Kafka cluster and we want to back up topics on behalf of users. Therefore, we are hoping to run a single sink connector that backs up all topics. We use org.apache.kafka.connect.converters.ByteArrayConverter as it provides a “pass-through” option that does no conversion. However, the data files stored in S3 can’t be restored.

I would be keen to know if this is something anyone has tried.
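For context, the kind of single pass-through backup sink we are aiming for looks roughly like this. It is only a sketch, not a tested config: the bucket name and the topics.regex pattern are placeholders, and the KCQL mirrors the AVRO/envelope settings used in the experiments below (note the FROM `*` wildcard, which turns out to matter; see the update at the end of this post).

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
# back up every topic with one connector (placeholder pattern)
topics.regex=.*
# pass key and value bytes through without conversion
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
connect.s3.kcql=INSERT INTO test-bucket:backup SELECT * FROM `*` STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true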

Some experiments I have tried with the Lenses S3 connector version 7.2.0:

  • If we store the bytes in an xx.avro file in S3, the source connector fails with the error class lshaded.apache.avro.util.Utf8 cannot be cast to class lshaded.apache.avro.generic.GenericRecord. See below for the sink/source connector configuration.

Sink connector:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-avro-1 SELECT * FROM `*` STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true

Source connector:

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-avro-1 STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true

The error trace

[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,483] INFO [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,483] INFO [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,484] ERROR [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0c580284e9bf6ff17] java.lang.ClassCastException: class lshaded.apache.avro.util.Utf8 cannot be cast to class lshaded.apache.avro.generic.GenericRecord (lshaded.apache.avro.util.Utf8 and lshaded.apache.avro.generic.GenericRecord are in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @77f80c04)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.next(AvroStreamReader.scala:42)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.next(AvroStreamReader.scala:27)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:53)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:34)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.accumulate(ResultReader.scala:56)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.retrieveResults(ResultReader.scala:45)
[Worker-0c580284e9bf6ff17]     at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$12(ReaderManager.scala:78)
[Worker-0c580284e9bf6ff17]     at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0c580284e9bf6ff17]     at set @ io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery$.$anonfun$run$9(PartitionDiscovery.scala:56)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$10(ReaderManager.scala:56)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
[Worker-0c580284e9bf6ff17]     at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
[Worker-0c580284e9bf6ff17]     at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
[Worker-0c580284e9bf6ff17]     at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
[Worker-0c580284e9bf6ff17]     at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
[Worker-0c580284e9bf6ff17]     at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0c580284e9bf6ff17]     at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
[Worker-0c580284e9bf6ff17]     at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,488] INFO [source-with-envelope-avro-1|task-0] Stopping S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:93)

================

  • If we store the bytes in an xx.bytes file in S3, the source connector fails with the error org.apache.kafka.common.InvalidRecordException: One or more records have been rejected. See below for the sink/source connector configuration.

S3 sink connector

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-bytes-1-topic-3 SELECT * FROM test-topic-03 STOREAS `BYTES` PROPERTIES ('flush.count'=1)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

S3 source connector

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-bytes-1-topic-3 STOREAS `BYTES` PROPERTIES ('flush.count'=1)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

The error trace

[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read 1 record(-s) from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:85)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read 0 records from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:89)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read complete - 1 records from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:115)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,019] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore:  (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,020] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore:  (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,021] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore:  (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected

2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:284)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:338)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-20T09:40:46.000+10:00	[Worker-0ecd2ec7bf6311a3f] Caused by: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected

====================

  • I receive java.io.IOException: Not an Avro data file. when using the config below.

S3 sink

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-envelope SELECT * FROM `*` STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

S3 source

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-envelope-1-topic-3 STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Error trace

2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] java.io.IOException: Not an Avro data file.
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.readMagic(DataFileStream.java:108)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.initialize(DataFileStream.java:122)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.<init>(DataFileStream.java:90)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.<init>(AvroStreamReader.scala:32)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection$.toStreamReader(FormatSelection.scala:157)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] Caused by: java.io.EOFException
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:855)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:372)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder.readFixed(BinaryDecoder.java:328)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.io.Decoder.readFixed(Decoder.java:159)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.readMagic(DataFileStream.java:106)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.initialize(DataFileStream.java:122)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.<init>(DataFileStream.java:90)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.<init>(AvroStreamReader.scala:32)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection$.toStreamReader(FormatSelection.scala:157)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at cats.effect.IOFiber.runLoop(IOFiber.scala:413)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at cats.effect.IOFiber.execR(IOFiber.scala:1362)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at cats.effect.IOFiber.run(IOFiber.scala:112)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:588)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] [2024-05-17 06:06:58,508] INFO [source-with-envelope-2|task-1] Stopping S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:93)
2024-05-17T16:06:58.000+10:00	[Worker-048ec71a09047229b] [2024-05-17 06:06:58,510] INFO [source-with-envelope-2|task-1] Stopped S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:98)

Posting an update for anyone running into the same issues.

I was told by @stheppi that defining the following in the KCQL triggers a bug where the store.envelope=true config is ignored. To resolve this, we need to build the connector from the repo’s master branch and use that jar to create the connector plugin.

SELECT * FROM `*` 
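Until that fix is released, one workaround I would try on 7.2.0 is to avoid the wildcard altogether and name the topic explicitly in the KCQL, e.g. one sink connector per topic. This is only a sketch based on the explanation above, not something I have fully re-tested; the bucket and prefix names are placeholders:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=test-topic-03
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# explicit topic instead of FROM `*`, which (assumption) avoids the wildcard code path
connect.s3.kcql=INSERT INTO test-bucket:backup-test-topic-03 SELECT * FROM test-topic-03 STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true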

There was another mistake I had made when specifying the connector config.

In the source connector I have to specify the bucket path down to the topic location; it can’t be a directory one level up from the topic. For example,

  • Valid case: test-bucket/avro-data/test-topic
  • Invalid case: test-bucket/avro-data

Without the correct S3 bucket path, the source connector returns an error saying Not an Avro data file.
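To make the valid case above concrete: assuming a sink wrote envelope AVRO files under test-bucket/avro-data/test-topic (placeholder names), the source KCQL has to point at that topic-level path. A rough sketch, not a verified config:

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# path goes down to the topic location (avro-data/test-topic), not just avro-data
connect.s3.kcql=INSERT INTO test-topic-restore SELECT * FROM test-bucket:avro-data/test-topic STOREAS `AVRO` PROPERTIES ('store.envelope'=true)
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true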