Hi folks,
We have a centralised Kafka cluster and we want to back up topics on behalf of users. Therefore, we are hoping to run a single sink connector to back up all topics. We use org.apache.kafka.connect.converters.ByteArrayConverter, as it provides a “pass-through” option that does no conversion. However, the data files stored in S3 can’t be restored.
I would be keen to know if this is something anyone has tried.
Some experiments I have tried with the Lenses S3 connector, version 7.2.0:
- If we store the bytes in an xx.avro file in S3, the restore fails with the error: class lshaded.apache.avro.util.Utf8 cannot be cast to class lshaded.apache.avro.generic.GenericRecord. See below for the sink/source connector configuration.
Sink connector:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-avro-1 SELECT * FROM `*` STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true
Source connector:
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-avro-1 STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
errors.log.enable=true
The error trace
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,483] INFO [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,483] INFO [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,484] ERROR [source-with-envelope-avro-1|task-0] WorkerSourceTask{id=source-with-envelope-avro-1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0c580284e9bf6ff17] java.lang.ClassCastException: class lshaded.apache.avro.util.Utf8 cannot be cast to class lshaded.apache.avro.generic.GenericRecord (lshaded.apache.avro.util.Utf8 and lshaded.apache.avro.generic.GenericRecord are in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @77f80c04)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.next(AvroStreamReader.scala:42)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.next(AvroStreamReader.scala:27)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:53)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:34)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.accumulate(ResultReader.scala:56)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.retrieveResults(ResultReader.scala:45)
[Worker-0c580284e9bf6ff17] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$12(ReaderManager.scala:78)
[Worker-0c580284e9bf6ff17] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0c580284e9bf6ff17] at set @ io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery$.$anonfun$run$9(PartitionDiscovery.scala:56)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$10(ReaderManager.scala:56)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
[Worker-0c580284e9bf6ff17] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
[Worker-0c580284e9bf6ff17] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
[Worker-0c580284e9bf6ff17] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
[Worker-0c580284e9bf6ff17] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
[Worker-0c580284e9bf6ff17] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0c580284e9bf6ff17] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
[Worker-0c580284e9bf6ff17] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
[Worker-0c580284e9bf6ff17] [2024-05-22 05:22:47,488] INFO [source-with-envelope-avro-1|task-0] Stopping S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:93)
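One way to narrow down the ClassCastException might be to look at the writer schema embedded in one of the xx.avro objects after downloading it from S3. The sketch below is only a diagnostic aid, not part of the connector: it parses the header of an Avro object container file (4-byte magic, then a metadata map holding the avro.schema entry) using the Python standard library. The function names are hypothetical. If the top-level schema turns out to be "string" rather than a record, that would be consistent with the reader receiving a Utf8 where it expects a GenericRecord, although I have not confirmed that this is the cause.

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"  # first four bytes of every Avro object container file


def _read_long(stream):
    """Decode one Avro long (zigzag-encoded, variable-length)."""
    shift = 0
    accum = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError("unexpected end of data while reading a long")
        accum |= (raw[0] & 0x7F) << shift
        if not raw[0] & 0x80:
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def _read_bytes(stream):
    """Decode one Avro bytes/string value: a long length, then that many bytes."""
    length = _read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("unexpected end of data while reading bytes")
    return data


def read_writer_schema(data):
    """Return the parsed writer schema stored in an Avro container file header."""
    stream = io.BytesIO(data)
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata = {}
    while True:
        count = _read_long(stream)
        if count == 0:
            break  # a zero count terminates the metadata map
        if count < 0:
            _read_long(stream)  # negative count is followed by the block size in bytes
            count = -count
        for _ in range(count):
            key = _read_bytes(stream).decode("utf-8")
            metadata[key] = _read_bytes(stream)
    return json.loads(metadata["avro.schema"])
```

Usage would be something like read_writer_schema(open("downloaded.avro", "rb").read()), where downloaded.avro is a placeholder for a local copy of one object from the bucket.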
================
- If we store the bytes in an xx.bytes file in S3, the restore fails with the error: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected. See below for the sink/source connector configuration.
S3 sink connector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-bytes-1-topic-3 SELECT * FROM test-topic-03 STOREAS `BYTES` PROPERTIES ('flush.count'=1)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
S3 source connector
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-bytes-1-topic-3 STOREAS `BYTES` PROPERTIES ('flush.count'=1)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
The error trace
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read 1 record(-s) from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:85)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read 0 records from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:89)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:40,736] INFO [source-with-bytes-1|task-2] [source-with-bytes-1 - 4 of 4] Read complete - 1 records from file htest-bucket:(prefix: sink-with-bytes-1-topic-3//)sink-with-bytes-1-topic-3/test-topic-03/2/000000003278_1715668881109_1715668881109.bytes/:#-1:@2024-05-19T23:29:27Z (io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager:115)
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,019] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore: (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,020] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore: (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[Worker-0ecd2ec7bf6311a3f] [2024-05-19 23:40:41,021] ERROR [source-with-bytes-1|task-3] WorkerSourceTask{id=source-with-bytes-1-3} failed to send record to test-topic-03-restore: (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
[Worker-0ecd2ec7bf6311a3f] org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:284)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:338)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-20T09:40:46.000+10:00 [Worker-0ecd2ec7bf6311a3f] Caused by: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
====================
- With the config below, the restore fails with the error: java.io.IOException: Not an Avro data file.
S3 sink
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:sink-with-envelope SELECT * FROM `*` STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
topics=test-topic-03
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
S3 source
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test-topic-03-restore SELECT * FROM test-bucket:sink-with-envelope-1-topic-3 STOREAS `AVRO` PROPERTIES ('flush.count'=1, 'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=2
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Error trace
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] java.io.IOException: Not an Avro data file.
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.readMagic(DataFileStream.java:108)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.initialize(DataFileStream.java:122)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.<init>(DataFileStream.java:90)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.<init>(AvroStreamReader.scala:32)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection$.toStreamReader(FormatSelection.scala:157)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] Caused by: java.io.EOFException
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:855)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:372)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.io.BinaryDecoder.readFixed(BinaryDecoder.java:328)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.io.Decoder.readFixed(Decoder.java:159)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.readMagic(DataFileStream.java:106)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.initialize(DataFileStream.java:122)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at lshaded.apache.avro.file.DataFileStream.<init>(DataFileStream.java:90)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.<init>(AvroStreamReader.scala:32)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection$.toStreamReader(FormatSelection.scala:157)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at scala.util.Either.flatMap(Either.scala:360)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at cats.effect.IOFiber.runLoop(IOFiber.scala:413)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at cats.effect.IOFiber.execR(IOFiber.scala:1362)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at cats.effect.IOFiber.run(IOFiber.scala:112)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:588)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] [2024-05-17 06:06:58,508] INFO [source-with-envelope-2|task-1] Stopping S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:93)
2024-05-17T16:06:58.000+10:00 [Worker-048ec71a09047229b] [2024-05-17 06:06:58,510] INFO [source-with-envelope-2|task-1] Stopped S3 source task (io.lenses.streamreactor.connect.aws.s3.source.S3SourceTask:98)
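The trace above shows the IOException being raised from DataFileStream.readMagic with an EOFException as its cause, which suggests the stream ended before the 4-byte Avro magic could be read. A quick check I could run on a downloaded object is sketched below. It only classifies the first bytes of a file and is an assumption-level diagnostic with a hypothetical function name, not something the connector provides.

```python
AVRO_MAGIC = b"Obj\x01"  # magic bytes that open an Avro object container file


def describe_avro_header(first_bytes):
    """Classify the leading bytes of a file that is expected to be Avro."""
    if len(first_bytes) == 0:
        return "empty"      # nothing to read at all
    if len(first_bytes) < len(AVRO_MAGIC):
        return "truncated"  # shorter than the magic itself
    if first_bytes[:len(AVRO_MAGIC)] == AVRO_MAGIC:
        return "avro"
    return "not-avro"       # some other format, e.g. raw message bytes
```

For example, describe_avro_header(open("downloaded.avro", "rb").read(4)) on a local copy of the object (the file name is a placeholder). An "empty" or "truncated" result would match the EOFException, while "not-avro" would point at the object having been written in a different format from the one the source connector is told to read.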