Hello,
I’m getting an error from the S3 source connector when trying to restore messages that the S3 sink connector wrote.
The error is:
ERROR [msk-s3-source-connector|task-2] WorkerSourceTask{id=msk-s3-source-connector-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0afa8495ef1dcd049] java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class lshaded.apache.avro.generic.GenericRecord (java.nio.HeapByteBuffer is in module java.base of loader 'bootstrap'; lshaded.apache.avro.generic.GenericRecord is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @77f80c04)
[Worker-0afa8495ef1dcd049] at io.lenses.streamreactor.connect.cloud.common.formats.reader.AvroStreamReader.next(AvroStreamReader.scala:27)
[Worker-0afa8495ef1dcd049] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:53)
[Worker-0afa8495ef1dcd049] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.accumulate(ResultReader.scala:56)
[Worker-0afa8495ef1dcd049] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$12(ReaderManager.scala:78)
[Worker-0afa8495ef1dcd049] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
[Worker-0afa8495ef1dcd049] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0afa8495ef1dcd049] at set @ io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery$.$anonfun$run$9(PartitionDiscovery.scala:56)
[Worker-0afa8495ef1dcd049] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
[Worker-0afa8495ef1dcd049] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
[Worker-0afa8495ef1dcd049] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
[Worker-0afa8495ef1dcd049] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
[Worker-0afa8495ef1dcd049] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
[Worker-0afa8495ef1dcd049] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
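To check the data itself, here is a minimal sketch (plain Avro Java API; the class name and local file path are placeholders) for inspecting one object copied down from the bucket/prefix in the sink config below:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;

public class InspectAvro {
    public static void main(String[] args) throws Exception {
        // Local copy of one object from the sink's bucket/prefix (path is a placeholder)
        File file = new File("topic-1-object.avro");
        // Read each datum as a plain Object so nothing is assumed about the schema
        try (DataFileReader<Object> reader =
                 new DataFileReader<>(file, new GenericDatumReader<>())) {
            System.out.println("Writer schema: " + reader.getSchema());
            Object first = reader.next();
            System.out.println("First datum class: " + first.getClass().getName());
        }
    }
}

If the first datum prints as java.nio.HeapByteBuffer rather than a GenericData.Record, that would match the cast failure in AvroStreamReader.next above.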
The sink connector configuration is:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
name=msk-s3-sink-connector
topics=topic-1,topic-2,topic-3
connect.s3.aws.region=us-east-1
connect.s3.kcql=INSERT INTO aBucket:aPrefix SELECT * FROM `*` STOREAS `AVRO` PROPERTIES('store.envelope.key'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true, 'store.envelope.headers'=false, 'flush.count'=1)
The source connector configuration is:
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
errors.log.enable=true
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
name=msk-s3-source-connector
connect.s3.aws.region=us-east-1
connect.s3.kcql=INSERT INTO `topic-1` SELECT * FROM aBucket:aPrefix/topic-1 STOREAS `AVRO` PROPERTIES('store.envelope'=true, 'flush.count'=1)
In the source connector I also tried
... PROPERTIES('store.envelope.key'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true, 'store.envelope.headers'=false, 'flush.count'=1)
and I get the same error.
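For completeness, this is the check I plan to run against topic-1 once the source task stays up, to confirm the restored key/value bytes round-trip (a minimal sketch; the bootstrap server and group id are placeholders, and the byte-array deserializers match the ByteArrayConverter configured above):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class VerifyRestore {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "restore-check");           // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-1"));
            // Print per-record sizes to spot-check the restored byte payloads
            for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("partition=%d offset=%d key=%d bytes value=%d bytes%n",
                        rec.partition(), rec.offset(),
                        rec.key() == null ? 0 : rec.key().length,
                        rec.value() == null ? 0 : rec.value().length);
            }
        }
    }
}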