Duplicate events output by Lenses SQL Processor

I’m having an issue where one of my SQL processors seems to be reprocessing or re-sending events to one of my topics approximately once every two minutes.

The duplicate events are identical, including the timestamp. All that changes is the partition offset, which seems to be incremented by 1 with each repeated event.

When I pull the SQL portion out and run it separately in SQL Studio, I get the expected result, with only one copy of each event.
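
In case it helps anyone reproduce the check, the duplicates can be compared with a snapshot query in SQL Studio along these lines (a sketch: as far as I know, Lenses exposes record metadata under the `_meta` namespace, and `evidence_typer_rules` is my output topic):

-- Snapshot query to inspect the duplicates side by side. The `_meta.partition`,
-- `_meta.offset`, and `_meta.timestamp` fields are, to my knowledge, how Lenses
-- SQL Studio exposes record metadata; swap in whatever payload fields you need.
SELECT _meta.partition, _meta.offset, _meta.timestamp, objective_id, filename
FROM evidence_typer_rules
LIMIT 50;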

I initially added two new events to the topic, one at approximately 4:30 pm and the other at 8 pm. Both began replicating soon after they were inserted, and both continue to replicate. Only one of my processors is behaving this way.

Could this be a configuration issue? Where else should I look?


Hi @wkolbenschlag

Can you please share the processor’s SQL code and the schema of the input topics?

I was able to get some information out of the processor logs:

> 2023-05-11 18:21:48,387 ERROR [o.a.k.s.p.i.RecordCollectorImpl] [kafka-producer-network-thread | lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1-producer] stream-thread [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1] task [0_1] Error encountered sending record to topic lsql-evidence_typer_rules-0331129230-TableSource-0-storage-changelog for task 0_1 due to:
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for lsql-evidence_typer_rules-0331129230-TableSource-0-storage-changelog-1:120000 ms has passed since batch creation
> The broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response.
> Consider overwriting `max.block.ms` and/or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for lsql-evidence_typer_rules-0331129230-TableSource-0-storage-changelog-1:120000 ms has passed since batch creation
> 2023-05-11 18:21:48,387 WARN [o.a.k.s.p.i.StreamThread] [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1] stream-thread [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1] Detected the states of tasks [0_1] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_1] are corrupted and hence needs to be re-initialized
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:222)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
> at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
> at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:384)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2023-05-11 18:21:48,507 INFO [o.a.k.s.s.i.RocksDBTimestampedStore] [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1] Opening store TableSource-0-storage in regular mode
> 2023-05-11 18:21:48,713 INFO [o.a.k.s.KafkaStreams] [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90-StreamThread-1] stream-client [lsql-evidence_typer_rules-0331129230-2f2e7625-6bee-4456-8ad3-199cfc4b6f90] State transition from RUNNING to RUNNING
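
As a side note, the log itself suggests raising `max.block.ms` and/or `delivery.timeout.ms`. If I’m reading the Lenses docs correctly, Kafka Streams properties can be overridden per processor with SET statements, with a `producer.` prefix scoping a property to the embedded producer; a sketch with placeholder values, not tuned recommendations:

-- Sketch only: assumes Lenses SQL forwards these SET keys to the underlying
-- Kafka Streams configuration, where the `producer.` prefix targets the
-- embedded producer. The values are placeholders, not recommendations.
SET producer.max.block.ms = '180000';        -- how long send()/metadata may block
SET producer.delivery.timeout.ms = '300000'; -- how long before a batch is expired
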
SQL Processor code:
SET defaults.topic.autocreate=true;
SET defaults.topic.cleanup.policy=compact;
SET error.policy='dlq';
SET dead.letter.queue='dlq_evidence_typer_rules';

INSERT INTO evidence_typer_rules
STORE VALUE AS AVRO
SELECT TABLE
    objective, batch, device, device_type, device_id,
    objective_id, batch_id, package_id, filename,
    folder_type, location, ext,
    CASE
        WHEN lower(folder_type) = 'evidence'
             AND COALESCE(ext, 'none') IN ('tz', 'tar', 'zip', 'bzip')
            THEN 'RULE 1'
        WHEN lower(folder_type) = 'evidence'
             AND lower(device_type) IN ('phone', 'tablet', 'sim')
            THEN 'RULE 2'
        WHEN lower(folder_type) = 'evidence'
             AND (COALESCE(lower(ext), 'none') IN ('bin', 'dd')
                  OR COALESCE(lower(ext), 'none') LIKE 'e%%'
                  OR COALESCE(lower(ext), 'none') LIKE 'l%%')
            THEN 'RULE 3'
        WHEN lower(folder_type) = 'evidence'
             AND COALESCE(ext, 'none') NOT IN ('tz', 'tar', 'zip', 'bzip')
            THEN 'RULE 4'
        WHEN lower(folder_type) IN ('case archive', 'reports')
            THEN 'RULE 5'
        ELSE 'ERROR'
    END AS RULE
FROM evidence_typer_format;

We changed the SELECT TABLE to a SELECT STREAM and everything appears to be working now; we’re continuing to monitor. Throughput appears to be much faster as well. That fits the logs above: each time a write to the TableSource changelog timed out, the task was marked corrupted, closed dirty, and re-bootstrapped from scratch, which presumably re-emitted the table’s contents; reading the source as a stream avoids the state store entirely.
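
For anyone hitting the same loop, the working processor differs from the one above only in the source clause; sketched in full, with the SET statements unchanged:

-- Identical to the statement above, except the source is read as a stream
-- instead of being materialized as a table, so no changelog-backed state
-- store is created (and nothing to re-bootstrap when a write times out).
INSERT INTO evidence_typer_rules
STORE VALUE AS AVRO
SELECT STREAM
    objective, batch, device, device_type, device_id,
    objective_id, batch_id, package_id, filename,
    folder_type, location, ext,
    CASE
        WHEN lower(folder_type) = 'evidence'
             AND COALESCE(ext, 'none') IN ('tz', 'tar', 'zip', 'bzip')
            THEN 'RULE 1'
        WHEN lower(folder_type) = 'evidence'
             AND lower(device_type) IN ('phone', 'tablet', 'sim')
            THEN 'RULE 2'
        WHEN lower(folder_type) = 'evidence'
             AND (COALESCE(lower(ext), 'none') IN ('bin', 'dd')
                  OR COALESCE(lower(ext), 'none') LIKE 'e%%'
                  OR COALESCE(lower(ext), 'none') LIKE 'l%%')
            THEN 'RULE 3'
        WHEN lower(folder_type) = 'evidence'
             AND COALESCE(ext, 'none') NOT IN ('tz', 'tar', 'zip', 'bzip')
            THEN 'RULE 4'
        WHEN lower(folder_type) IN ('case archive', 'reports')
            THEN 'RULE 5'
        ELSE 'ERROR'
    END AS RULE
FROM evidence_typer_format;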