GCPStorage Sink: local file housekeeping (connect.gcpstorage.local.tmp.directory)

Does the GCPStorage Sink handle housekeeping for files accumulating in the /tmp folder, and is there a mechanism to delete these files after upload, or do they continue to pile up? Since we’re using memory storage, we’re concerned about running out of space.

Hi Umar,

There are 3 places in Writer.scala where the temporary files are deleted:
1.
2.
3.
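
For illustration only, the cleanup at each of those points amounts to removing the local staging file once it is no longer needed. This is a simplified sketch, not the actual Writer.scala code (the object and method names here are made up):

```scala
import java.io.File

object StagingCleanupSketch {
  // Simplified sketch of the temp-file cleanup idea only; the real Writer.scala differs.
  // `stagingFile` is a placeholder for a file written under the directory configured by
  // connect.gcpstorage.local.tmp.directory (by default a path under /tmp).
  def cleanUpStagingFile(stagingFile: File): Unit =
    if (stagingFile.exists() && !stagingFile.delete()) {
      // a failed delete is the kind of thing that should surface in the connector logs
      System.err.println(s"Could not delete staging file ${stagingFile.getAbsolutePath}")
    }
}
```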

Are you seeing files left behind?

Yep, it seems it's not removing them at all. We are using
kafka-connect-gcp-storage-8.1.31.zip

and in some cases we have files stuck there for days. Since we are using ephemeral storage, when the pod restarts it starts off with a clean slate.

Could you check your logs to see if there is any activity that is attempting to delete these files but failing?

The root logger is set to INFO, so it should pick up any exceptions, but I don't see any such logs coming through.

You can find instructions for setting the Kafka Connect log level here:
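
For example, assuming the stock log4j 1.x-style connect-log4j.properties used by the Connect worker, raising the level for the sink's writer and storage packages is enough to capture the relevant lines:

```
# connect-log4j.properties: enable DEBUG for the sink's writer and GCP storage packages
log4j.logger.io.lenses.streamreactor.connect.cloud.common.sink=DEBUG
log4j.logger.io.lenses.streamreactor.connect.gcp.storage=DEBUG
```

Alternatively, on Kafka 2.4+ you can change this at runtime (no restart needed) with a PUT to the worker's /admin/loggers/io.lenses.streamreactor endpoint and a body of {"level": "DEBUG"}; the REST port is 8083 by default but may differ in your deployment.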

Once you’ve enabled debug logging and re-run the connector, please check the logs for the following lines:

Writer.resetState: Resetting state
Writer.resetState: New state

These messages appear around the delete operation, so they’ll help us understand what’s happening.

Let me know what you see around those log entries as well.
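
In the meantime, it may also help to see what is actually accumulating on disk. A quick check, assuming the default location under /tmp (adjust the path if connect.gcpstorage.local.tmp.directory points elsewhere; the staging directories are named after the connector):

```
# list the sink's staging directories under /tmp and their sizes;
# the directory names start with the connector name (replace <connector-name> accordingly)
find /tmp -maxdepth 1 -type d -name '<connector-name>*' -exec du -sh {} +
```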

Thanks for looking into this, I have captured some logs; let me know if they help. I can only see it trying to delete index files.

```
329 [kafka-coordinator-heartbeat-thread | connect-connector-test-topic-jPt2q8zi] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=connector-consumer-connector-test-topic-jPt2q8zi-0, groupId=connect-connector-test-topic-jPt2q8zi] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=connector-consumer-connector-test-topic-jPt2q8zi-0, correlationId=295, headerVersion=2) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='connect-connector-test-topic-jPt2q8zi', generationId=1, memberId='connector-consumer-connector-test-topic-jPt2q8zi-0-f73e4ac8-652c-4cc8-987b-33f7f7e72522', groupInstanceId=null)
361 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager -- [connector-test-topic-jPt2q8zi - 1 of 1] Retaining index file: .indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00004/00000000000000000005, Deleting files: (.indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00004/00000000000000000011)
361 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writer -- [connector-test-topic-jPt2q8zi - 1 of 1] Writer.resetState: Resetting state Uploading(CommitState(TopicPartition(Topic(test-topic-jPt2q8zi),4),1750324877620,None,Some(1750325012082),0,0,Some(Schema{com.example.transactions.Transaction:STRUCT})),/tmp/connector-test-topic-jPt2q8zi - 1 of 1.e24a8942-3a62-4d08-8aaf-7e059b4c6dd27846881887028338121/test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id19/avro/0ab3f328-c5f0-4eb7-8de7-448b24d7eb08,Offset(5),1750324871748,1750324871748)
361 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Uploading -- state transition: Uploading => NoWriter
362 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writer -- [connector-test-topic-jPt2q8zi - 1 of 1] Writer.resetState: New state NoWriter(CommitState(TopicPartition(Topic(test-topic-jPt2q8zi),4),1750324877620,Some(Offset(5)),Some(1750325012082),0,0,Some(Schema{com.example.transactions.Transaction:STRUCT})))
362 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writing -- state transition: Writing => Uploading
362 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager -- [connector-test-topic-jPt2q8zi - 1 of 1] Writing index .indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00003/00000000000000000013 pointing to file test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/test-topic-jPt2q8zi(3_000000000013).avro
362 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface -- [connector-test-topic-jPt2q8zi - 1 of 1] Uploading file from data string (UploadableString(test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/test-topic-jPt2q8zi(3_000000000013).avro)) to Storage data-lake-test-lenses:.indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00003/00000000000000000013
426 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface -- [connector-test-topic-jPt2q8zi - 1 of 1] Completed upload from data string (UploadableString(test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/test-topic-jPt2q8zi(3_000000000013).avro)) to Storage data-lake-test-lenses:.indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00003/00000000000000000013
426 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface -- [connector-test-topic-jPt2q8zi - 1 of 1] GCP Uploading file from local UploadableFile(/tmp/connector-test-topic-jPt2q8zi - 1 of 1.e24a8942-3a62-4d08-8aaf-7e059b4c6dd27846881887028338121/test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/avro/ea2462e2-4e9e-4bc2-b9d8-3c558b62ade5) to Storage data-lake-test-lenses:test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/test-topic-jPt2q8zi(3_000000000013).avro
430 [kafka-coordinator-heartbeat-thread | connect-connector-test-topic-jPt2q8zi] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=connector-consumer-connector-test-topic-jPt2q8zi-0, groupId=connect-connector-test-topic-jPt2q8zi] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=connector-consumer-connector-test-topic-jPt2q8zi-0, correlationId=295, headerVersion=2): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
430 [kafka-coordinator-heartbeat-thread | connect-connector-test-topic-jPt2q8zi] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -- [Consumer clientId=connector-consumer-connector-test-topic-jPt2q8zi-0, groupId=connect-connector-test-topic-jPt2q8zi] Received successful Heartbeat response
564 [task-thread-connector-test-topic-jPt2q8zi-0] INFO io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface -- [connector-test-topic-jPt2q8zi - 1 of 1] Completed upload from local UploadableFile(/tmp/connector-test-topic-jPt2q8zi - 1 of 1.e24a8942-3a62-4d08-8aaf-7e059b4c6dd27846881887028338121/test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/avro/ea2462e2-4e9e-4bc2-b9d8-3c558b62ade5) to Storage data-lake-test-lenses:test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/test-topic-jPt2q8zi(3_000000000013).avro
638 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager -- [connector-test-topic-jPt2q8zi - 1 of 1] Retaining index file: .indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00003/00000000000000000013, Deleting files: (.indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00003/00000000000000000004)
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writer -- [connector-test-topic-jPt2q8zi - 1 of 1] Writer.resetState: Resetting state Uploading(CommitState(TopicPartition(Topic(test-topic-jPt2q8zi),3),1750324877668,None,Some(1750325012362),0,0,Some(Schema{com.example.transactions.Transaction:STRUCT})),/tmp/connector-test-topic-jPt2q8zi - 1 of 1.e24a8942-3a62-4d08-8aaf-7e059b4c6dd27846881887028338121/test-topic-jPt2q8zi/dt=2025-06-20/event_id=event id75/avro/ea2462e2-4e9e-4bc2-b9d8-3c558b62ade5,Offset(13),1750324871752,1750324871752)
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Uploading -- state transition: Uploading => NoWriter
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writer -- [connector-test-topic-jPt2q8zi - 1 of 1] Writer.resetState: New state NoWriter(CommitState(TopicPartition(Topic(test-topic-jPt2q8zi),3),1750324877668,Some(Offset(13)),Some(1750325012362),0,0,Some(Schema{com.example.transactions.Transaction:STRUCT})))
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.writer.Writing -- state transition: Writing => Uploading
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManager -- [connector-test-topic-jPt2q8zi - 1 of 1] Writing index .indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00004/00000000000000000009 pointing to file test-topic-jPt2q8zi/dt=2025-06-19/event_id=event id42/test-topic-jPt2q8zi(4_000000000009).avro
639 [task-thread-connector-test-topic-jPt2q8zi-0] DEBUG io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface -- [connector-test-topic-jPt2q8zi - 1 of 1] Uploading file from data string (UploadableString(test-topic-jPt2q8zi/dt=2025-06-19/event_id=event id42/test-topic-jPt2q8zi(4_000000000009).avro)) to Storage data-lake-test-lenses:.indexes/connector-test-topic-jPt2q8zi/test-topic-jPt2q8zi/00004/00000000000000000009
659 [KafkaBasedLog Work Thread - config] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=connect-cluster-configs, groupId=connect-cluster] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=15, clientId=connect-cluster-configs, correlationId=292, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=179727542, responses=[], nodeEndpoints=[])
660 [KafkaBasedLog Work Thread - offset] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=connect-cluster-offsets, groupId=connect-cluster] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=15, clientId=connect-cluster-offsets, correlationId=289, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=139262182, responses=[], nodeEndpoints=[])
660 [KafkaBasedLog Work Thread - config] DEBUG org.apache.kafka.clients.FetchSessionHandler -- [Consumer clientId=connect-cluster-configs, groupId=connect-cluster] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 179727542 with 0 response partition(s), 1 implied partition(s)
660 [KafkaBasedLog Work Thread - config] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch -- [Consumer clientId=connect-cluster-configs, groupId=connect-cluster] Removing pending request for fetch session: 179727542 for node: 1767be409efe:9092 (id: 1 rack: null)
660 [KafkaBasedLog Work Thread - offset] DEBUG org.apache.kafka.clients.FetchSessionHandler -- [Consumer clientId=connect-cluster-offsets, groupId=connect-cluster] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 139262182 with 0 response partition(s), 25 implied partition(s)
660 [KafkaBasedLog Work Thread - offset] DEBUG org.ap
```