GCS Sink connector intermittent write issue

While running the GCS Sink connector, one of its tasks failed with the following error message:

{"name":"******","connector":{"state":"RUNNING","worker_id":"*****:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"*****:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:635)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)\n\tat java.base/java.util.concurrent.FutureTask.run(Unknown Source)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.kafka.connect.errors.ConnectException: 
org.apache.kafka.connect.errors.ConnectException: fatal:\nerror writing file ({\"owner\":\"***\",\"committedOffset\":592,\"pendingState\":...
{\"pendingOffset\":1261,\"pendingOperations\":[{\"type\":\"copy\",\"bucket\":\"c*******\",\"source\":\".temp-upload/Topic(***********4\"}]}}) 
At least one of the pre-conditions you specified did not hold.\n\nnonFatal:\n\n\nFatal TPs:\nSet(Set(TopicPartition(Topic(*******),0)))\n\tat io.lenses.streamreactor.common.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:61)\n\tat io.lenses.streamreactor.common.errors.ErrorHandler.handleError(ErrorHandler.scala:81)\n\tat io.lenses.streamreactor.common.errors.ErrorHandler.handleTry(ErrorHandler.scala:60)\n\tat io.lenses.streamreactor.common.errors.ErrorHandler.handleTry$(ErrorHandler.scala:41)\n\tat io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask.handleTry(CloudSinkTask.scala:64)\n\tat io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask.$anonfun$put$1(CloudSinkTask.scala:148)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)\n\tat io.lenses.streamreactor.metrics.Metrics$.withTimer(Metrics.scala:46)\n\tat io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask.put(CloudSinkTask.scala:212)\n\tat 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)\n\t... 11 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: fatal:\nerror writing file ({\"owner\":\"*****\",\"committedOffset\":592,\"pendingState\":{\"pendingOffset\":1261,\"pendingOperations\":[{\"type\":\"copy\",\"bucket\":\"****
....

com.google.api.client.googleapis.json.GoogleJsonResponseException: 412 Precondition Failed\nPOST *****++******. rt\n{\n \"code\" : 412,\n \"errors\" : [ {\n \"domain\" : \"global\",\n \"location\" : \"If-Match\",\n 
\"locationType\" : \"header\",\n \"message\" : \
"At least one of the pre-conditions you specified did not hold.\",\n \"reason\" : \"conditionNotMet\"\n } ],\n \"message\" : \"At least one of the pre-conditions you specified did not hold.\"\n}\n\tat com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)\n\tat 
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)\n\tat 
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)\n\tat 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:583)\n\tat 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:506)\n\tat 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:616)\n\tat 
com.google.cloud.storage.spi.v1.HttpStorageRpc.create(HttpStorageRpc.java:407)\n\t... 57 more\n"}],"type":"sink"}

We use the following properties:

gcs.part.retries: 10
gcs.retry.backoff.ms: 1000
connect.gcpstorage.http.retry.interval: "2000"
connect.gcpstorage.http.max.retries: "300"
errors.tolerance: all
errors.log.enable: true
errors.log.include.messages: true

Do you have an idea why this might have happened?

The 412 (Precondition Failed) error from GCP Storage indicates that the object's ETag changed between the read that established the precondition and the conditional write that relied on it. This happens when another process modifies the blob concurrently, for example a downstream process touching the same object. Based on your description, this appears to be a transient issue.
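To make the failure mode concrete (this is an illustrative sketch, not the connector's internal code): in the GCS Java client that appears in your trace, a rejected precondition surfaces as a StorageException with HTTP code 412 ("conditionNotMet"). The example below, assuming placeholder bucket and object names and a simple does-not-exist precondition, shows how such a conflict appears and how a bounded retry with backoff handles it:

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;

import java.nio.charset.StandardCharsets;

public class PreconditionRetrySketch {
  public static void main(String[] args) throws InterruptedException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Placeholder bucket/object names, for illustration only
    BlobId blobId = BlobId.of("my-bucket", "topic/partition-0/example-object.json");
    byte[] payload = "{\"example\":true}".getBytes(StandardCharsets.UTF_8);

    int attempt = 0;
    while (true) {
      try {
        // Conditional write: succeeds only if the object does not already exist.
        // If another writer created or changed the object in the meantime, GCS
        // rejects the request with 412 Precondition Failed ("conditionNotMet"),
        // the same status seen in the connector trace above.
        storage.create(BlobInfo.newBuilder(blobId).build(), payload,
            Storage.BlobTargetOption.doesNotExist());
        break;
      } catch (StorageException e) {
        if (e.getCode() == 412 && attempt < 5) {
          attempt++;
          Thread.sleep(2000L * attempt); // back off and retry the conditional write
        } else {
          throw e; // conflict persisted: surface the error instead of hiding it
        }
      }
    }
  }
}

In the connector itself you would not hand-roll a loop like this; the equivalent behaviour is controlled by the connect.gcpstorage.* retry settings discussed below.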

To address this, I recommend implementing the following configuration:

Recommended Configuration:

  1. Increase GCP SDK Retries: Configure the GCP SDK within the connector to allow for additional retry attempts. Reference: GCP Storage | Lenses Docs
  2. Configure Error Policy: Set the error policy to RETRY to enable the Connect framework to redeliver messages to the sink after the initial retry attempts are exhausted. Reference: Using Error Policies | Lenses Docs
  3. Avoid Silent Failures: Do not set errors.tolerance: all, as it tells the Connect framework to silently tolerate and skip failing records, which hides errors rather than surfacing them for proper handling. For more information, please refer to the Connect framework documentation.

Specific Configuration Parameters:

connect.gcpstorage.error.policy=RETRY
connect.gcpstorage.max.retries=5
connect.gcpstorage.retry.interval=30000

This configuration will ensure that transient ETag conflicts are handled through automatic retries while maintaining proper error visibility and logging.
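Putting it together, the retry-related portion of the connector configuration could look roughly like the sketch below once adjusted: your existing HTTP retry values are kept, errors.tolerance is dropped so failures surface, and the values shown are starting points to tune for your workload.

connect.gcpstorage.error.policy=RETRY
connect.gcpstorage.max.retries=5
connect.gcpstorage.retry.interval=30000
connect.gcpstorage.http.max.retries=300
connect.gcpstorage.http.retry.interval=2000
errors.log.enable=true
errors.log.include.messages=true
# errors.tolerance left unset (defaults to none) so task failures are not hidden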