Hi
I'm using the S3 Kafka connectors, version 8.1.32, from stream-reactor (lensesio).
I deploy them as MSK connectors.
The sink writes to an S3 bucket, and the source restores the data back to MSK.
This is my config for the sink connector:
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "connect.s3.kcql": "INSERT INTO bucket SELECT * FROM mytopic WITHSTRUCTURE STOREAS `JSON` PROPERTIES ('flush.count'=1, 'store.envelope'=true)",
  "topics": "mytopic",
  "tasks.max": "2",
  "connect.s3.aws.region": "myregion",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "header.converter": "org.apache.kafka.connect.storage.StringConverter"
}
This is the config for the source connector:
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
  "connect.s3.kcql": "INSERT INTO mytopic SELECT * FROM bucket:mytopic WITHSTRUCTURE STOREAS `JSON` PROPERTIES ('store.envelope'=true)",
  "topics": "mytopic",
  "tasks.max": "2",
  "connect.s3.aws.region": "myregion",
  "errors.log.enable": "true",
  "auto.offset.reset": "earliest",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "header.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
The sink works and creates a .json file like this in the S3 bucket:
{
  "headers": {
    "x-message-type": "SomeType"
  },
  "metadata": {
    "partition": 2,
    "offset": 1,
    "topic": "mytopic",
    "timestamp": 1745516252181
  },
  "value": null,
  "key": "b60cf234-8a49-4b5d-9cd4-c10ae2e52e7d"
}
When I try to restore with the source connector, I get the following error:
org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRING
at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:55)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
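For context, here's a minimal snippet (my own repro, not taken from the stream-reactor code base) showing the converter side of the failure: ByteArrayConverter only accepts a BYTES schema (or no schema at all), so the source connector is apparently rebuilding the tombstone's value with a STRING schema instead of a BYTES one:

import java.util.Map;
import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.Schema;

public class TombstoneConverterRepro {
    public static void main(String[] args) {
        ByteArrayConverter converter = new ByteArrayConverter();
        converter.configure(Map.of(), false); // acting as the value converter

        // OK: a null value with a BYTES (or null) schema passes straight through
        byte[] ok = converter.fromConnectData("mytopic", Schema.OPTIONAL_BYTES_SCHEMA, null);
        System.out.println("BYTES schema tombstone: " + ok); // prints null

        // Throws org.apache.kafka.connect.errors.DataException:
        //   Invalid schema type for ByteArrayConverter: STRING
        converter.fromConnectData("mytopic", Schema.OPTIONAL_STRING_SCHEMA, null);
    }
}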
I found a similar issue on GitHub: Tombstone handling in s3 backup/recovery scenario · Issue #1365 · lensesio/stream-reactor. It states that the problem likely occurs because the valueIsArray field is not present in the backup file.
So I decided to modify my backup file manually and simply added the valueIsArray field. The result is this file:
{
  "headers": {
    "x-message-type": "SomeType"
  },
  "metadata": {
    "partition": 2,
    "offset": 1,
    "topic": "mytopic",
    "timestamp": 1745516252181
  },
  "value": null,
  "valueIsArray": true,
  "key": "b60cf234-8a49-4b5d-9cd4-c10ae2e52e7d"
}
When I tried to restore this new file, it worked and restored without any issues; consumers were able to consume it as well, and the tombstone was displayed properly.
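In case it helps anyone hitting the same thing, here's a rough sketch of how I could automate that manual fix. This is a hypothetical one-off patcher, not part of stream-reactor; it assumes one envelope object per .json file (which flush.count=1 gives me) and that the bucket has first been copied locally (e.g. with aws s3 sync):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class PatchTombstoneBackups {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Path root = Path.of(args[0]); // local copy of the backup bucket
        try (Stream<Path> files = Files.walk(root)) {
            files.filter(p -> p.toString().endsWith(".json")).forEach(p -> {
                try {
                    ObjectNode envelope = (ObjectNode) mapper.readTree(Files.readString(p));
                    // Only tombstones (null value) are affected, and only if the flag is missing
                    if (envelope.path("value").isNull() && !envelope.has("valueIsArray")) {
                        envelope.put("valueIsArray", true);
                        Files.writeString(p, mapper.writeValueAsString(envelope));
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Failed to patch " + p, e);
                }
            });
        }
    }
}

After patching, the files would need to be synced back to the bucket before running the source connector again.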
Can we expect to have this fixed?
Thanks!