Hello Team!
Recently we tried your HTTP connector and we’re generally satisfied with how it’s performing. However, we have one query regarding maintaining the topic offset by the connector. When we restart the connector, we see that the connector is processing all messages in the topic from the earliest offset. Is it normal behaviour? Can it be configured?
Our config:
name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=2
topics=currency-changes
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
consumer.auto.offset.reset=earliest
connect.http.config={"method":"Post","endpoint":"http://arraya.local/updateCurrency","content":"{"data":[{value}]}","batch":{"batchCount":1}}
Thanks in advance!
Thank you for using our HTTP connector and for your question!
The behavior you’re observing is due to the consumer.auto.offset.reset=earliest setting in your configuration. This setting applies only when the consumer cannot find a valid committed offset (for example, because none exists yet); in that case it reads from the earliest offset available in the topic. A full replay on every restart therefore means no valid committed offset was found at startup. This is what Kafka does whenever the earliest option is specified.
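To make that rule concrete, here is a minimal Python sketch of how the starting position is chosen for a single partition. It is a simplified model of the semantics described above, not the Kafka client code; the function name starting_offset and its parameters are hypothetical and used only for illustration.

```python
from typing import Optional


def starting_offset(committed: Optional[int], policy: str,
                    earliest: int, latest: int) -> int:
    """Model where a consumer starts reading one partition.

    committed: last committed offset for the group, or None if there is none
    policy:    value of auto.offset.reset ("earliest" or "latest")
    earliest:  first offset still available in the partition
    latest:    offset just past the newest message (log end)
    """
    # A valid committed offset always wins; the reset policy is not consulted.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # No committed offset, or it is out of range: fall back to the policy.
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    raise ValueError(f"unsupported policy in this sketch: {policy}")


# Committed offset exists: resume from it regardless of the policy.
print(starting_offset(42, "earliest", 0, 100))
# No committed offset: the policy decides.
print(starting_offset(None, "earliest", 0, 100))
print(starting_offset(None, "latest", 0, 100))
```

In this model the reset policy only matters in the fallback branch, which is why it changes what happens on a restart only when no usable committed offset is available.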
To change this, you can remove or adjust this property:
- Remove the consumer.auto.offset.reset=earliest property altogether. The connector then relies on the committed offsets, so when it restarts, it resumes from the last committed offset in the topic.
- Set it to latest if you want the connector to start reading from the latest offset in the topic when no previous offsets are found.
Your configuration would look like this after the change:
name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=2
topics=currency-changes
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
# consumer.auto.offset.reset removed so the connector resumes from the last committed offset
connect.http.config={"method":"Post","endpoint":"http://arraya.local/updateCurrency","content":"{"data":[{value}]}","batch":{"batchCount":1}}
After making this adjustment, the connector will start from the committed offsets, allowing smooth restarts without processing all messages from the beginning.
Let us know if you have any more questions!