Could you please help me with the InfluxDB 2 Flux query syntax for building a windowed query with a custom aggregate function?

I went through the online docs, but they lack examples of how to get at the actual window content (first and last records) from within a custom aggregate function. They also don't clearly describe the expected signature of custom functions.

I'd like to build a query with a sliding window that produces the difference between the first and the last value in the window. Something along these lines:

difference = (column, tables=<-) => ({ tables.last() - tables.first() })

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: 1mo, fn: difference, column: "_value", timeSrc: "_stop", timeDst: "_time", createEmpty: true)
  |> yield(name: "diff")

The syntax of the above example is obviously wrong, but hopefully you can understand what I'm trying to do.

Thank you!

2 Answers

I was in a similar situation: I needed a custom aggregate function for aggregateWindow(), could not make sense of the InfluxDB docs, and found nothing helpful online. I'm posting some generic working code here. It isn't specific to your case, but it might be a useful reference for someone else. As far as I can tell, this might be the ONLY complete example on the entire internet 🤦.

Note a couple of things:

  1. Everything outside of the custom aggregate function is as normal.
  2. For the aggregate function to work, you must supply the column parameter with its default: column="_value"
    1. Otherwise you'll get the error: missing required argument column (argument fn)

import "generate"

example = generate.from(
    count: 100,
    fn: (n) => (n + 1),
    start: 2021-01-01T00:00:00Z,
    stop: 2022-01-01T00:00:00Z,
)

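// Custom aggregate: sums _value within each window.
// aggregateWindow() calls fn(column: column), so the function must accept `column`.
alt_sum = (tables=<-, column="_value") => tables
  |> reduce(
        identity: {sum: 0},
        fn: (r, accumulator) => ({sum: r._value + accumulator.sum}),
    )
  // optional: copy the result into _value
  |> map(fn: (r) => ({r with _value: r.sum}))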

example
  |> range(start: 2021-01-01T00:00:00Z, stop: 2022-01-01T00:00:00Z)
  |> aggregateWindow(every: 1mo, fn: alt_sum, createEmpty: false)
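The same pattern can be adapted to the first/last difference asked about in the question. The following is only a sketch under a few assumptions: the name first_last_diff is made up for illustration, the data comes from generate.from() (integer values) rather than a real bucket, and a count field in the accumulator is used to detect the first record of each window. For float fields, the identity values would need to be floats (0.0) instead.

```flux
import "generate"

// Sample data standing in for a real bucket (an assumption for this sketch).
example = generate.from(
    count: 100,
    fn: (n) => n * n,
    start: 2021-01-01T00:00:00Z,
    stop: 2022-01-01T00:00:00Z,
)

// Hypothetical custom aggregate: last value minus first value per window.
// It accepts `column` because aggregateWindow() passes it to fn.
first_last_diff = (tables=<-, column="_value") => tables
  |> reduce(
        identity: {first: 0, last: 0, count: 0},
        fn: (r, accumulator) => ({
            // keep the first value seen; count == 0 marks the first record
            first: if accumulator.count == 0 then r._value else accumulator.first,
            // always overwrite with the most recent value
            last: r._value,
            count: accumulator.count + 1,
        }),
    )
  |> map(fn: (r) => ({r with _value: r.last - r.first}))
  |> drop(columns: ["first", "last", "count"])

example
  |> range(start: 2021-01-01T00:00:00Z, stop: 2022-01-01T00:00:00Z)
  |> aggregateWindow(every: 1mo, fn: first_last_diff, createEmpty: false)
```

The difference is computed in a separate map() step after reduce() has finished, so it uses the final first and last values of the window rather than intermediate accumulator state.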

I came up with the following. It works, at least syntactically:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(
      every: 1mo, 
      fn: (column, tables=<-) => tables |> reduce(
            identity: {first: -1.0, last: -1.0, diff: -1.0},
            fn: (r, acc) => ({
                first:
                    if acc.first < 0.0 then r._value
                    else acc.first,
                last:
                    r._value,
                diff:
                    if acc.first < 0.0 then 0.0
                    else (r._value - acc.first)
            })
          )
          |> drop(columns: ["first", "last"])
          |> set(key: "_field", value: column)
          |> rename(columns: {diff: "_value"})
      )
  |> yield(name: "diff")

The window is not really sliding, though: aggregateWindow(every: 1mo) produces consecutive, non-overlapping windows.

The same for a sliding window, using window() with a period longer than every so that the windows overlap:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> window(every: 1h, period: 1mo)
  |> reduce(
    identity: {first: -1.0, last: -1.0, diff: -1.0},
    fn: (r, acc) => ({
        first:
            if acc.first < 0.0 then r._value
            else acc.first,
        last:
            r._value,
        diff:
            if acc.first < 0.0 then 0.0
            else (r._value - acc.first)
    })
  )
  |> duplicate(column: "_stop", as: "_time")
  |> drop(columns: ["first", "last"])
  |> rename(columns: {diff: "_value"})
  |> window(every: inf)
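One caveat with the queries above is that they use -1.0 as a "not set yet" marker, which only works if the real values are never negative. Below is a hedged sketch of a sliding-window variant that tracks a record count instead. It assumes generated integer data (deliberately including negative values) in place of the real bucket, and the window sizes (7d every, 28d period) are arbitrary choices for illustration.

```flux
import "generate"

// Sample data standing in for the real bucket (an assumption for this sketch).
// Values run from -50 upward, so a negative-sentinel check would misfire here.
generate.from(
    count: 100,
    fn: (n) => n - 50,
    start: 2021-01-01T00:00:00Z,
    stop: 2021-04-11T00:00:00Z,
)
  |> range(start: 2021-01-01T00:00:00Z, stop: 2021-04-11T00:00:00Z)
  // overlapping windows: a new 28-day window starts every 7 days
  |> window(every: 7d, period: 28d)
  |> reduce(
        identity: {first: 0, last: 0, count: 0},
        fn: (r, acc) => ({
            // count == 0 identifies the first record, whatever its value
            first: if acc.count == 0 then r._value else acc.first,
            last: r._value,
            count: acc.count + 1,
        }),
    )
  // compute the difference once the window has been fully reduced
  |> map(fn: (r) => ({r with _value: r.last - r.first}))
  |> duplicate(column: "_stop", as: "_time")
  |> drop(columns: ["first", "last", "count"])
  |> window(every: inf)
```

For float fields, the identity record would use 0.0 for first and last so that the types match r._value.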
