InfluxDB


InfluxDB is a time series database written in Go. This page contains notes relating to InfluxDB 2.x. My older notes on InfluxDB 1.x have been moved.

Introduction

InfluxDB 2.x integrates most of the TICK stack (InfluxDB, Chronograf's UI, and Kapacitor's task engine) into one binary; Telegraf remains a separate collector.

Migration considerations

Before you migrate to InfluxDB 2.x, be aware of a few things:

  1. You can't run InfluxQL. All your existing dashboards based on InfluxQL will need to be rewritten.
  2. Flux is nice and might make more sense than the SQL-like InfluxQL, but it's completely different, and converting InfluxQL queries to Flux's functional syntax can be tricky. You should probably learn Flux before you do the migration.
  3. Your Telegraf configs will need some tweaking. The database defaults to collectd and requires a token.
  4. Everything requires a token or authentication. If you push data using the line protocol from a shell script somewhere, you will need to update these scripts (see the example after this list).
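
For example, a script that previously wrote to the 1.x /write endpoint now needs the v2 endpoint, the org and bucket parameters, and a token header. A minimal sketch, with all values hypothetical:

# Hypothetical values; substitute your own server, org, bucket, and token.
curl -X POST "https://influxdb.example.com/api/v2/write?org=home&bucket=collectd" \
     --header "Authorization: Token $INFLUX_TOKEN" \
     --data-binary "temperature,host=server1 value=21.5"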

Installation and migration from InfluxDB 1.x

I had an existing InfluxDB 1.x dataset which I wanted to migrate to InfluxDB 2.x. The upgrade is triggered automatically by the official Docker image if it detects 1.x data but no 2.x data. You must also mount /var/lib/influxdb2, as that is where the migrated data and any new data will be written.

I used the following docker-compose file when I first did the migration.

services:
  influxdb:
    image: influxdb:2.0.7
    hostname: influxdb
    networks:
      - influxnet
      - traefik
    expose:
      - "8086"
    restart: always
    volumes:
      - /var/volumes/influx/influxdb:/var/lib/influxdb
      - /var/volumes/influx/influxdb2:/var/lib/influxdb2
      - /var/volumes/influx/config/influxdb.conf:/etc/influxdb/influxdb.conf
      - /var/volumes/influx/types.db:/usr/share/collectd/types.db
    environment:
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: AdminPassword1
      DOCKER_INFLUXDB_INIT_ORG: home
      DOCKER_INFLUXDB_INIT_BUCKET: initial
      DOCKER_INFLUXDB_INIT_MODE: upgrade
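
# The networks referenced above must be declared at the top level of the
# compose file. This block is an assumption about my setup; 'traefik' is
# assumed to be a pre-existing external network.
networks:
  influxnet:
  traefik:
    external: true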

Once the migration is complete, you can remove the INIT environment variables and the old influxdb volume.

Continuous queries to tasks

If you use continuous queries to down-sample older data, you will have to convert these into tasks written in Flux.

See: https://docs.influxdata.com/influxdb/v2.0/upgrade/v1-to-v2/migrate-cqs/
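
As a rough sketch, a continuous query that downsampled CPU data might become a task like this (the bucket names, measurement, and intervals are assumptions):

option task = {name: "downsample-cpu", every: 1h}

from(bucket: "collectd/autogen")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
  |> to(bucket: "collectd-downsampled")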

Flux

Flux is the new query language introduced in InfluxDB v2. Its syntax is essentially a chain of functions, piped together with |>, that you apply to the data.

See: https://www.sqlpac.com/en/documents/influxdb-flux-language-advanced-features.html
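
For example, a complete query is just a data source piped through a chain of transformations (the bucket, measurement, and field names here are assumptions):

from(bucket: "collectd/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_idle")
  |> aggregateWindow(every: 10m, fn: mean)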

Cheat sheet

Selecting data:

from(bucket: "mybucket")
Selecting a time range:

Relative:

|> range(start: -1h)

Relative with a stop time:

|> range(start: -1h, stop: -10m)

Absolute:

|> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)

When dealing with Grafana, use this for the time range window:

|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
Filtering data:

By tag:

Filter on _measurement to filter by a specific measurement (e.g. temperature, account info, etc.):

|> filter(fn: (r) => r["_measurement"] == "account")

Filter on specific tag keys that your data is labeled with (e.g. if account measurements are tagged with usernames, I could filter with username == "leo"):

|> filter(fn: (r) => r["username"] == "leo")

By field:

If you need a specific field by name (e.g. the number of VMs owned by a user):

|> filter(fn: (r) => r["_field"] == "vm_count")

By value:

If you want to filter specific values out (e.g. keep only rows where vm_count is greater than 4):

|> filter(fn: (r) => r["_value"] > 4)
Complex filters:

Filters take in a lambda function. If you want to add additional conditions, you can either pipe multiple filters together (which ends up as a conditional AND) or rewrite the filter function. E.g.:

|> filter(fn: (r) => r["_measurement"] == "account" and r["_field"] == "vm_count")

You can use regular expressions with filters as well.

|> filter(fn: (r) => r.hostname =~ /^host[1-8]$/ and r.hostname !~ /^host[5-6]$/ )
Renaming columns:

The columns map goes from the old name to the new name:

|> rename(columns: {old: "new"})
|> rename(columns: {_value: "average", _time: "when"})
Dropping columns:

|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
Yielding data:

Use yield() to yield the data. This is a fancy way of labeling the data you have so far in the pipe so that you can reference it later on. For simple queries using a single pipe, this likely isn't very useful. It will still name the resulting table output, however.

... get data ...
  |> yield(name: "results")
Windowing data:

|> aggregateWindow(every: 10m, fn: mean, column: "colname")

Null values are created by default. To disable null values from being created, set createEmpty to false.

|> aggregateWindow(every: 10m, fn: mean, createEmpty: false)

Pivoting data:

Pivoting can be used to turn multiple values into columns. This is useful if you have 2 or more measurement values (e.g. CPU used and CPU available) that you want to do math on to produce another field.

|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Doing math on data:

Use the map function to compute columns. The function you pass in can also do math. E.g.:

|> map(fn: (r) => ({_value: r.memoryallocated / r.memorytotal}))

The with operator updates a column if it exists or creates a new column in the output table.

|> map(fn: (r) => ({r with _value: r.memoryallocated / r.memorytotal}))

Aggregating data with sum:

After grouping specific columns, you can aggregate the grouped value using sum. For example, to count the number of times something occurred in each windowPeriod:

|> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: true)
|> fill(column: "_value", value: 0.0)
|> group(columns: ["_time"])
|> sum(column: "_value")
Setting a column:

Sets a column to a given string value, creating the column if it doesn't exist:

|> set(key: "new_field", value: "Hi!")
Histogram:

Creates a cumulative histogram. You must specify the bins manually:

|> histogram(bins: [0.0, 0.016666667, 0.25, 0.5, 1.0, 3.0, 6.0, 12.0, 24.0, 48.0, 99999.0])

To change this into a non-cumulative histogram (such as if you're trying to graph Flux data on Grafana), you can pass the histogram data through difference().

Get list of fields

Useful for Grafana variables, where you want the user to be able to select specific sets of fields.

from(bucket: "cloud")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "account")
  |> distinct(column: "name")
  |> keep(columns: ["_value"])
  |> group()

Determine your version of Flux

Run this query and use 'view raw data'. If certain features aren't working, check whether they are supported by your version of Flux. You may need to upgrade InfluxDB to make use of the newer features listed in the documentation.

import "array"
import "runtime"

array.from(rows: [{version: runtime.version()}])

Tasks

Recovering access tokens

If you have access to the InfluxDB web interface but have lost the API access token, you can recover the access tokens by:

  1. Log in to the web interface
  2. Generate a new 'all access' API key. This will be used temporarily. Copy it.
  3. Log in to the InfluxDB server or enter the InfluxDB container, then run the following with the token you just generated:
    # influx auth list -t $NEW_TOKEN

You should be able to see all the tokens that have been generated, including their secrets.

Once you're done, you can delete the newly generated API key.
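
The temporary token can also be deleted from the CLI by ID ($AUTH_ID here is a placeholder for the ID shown in the listing above):

# influx auth delete -i $AUTH_ID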

Importing data from one bucket to another

I had Telegraf data written to a Telegraf database. However, my InfluxDB 2.x instance uses the collectd bucket. To import the old Telegraf data into the new bucket, you can use the following Flux query.

from(bucket: "telegraf/autogen")
  |> range(start: -30d)
  |> to(bucket: "collectd/autogen")

However, I ran into this error: 'input field "free" on measurement "disk" is type integer, already exists as type float dropped'. As a workaround, I filtered on this particular measurement and converted its values to floats:

from(bucket: "telegraf/autogen")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "disk")
  |> map(fn:(r) => ({ r with x: float(v: r.x) }))
  |> to(bucket: "collectd/autogen")

Deleting data between a time range

Use influx delete to delete data. You can specify the range you want to delete as well as a predicate to limit which measurement gets deleted. The following will delete everything from the swap measurement from July 10, 2023 until the current time.

# influx delete \
  --org $OrganizationName \
  -t $token \
  --bucket "arc-telegraf-longtermtesttest" \
  --start '2023-07-10T00:00:00Z' \
  --stop $(date +"%Y-%m-%dT%H:%M:%SZ") \
  --predicate '_measurement="swap"'
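
To confirm the deletion, a quick Flux check can count whatever is left in that measurement (same bucket and start time as above):

from(bucket: "arc-telegraf-longtermtesttest")
  |> range(start: 2023-07-10T00:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "swap")
  |> count()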

Move buckets between InfluxDB instances

If you have multiple InfluxDB instances and you want to consolidate the buckets onto one single InfluxDB server, the best option is to back up the bucket and then restore it on the destination server.

## We'll move 'telegraf/autogen' from one instance to another

## On the source server
## If you don't know what the admin token is, you can get it by listing all tokens that are available:
# influx auth list

# influx backup --bucket telegraf/autogen -t AdminToken-xxxxxxx== /backups/backup-telegraf

## On the destination server
## Get the admin token on the destination server (if you don't already know the token)
# influx auth list

## Restore the bucket with a different name on the destination server
# influx restore --bucket telegraf/autogen --new-bucket arc-telegraf -t AdminToken-xxxxxxx== /backups/backup-telegraf

Create a new variable

Unlike in Grafana, where variables are part of a dashboard, in InfluxDB variables are their own entities and need to be created separately. To do so, go to Settings -> Variables. A variable can be defined as a query, CSV, or key-value map. Some common variables from queries can be found at https://docs.influxdata.com/influxdb/v2.0/visualize-data/variables/common-variables/.

To create a new 'hosts' variable which contains all Telegraf hosts pushing metrics to InfluxDB, use this query:

import "influxdata/influxdb/schema"
schema.tagValues(bucket: "collectd/autogen", tag: "host")

When creating a dashboard, you can inject this hosts variable in the query editor or reference it via the 'v' record. That is, v.hosts.
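
For example, a dashboard cell query could then filter on the currently selected host (the bucket and measurement are assumptions):

from(bucket: "collectd/autogen")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu" and r["host"] == v.hosts)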

Pushing data with curl

You can push data into InfluxDB using curl with the line protocol.

set -a
InfluxServer="https://influxdb.example.com"
InfluxOrg="MyOrg"
InfluxToken="Token-blahblahblah=="
InfluxBucket="my-bucket"
set +a

function curl_to_influx() {
        curl -k -s --header "Authorization: Token $InfluxToken" \
                --retry 5 \
                --header "Content-Type: text/plain; charset=utf-8" \
                --header "Accept: application/json" \
                -X POST "$InfluxServer/api/v2/write?org=$InfluxOrg&bucket=$InfluxBucket" --data-binary @-
}

echo "measurement,host=%s,sensor=%s,unit=%s temperature=%f %d" | curl_to_influx

Grafana change series name from _value

By default, Grafana will show series names as something like _value {_start="2022-06-01 17:33:54.89 +0000 UTC", _stop="2022-06-21 17:33:54.891 +0000 UTC", state="COMPLETED"}. That's hard to read and gross.

To give a series an appropriate name, use set or map to assign an appropriate name to a field. For example, I could add |> set(key: "name", value: "1 to 5 hours"). This adds a new field to the data which we can tell Grafana to use as the series name. In Grafana, apply the 'Labels to fields' transformation and assign your custom field as the label for the series. In my example, set 'name' as the field to use.
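
A minimal sketch of a query that names its series this way (the bucket, measurement, and label value are assumptions):

from(bucket: "mybucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "jobs")
  |> set(key: "name", value: "1 to 5 hours")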

Pushing network counters to InfluxDB using curl

You can use the following script to push per-interface tx/rx byte and packet counters to InfluxDB. This is similar to what Telegraf does with the net input, but is suitable for systems that can't run Telegraf (such as low-end embedded systems). Use a cron job to run this script periodically.

#!/bin/bash

set -a
InfluxServer="https://influxdb.example.com"
InfluxOrg="home"
InfluxToken="****"
InfluxBucket="collectd/autogen"
set +a

function curl_to_influx() {
        curl -k -s --header "Authorization: Token $InfluxToken" \
                --retry 5 \
                --header "Content-Type: text/plain; charset=utf-8" \
                --header "Accept: application/json" \
                -X POST "$InfluxServer/api/v2/write?org=$InfluxOrg&bucket=$InfluxBucket" --data-binary @-
}

# Parse /proc/net/dev (skipping the two header lines), strip the colon after
# the interface name, and emit one line of line protocol per interface.
tail -n +3 /proc/net/dev | tr -d : \
        | while read interface rx_bytes rx_packets rx_errors rx_drops foo foo foo foo tx_bytes tx_packets tx_errors tx_drops foo ; do
                echo "net,host=$HOSTNAME,interface=$interface bytes_recv=${rx_bytes}i,packets_recv=${rx_packets}i,drop_in=${rx_drops}i,err_in=${rx_errors}i,bytes_sent=${tx_bytes}i,packets_sent=${tx_packets}i,drop_out=${tx_drops}i,err_out=${tx_errors}i"
done | curl_to_influx