Time bounded drop duplicates

Supported in: Streaming

Drops duplicate rows from the input, based on the given column subset. Rows that have been seen expire after the configured amount of event time. Rows that arrive late by more than the configured amount of event time are always dropped. The input is partitioned by the specified key columns, and duplicates are dropped separately for each distinct key value.

Transform categories: Other

Declared arguments

  • Dataset - Dataset to deduplicate rows.
    Table
  • Key expiration time unit - Unit for the amount of time to wait for data to deduplicate over.
    Enum<Days, Hours, Milliseconds, Minutes, Seconds, Weeks>
  • Key expiration time value - Value for the amount of time to wait for data to deduplicate over.
    Literal<Long>
  • optional Column subset - If any columns are specified, only those columns are used when determining uniqueness.
    Set<Column<AnyType>>
  • optional Key by columns - Columns on which to partition the input by key. Duplicates are dropped separately, in parallel, for each distinct key value.
    Set<Column<AnyType>>

Examples

In all tables, the most recently streamed rows appear at the top. Additionally, each example uses the time bounded drop duplicates node shown below:

An example of the time bounded drop duplicates node.

Incoming records update watermark

Input

row_order  day     temperature  measurement_timestamp
4          Monday  10.4         2024-09-30T00:00:28
3          Monday  10.3         2024-09-30T00:00:18
2          Monday  10.2         2024-09-30T00:00:09
1          Monday  10.1         2024-09-30T00:00:00

Output

day     temperature  measurement_timestamp
Monday  10.4         2024-09-30T00:00:28
Monday  10.1         2024-09-30T00:00:00

Explanation: The records with temperatures of 10.2 and 10.3 each have a timestamp within the 10-second threshold of the previous record, so they are treated as duplicates and are not emitted. Because their timestamps are increasing, each of these records still updates the watermark. This is why the record with a temperature reading of 10.3 is not emitted, even though its timestamp is more than 10 seconds after that of the first record, which has a reading of 10.1. The last streamed row, with a temperature of 10.4, has a timestamp 10 seconds after that of the last row that updated the watermark (the row with a reading of 10.3). The threshold has been reached, so this row is emitted.
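The refresh behavior described above can be sketched as a toy model in Python. This is not the transform's implementation. It assumes that uniqueness is determined on the day column (the node configuration is not reproduced in the text), that the expiration time is 10 seconds, and that every in-order row refreshes the expiry of its value even when the row itself is dropped. The function name and the row layout are hypothetical.

```python
from datetime import datetime, timedelta

def time_bounded_dedupe(rows, ttl):
    """Toy model: emit a row unless its dedup value was seen less than `ttl` ago."""
    last_seen = {}  # dedup value -> event time of the most recent row carrying it
    emitted = []
    for day, temperature, ts in rows:
        previous = last_seen.get(day)
        is_duplicate = previous is not None and ts - previous < ttl
        # Assumption: each in-order row refreshes the expiry, even when dropped.
        last_seen[day] = ts
        if not is_duplicate:
            emitted.append((day, temperature, ts))
    return emitted

base = datetime(2024, 9, 30)
rows = [  # arrival order, oldest first (the reverse of the table layout)
    ("Monday", 10.1, base),
    ("Monday", 10.2, base + timedelta(seconds=9)),
    ("Monday", 10.3, base + timedelta(seconds=18)),
    ("Monday", 10.4, base + timedelta(seconds=28)),
]
result = time_bounded_dedupe(rows, ttl=timedelta(seconds=10))
print([temperature for _, temperature, _ in result])  # [10.1, 10.4]
```

Under these assumptions, the 10.3 row is dropped because it is only 9 seconds after the 10.2 row, and the 10.4 row is emitted because a full 10 seconds separate it from the 10.3 row.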

Late events are dropped and do not update watermark

Input

row_order  day     temperature  measurement_timestamp
4          Monday  10.4         2024-09-30T00:00:30
3          Monday  10.3         2024-09-30T00:00:00
2          Monday  10.2         2024-09-30T00:00:05
1          Monday  10.1         2024-09-30T00:00:20

Output

day     temperature  measurement_timestamp
Monday  10.4         2024-09-30T00:00:30
Monday  10.1         2024-09-30T00:00:20

Explanation: The records with temperatures of 10.2 and 10.3 arrive after the record with a temperature of 10.1, but their timestamps are earlier (by 15 and 20 seconds, respectively). As a result, these late records are dropped and do not advance the watermark. The last streamed row, with a temperature of 10.4, has a timestamp 10 seconds after that of the last row that updated the watermark (the row with a reading of 10.1). The 10-second threshold has been reached, so this row is emitted.
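The late-event handling can be sketched as a toy model in Python. This is an illustration under stated assumptions, not the transform's implementation: uniqueness is assumed to be determined on the day column, the expiration time is assumed to be 10 seconds, and the watermark is modeled simply as the largest timestamp seen so far. The function name and the row layout are hypothetical.

```python
from datetime import datetime, timedelta

def dedupe_with_watermark(rows, ttl):
    """Toy model: drop rows that are too late, then drop duplicates seen within `ttl`."""
    watermark = None  # assumption: the largest event time seen so far
    last_seen = {}    # dedup value -> latest event time recorded for that value
    emitted = []
    for day, temperature, ts in rows:
        if watermark is not None and watermark - ts > ttl:
            continue  # late by more than ttl: dropped, and no state is updated
        previous = last_seen.get(day)
        is_duplicate = previous is not None and ts - previous < ttl
        if watermark is None or ts > watermark:
            watermark = ts
        last_seen[day] = ts if previous is None else max(previous, ts)
        if not is_duplicate:
            emitted.append((day, temperature, ts))
    return emitted

base = datetime(2024, 9, 30)
rows = [  # arrival order, oldest first (the reverse of the table layout)
    ("Monday", 10.1, base + timedelta(seconds=20)),
    ("Monday", 10.2, base + timedelta(seconds=5)),
    ("Monday", 10.3, base),
    ("Monday", 10.4, base + timedelta(seconds=30)),
]
late_result = dedupe_with_watermark(rows, ttl=timedelta(seconds=10))
print([temperature for _, temperature, _ in late_result])  # [10.1, 10.4]
```

In this model the 10.2 and 10.3 rows are 15 and 20 seconds behind the watermark, so they are discarded before the duplicate check and leave the state untouched.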