Convert Record Timestamps into Avro Logical Date Type #665

Open
schrieveslaach opened this issue Sep 9, 2024 · 1 comment

@schrieveslaach

Is your feature request related to a problem? Please describe.

Avro defines a logical type called date:

A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).
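To make the encoding concrete, here is a minimal JDK-only sketch of the arithmetic the Avro spec describes. It assumes ISO "yyyy-MM-dd" input strings; the class and method names (AvroDateDays, toAvroDate, fromAvroDate) are hypothetical and not part of FilePulse or Avro.

```java
import java.time.LocalDate;

/** Hypothetical helper: calendar date <-> Avro "date" int (days since 1970-01-01). */
public class AvroDateDays {

    // Parse an ISO "yyyy-MM-dd" string and return the day count Avro stores.
    public static int toAvroDate(String isoDate) {
        // toEpochDay() counts days from 1970-01-01; dates before it are negative.
        return Math.toIntExact(LocalDate.parse(isoDate).toEpochDay());
    }

    // Inverse direction: turn an Avro date int back into a calendar date.
    public static LocalDate fromAvroDate(int days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        System.out.println(toAvroDate("1970-01-01")); // 0
        System.out.println(toAvroDate("2024-09-09")); // 19975
        System.out.println(fromAvroDate(-1));         // 1969-12-31
    }
}
```

Going through LocalDate avoids any time-of-day or time-zone handling, which is why the result is a plain day count.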

It should therefore be possible to parse fields as dates and convert them into a compliant Avro record.

Describe the solution you'd like

It would be great if DateFilter provided a configuration option that exposes temporal fields, so that a date can be parsed into an Avro-compatible logical date int.

Describe alternatives you've considered

I tried to convert the timestamp into the “number of days from the unix epoch, 1 January 1970”, but I couldn't find a way to express that in ScEL.
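For reference, the conversion being asked for is a floor division of an epoch timestamp by the length of a day. The sketch below assumes the DateFilter target holds a Unix epoch time in milliseconds (UTC); the class and method names are hypothetical, and if the parsed date is midnight in a non-UTC zone the day count can be off by one.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: epoch milliseconds -> days since 1970-01-01 (UTC). */
public class EpochMillisToDays {

    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1); // 86,400,000

    public static int epochMillisToDays(long epochMillis) {
        // floorDiv rounds toward negative infinity, so 1969-12-31T12:00Z maps to -1;
        // plain '/' would truncate toward zero and wrongly return 0.
        return Math.toIntExact(Math.floorDiv(epochMillis, MILLIS_PER_DAY));
    }

    public static void main(String[] args) {
        System.out.println(epochMillisToDays(0L));             // 0
        System.out.println(epochMillisToDays(1725840000000L)); // 19975 (2024-09-09T00:00:00Z)
        System.out.println(epochMillisToDays(-43_200_000L));   // -1
    }
}
```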

@schrieveslaach (author)

I found a way to convert the timestamp to an int with {{ timestamp_diff( 'DAYS', $.dateAsTimestamp, 0 ) }}:

    "filters.DateParser.type": "io.streamthoughts.kafka.connect.filepulse.filter.DateFilter",
    "filters.DateParser.field": "$.dateAsStr",
    "filters.DateParser.formats": "yyyy-MM-dd",
    "filters.DateParser.target": "$.dateAsTimestamp",
    "filters.DateAsDays.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
    "filters.DateAsDays.field": "$.date",
    "filters.DateAsDays.value": "{{ timestamp_diff( 'DAYS', $.dateAsTimestamp, 0 ) }}",
    "filters.DateAsInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
    "filters.DateAsInt.field": "date",
    "filters.DateAsInt.to": "INTEGER",

However, processing then fails with a schema conversion error:

    connect  | io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException: Failed to convert data into Kafka Connect record at offset [position=2100, rows=24, timestamp=1725872898509] from object-file: [uri=file:/opt/feeds/d943719c-7866-491e-b4be-b9c8269788c1.csv, name='d943719c-7866-491e-b4be-b9c8269788c1.csv', contentLength=2100, lastModified=1725872893757, contentDigest=[digest=1514060459, algorithm='CRC32'], userDefinedMetadata={system.inode=12725087, system.hostname=connect}]'
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.buildSourceRecord(FilePulseSourceTask.java:331)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.lambda$poll$0(FilePulseSourceTask.java:211)
    connect  |      at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    connect  |      at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)
    connect  |      at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    connect  |      at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    connect  |      at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    connect  |      at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    connect  |      at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    connect  |      at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:212)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:481)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
    connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:78)
    connect  |      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
    connect  | Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema "org.apache.kafka.connect.data.Date" with type INT32: class java.lang.Integer for field: "date"
    connect  |      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
    connect  |      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper.toConnectStruct(ConnectSchemaMapper.java:261)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper.map(ConnectSchemaMapper.java:202)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord.lambda$toSourceRecord$0(TypedFileRecord.java:94)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.internal.InternalSourceRecordBuilder.build(InternalSourceRecordBuilder.java:42)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord.toSourceRecord(TypedFileRecord.java:102)
    connect  |      at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.buildSourceRecord(FilePulseSourceTask.java:310)
    connect  |      ... 21 more
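The "Caused by" line indicates that the date field's schema is Kafka Connect's Date logical type (INT32), and, as far as I understand Connect's validation, that logical type expects a java.util.Date set to midnight UTC rather than a java.lang.Integer; the converter then writes it out as the Avro int day count. The JDK-only sketch below mimics that int-to-java.util.Date step under this assumption. The class and helper names are hypothetical, not FilePulse or Kafka API.

```java
import java.util.Calendar;
import java.util.TimeZone;

/** Hypothetical sketch of the value shape a Connect Date-typed field appears to expect. */
public class ConnectDateSketch {

    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    // Days since epoch -> java.util.Date at 00:00:00.000 UTC on that day.
    public static java.util.Date daysToConnectDate(int days) {
        return new java.util.Date(days * MILLIS_PER_DAY); // int * long widens to long
    }

    // Reverse direction: reject values that carry a time of day, then count days.
    public static int connectDateToDays(java.util.Date value) {
        Calendar cal = Calendar.getInstance(UTC);
        cal.setTime(value);
        if (cal.get(Calendar.HOUR_OF_DAY) != 0 || cal.get(Calendar.MINUTE) != 0
                || cal.get(Calendar.SECOND) != 0 || cal.get(Calendar.MILLISECOND) != 0) {
            throw new IllegalArgumentException("date must be midnight UTC: " + value.getTime());
        }
        return Math.toIntExact(Math.floorDiv(value.getTime(), MILLIS_PER_DAY));
    }

    public static void main(String[] args) {
        java.util.Date d = daysToConnectDate(19975);
        System.out.println(d.getTime());          // 1725840000000
        System.out.println(connectDateToDays(d)); // 19975
    }
}
```

With the connect-api jar on the classpath, org.apache.kafka.connect.data.Date.toLogical(schema, days) should perform the same int-to-java.util.Date step, which is the representation the failing Struct.put call seems to be checking for.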
