Hi everyone,
I previously raised this issue (GCPStorage sink plugin committed offsets) regarding GCS bucket versioning and offset commits. When versioning is enabled and a file (Avro or Parquet) is uploaded, the offset of the last record in the file isn't committed. Suppose we send 5 messages to a topic (offsets 1 through 5): on upload of the resulting Avro file, named {topic-name}(46_000000000005).avro after the offset of the last written record, only offset 4 is committed. If the pod or application restarts, Kafka resumes from offset 5. If the next flush contains only one record, the connector writes a file with the exact same name, overwriting the previous file and effectively deleting records 1-4.
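To make the collision concrete, here is a minimal, self-contained sketch (not the connector's actual code; the fileName helper just mimics the naming scheme) of why naming files by the last offset alone leads to overwrites:

```scala
object NameCollisionDemo extends App {
  // Hypothetical helper mirroring the "{topic}({partition}_{lastOffset}).avro" scheme.
  def fileName(topic: String, partition: Int, lastOffset: Long): String =
    f"$topic($partition%d_$lastOffset%012d).avro"

  // First flush: records at offsets 1..5 land in one file named after offset 5.
  val beforeRestart = fileName("my-topic", 46, 5L)

  // Offset 4 is committed, the pod restarts, Kafka resumes at offset 5.
  // The next flush holds only the record at offset 5 -- same name again.
  val afterRestart = fileName("my-topic", 46, 5L)

  // Same object key: the second upload replaces the file that held records
  // 1..5, silently losing records 1..4.
  assert(beforeRestart == afterRestart)
  println(s"Both flushes write to: $beforeRestart")
}
```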
We have managed to reproduce this in a test as well. The issue does not occur when the connect.gcpstorage.exactly.once.enable property is set to true, but since that prevents us from replaying data, which we occasionally need to do, we can't use that feature.
Can you suggest a workaround or raise a bug to address this, please?
Having a peek inside your code, the problem seems to be centered around line 89 in writer.scala and line 62 in FileNamer.scala.
We are using the TopicFileNamePartitioner by virtue of using field partitioning. Even if you fix this by committing the last record's offset (currently last record - 1 is committed), a commit failure would still leave us with overwrites and data loss.
One reliable fix would be to include the processed timestamp in TopicPartitionOffsetFileNamer (in addition to topic, partition, and offset), ensuring unique filenames and preventing overwrites.
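For illustration, a rough sketch of that idea, assuming a namer interface roughly like the connector's (the trait and class names below are hypothetical, not the actual FileNamer API):

```scala
import java.time.Instant

// Hypothetical interface, sketched for illustration; not the real FileNamer trait.
trait SketchFileNamer {
  def name(topic: String, partition: Int, lastOffset: Long): String
}

// Appends the processing time of the flush, so replaying the same last offset
// after a restart yields a distinct object key instead of an overwrite.
class TimestampedOffsetFileNamer(extension: String) extends SketchFileNamer {
  override def name(topic: String, partition: Int, lastOffset: Long): String = {
    val processedAt = Instant.now().toEpochMilli
    f"$topic($partition%d_$lastOffset%012d_$processedAt%d).$extension"
  }
}
```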
Let us know your thoughts or if you have a preferred alternative. Once you indicate the direction you’d like to take, we can prepare a PR accordingly.
Hi Lenses devs,
Thinking more about it (I am a colleague of Umar), I think that if you fix the commit (currently the last record on each partition is never committed, so lag never reaches zero even when there is no traffic on the topic), the above would not be such a huge issue: the file containing the last record would either not be rewritten at all, or the whole file would be regenerated.
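To be concrete about the off-by-one, here is a minimal sketch of the commit we would expect, assuming standard Kafka consumer-group semantics (where the committed offset is the position of the next record to read); the object and method names are ours, not the connector's:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.OffsetAndMetadata

object CommitSketch {
  // Under consumer-group semantics the committed offset is the NEXT offset to
  // read: after processing the record at offset N, commit N + 1 so that lag
  // can reach zero. Committing anything less leaves permanent lag.
  def offsetToCommit(tp: TopicPartition,
                     lastProcessed: Long): (TopicPartition, OffsetAndMetadata) =
    tp -> new OffsetAndMetadata(lastProcessed + 1)
}
```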
Nevertheless, it is probably worth adding more data to the filename, e.g. the lowest offset, rather than a timestamp: Kafka timestamps are not guaranteed to be strictly monotonically increasing, and they can all be identical.
But even then, after a reconfiguration we might still rewrite the same file with fewer records.
My colleague will raise a PR adding an option to supply a custom namer class (which could, for example, append a random string to the name to avoid clashes); see the sketch below.
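Something along these lines (purely illustrative; the trait and class names are not the connector's real API):

```scala
import java.util.UUID

// Hypothetical interface, sketched for illustration; not the real FileNamer trait.
trait SketchFileNamer {
  def name(topic: String, partition: Int, lastOffset: Long): String
}

class RandomSuffixFileNamer(extension: String) extends SketchFileNamer {
  override def name(topic: String, partition: Int, lastOffset: Long): String = {
    // A short random suffix makes two flushes ending at the same offset land
    // on different object keys, so neither can silently replace the other.
    val suffix = UUID.randomUUID().toString.take(8)
    f"$topic($partition%d_$lastOffset%012d_$suffix).$extension"
  }
}
```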
Another option could be to add the starting offset to the filename, not only the last offset (again, timestamps are not guaranteed to be strictly monotonically increasing or unique, so a filename built from a timestamp and the last offset would not reliably prevent rewriting the file either).
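A sketch of that range-based naming (names illustrative): encoding both the first and last offset makes each object key describe exactly which records the file contains.

```scala
// Illustrative sketch: a key that encodes the offset RANGE a file covers.
// A full flush of offsets 1..5 and a post-restart re-flush of just offset 5
// yield different names, so the smaller file cannot replace the larger one.
object OffsetRangeNamer {
  def name(topic: String, partition: Int,
           firstOffset: Long, lastOffset: Long, extension: String): String =
    f"$topic($partition%d_$firstOffset%012d_$lastOffset%012d).$extension"
}
```

Combined with the commit fix above, range-based keys would also make it obvious from a bucket listing which offsets each file covers.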