KCQL function doesn't work in InfluxDB sink connector

When I use the following query with the InfluxDB sink connector, I get the error below.
Query:
INSERT INTO orders_model1 SELECT id, name, order_code, COALESCE(order_type, '') AS order_type FROM orders WITHTIMESTAMP sys_time()

Error:

java.lang.IllegalArgumentException: Invalid syntax.failed to parse at line 1 due to mismatched input '(' expecting FROM
              at com.datamountaineer.kcql.Kcql.parse(Kcql.java:897)
              at com.datamountaineer.streamreactor.connect.config.Helpers$$anonfun$3.apply(Helpers.scala:101)
              at com.datamountaineer.streamreactor.connect.config.Helpers$$anonfun$3.apply(Helpers.scala:101)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
              at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
              at com.datamountaineer.streamreactor.connect.config.Helpers$.checkInputTopics(Helpers.scala:101)
              at com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector.start(InfluxSinkConnector.scala:65)
              at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
              at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
              at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
              at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
              at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
              at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: failed to parse at line 1 due to mismatched input '(' expecting FROM
  at com.datamountaineer.kcql.Kcql$1.syntaxError(Kcql.java:403)
  at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
  at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
  at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:299)
  at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:124)
  at com.datamountaineer.kcql.antlr4.ConnectorParser.select_clause_basic(ConnectorParser.java:1436)
  at com.datamountaineer.kcql.antlr4.ConnectorParser.insert_from_clause(ConnectorParser.java:704)
  at com.datamountaineer.kcql.antlr4.ConnectorParser.stat(ConnectorParser.java:207)
  at com.datamountaineer.kcql.Kcql.parse(Kcql.java:895)
  ... 21 more
Caused by: org.antlr.v4.runtime.InputMismatchException
  at org.antlr.v4.runtime.DefaultErrorStrategy.recoverInline(DefaultErrorStrategy.java:453)
  at org.antlr.v4.runtime.Parser.match(Parser.java:206)
  at com.datamountaineer.kcql.antlr4.ConnectorParser.select_clause_basic(ConnectorParser.java:1419)
  ... 24 more

Lenses KCQL is a SQL-like DSL for configuring Kafka connectors; it is not full SQL. Your query calls a function, COALESCE, and KCQL does not support functions. That is why the parser stops at the '(' and reports that it expected FROM.

KCQL supports only field selection and metadata settings:

INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value)]

There are two ways around this. One is a Kafka Connect SMT (Single Message Transform), but you would have to write it yourself, since none of the provided transformations supports this. The other is to transform the topic data before it reaches the sink, for example with a Lenses SQL processor that prepares the payload. Such a processor could be:

INSERT INTO orders_fixed
SELECT STREAM
    id
  , name
  , order_code
  , COALESCE(order_type, '') AS order_type
FROM orders

The connector's KCQL then reads from the processor's output topic, orders_fixed, where order_type is already filled in:

INSERT INTO orders_model1 SELECT * FROM orders_fixed WITHTIMESTAMP sys_time()
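
If you go the SMT route instead, the core of the transform is small. Here is a minimal sketch of the null-replacement step in plain Java, assuming schemaless record values that can be treated as a Map. The class name CoalesceField and the method coalesce are made up for illustration; they are not part of Kafka Connect or Stream Reactor. A real SMT would implement org.apache.kafka.connect.transforms.Transformation and run this logic inside apply(), returning a new record with the updated value. The sketch also treats a missing field the same as a null one, which is an assumption you may want to change.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of the null-replacement step a custom SMT would perform.
 * A plain Map stands in for a schemaless Kafka Connect record value.
 * Hypothetical helper, not an existing Kafka Connect class.
 */
public class CoalesceField {

    /** Returns a copy of value in which a missing or null field is set to fallback. */
    public static Map<String, Object> coalesce(Map<String, Object> value,
                                               String field,
                                               Object fallback) {
        // Copy first so the incoming record value is never mutated.
        Map<String, Object> updated = new LinkedHashMap<>(value);
        // get() returns null for an absent key and for an explicit null alike.
        if (updated.get(field) == null) {
            updated.put(field, fallback);
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("id", 1);
        order.put("name", "widget");
        order.put("order_code", "A-17");
        order.put("order_type", null); // the case COALESCE(order_type, '') was meant to handle

        // Prints the record value with order_type replaced by an empty string.
        System.out.println(coalesce(order, "order_type", ""));
    }
}
```

With a transform like this in the connector's transforms chain, the KCQL itself can stay a plain field selection with no function call.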