ELK streams

  • Last Updated 3/31/2023, 12:34:01 PM UTC
  • About 9 min read

Plugin info

name: es-query-metrics

Streams metrics from Elasticsearch queries. Because documents are not indexed in real-time, there exists a delay between the time that they are generated and the time before they can appear in index searches. To overcome this, the plugin maintains a per stream state so that at each run it searches for documents within overlapping time windows relative to the previous runs. Since these time windows are overlapping, the plugin de-duplicates documents seen during previous runs. The size and offset of the search window is controlled through these parameters:

  • delay, an estimate for the amount of time the index is behind current time. Each time the plugin runs the search window is offset by this amount. This allows you to not search for documents in a time period that is bound to be volatile - i.e. you are expecting that more documents will arrive in this time period.
  • late_documents_window, how far back in time to look for documents that may have not been indexed during previous runs of this plugin. This will cause the plugin to buffer document _ids processed for this window of time. Obviously this will increase the memory and disk space requirements proportionally to the number of documents in the window.

Note, if the end time (now - delay - late_documents_window) for the search window is after the start time of the last run's search window, then the plugin will extend the end time to the start time of the last run's start time.

diagram

# Scraping requirements

  • Must have a timestamp field (default @timestamp)
  • Must have at least one tiebreaker field. When scrolling through large result sets, these fields are used to uniquely identify between documents with the same timestamp. For indexes populated by filebeats, the fields log.path, log.offset are good candidates but you should ensure that its value is always populated. Additionally the tiebreaker fields cannot be of type text. If your documents do not contain any fields that can be used as tiebreakers, you can create one by cloning the document _id field into a new field using a set ingest processor (opens new window).
  • All above fields must be searchable (indexed), sortable and their type not text
  • You cannot use the metadata field _id as a tiebreaker

# Commands

  • Is a document filed indexed (searchable and sortable)? execute the below query and check that your field is part of the index mappings and doc_value, index fields are not set to false

    GET /<index_name>/_mapping
    
  • Create an _id clone field

    PUT /<index_name>-*/_mapping
    {
      "properties": {
        "_id_clone": {
          "type": "keyword"
        }
      }
    }
    
  • Create a pipeline to clone _id into _id_clone

    PUT _ingest/pipeline/set_id_clone
    {
      "description": "clones _id into _id_clone",
      "processors": [
        {
          "set": {
            "field": "_id_clone",
            "value": "{{_id}}"
          }
        }
      ]
    }
    
  • List index settings

    GET /<index_name>-*/_settings
    
  • Set the set_id_clone pipeline as the index default pipeline

    PUT /<index_name>-*/_settings
    {
      "default_pipeline": "set_id_clone"
    }
    

# Prerequisites

  • Elasticsearch version 7.x
  • It is important that the field values stored in ES indexes have data types that are compatible with your query filters and metric values that you emit. That is, you cannot expect a string value in an index to work correctly with a numeric query filter or to be automatically converted to a number when aggregating. For more details see:

# Events

None

# Metrics

User defined

# Configuration

The plugin is configured with a yaml file. This section describes the configuration settings.

Important Sensitive data should be stored inside the agent secrets store and referenced in this file using notation ${secret key}. For example https://${my-api/user}:${my-api/passwd}@127.0.0.1:9200/v1.

  • url (Required, supports secret references)
    List of one or more ES cluster URLs. For example https://127.0.0.1:9200.

  • gzip (Optional, default false)
    Instructs ES to gzip responses

  • username (Optional, supports secret references)
    If using HTTP basic-auth set this to the username

  • password (Optional, supports secret references)
    If using HTTP basic-auth set this to the password

  • tls_ca_path (Optional)
    The path to the server TLS CA root certificate path if using self signed certs

  • tls_cert_path (Optional)
    The path to the client certificate path if using mTLS

  • tls_key_path (Optional)
    The path to the client key path if using mTLS

  • tls_key_passphrase (Optional, supports secret references)
    The passphrase for TLS key in tls_key_path

  • tls_insecure_skip_verify (Optional default false)
    Set to true if server TLS cert should always be trusted

  • streams (Required)
    List of stream configuration for the streams to pull from ES. See next section for details

# Stream Settings

A metric stream is controlled with the following parameters

  • id (Required)
    The unique id for this stream across all plugin configurations. The system will not verify this for you and will not warn you! Allows any characters except /?*:;,{}\. Max length is 128.

  • timestamp_field (Optional, default @timestamp)
    The document field to use as the timestamp for the document.

  • delay (Optional, default 3m)
    The amount of time the stream is behind current time. Each time the plugin runs it does not consider documents with timestamp between now and delay.

  • late_documents_window(Optional, default 30m)
    How far back in time to look for documents that may have not been indexed during previous runs of this plugin. This will cause the plugin to buffer document _ids processed for this window of time. Obviously this will increase the memory and disk space requirements proportionally to the number of documents in the window.

  • index (Required)
    List of index names or wildcards to stream from. For example kibana_sample_data_logs, kibana_sample_data_*. Only open indexes are searched.

  • scroll_size (Optional, default 1000)
    The maximum number of hits to fetch in ES scroll batch.

  • filters (Optional)
    List of filters in ES Query DSL format defining the qualifying stream documents. The resulting query will AND the supplied filters. See the ES query DSL reference guide (opens new window) for how to construct these filters.

    Examples:

    1. All requests with bytes between 10,000 and 15,000

      filters:
       - range:
           bytes:
             gte: 10000
             lt: 15000
      
    2. All requests from clientip 69.141.33.21

      filters:
       - term:
          clientip:
            value: "69.141.33.21"
      
    3. All requests from clientip 69.141.33.21 AND bytes between 10,000 and 15,000

      filters:
        - term:
            clientip:
              value: "69.141.33.21"
        - range:
            bytes:
              gte: 10000
              lt: 15000
      
  • aggregation (Optional)
    By default each document from the ES stream will produce one metric. If aggregation is configured then the documents in the stream will be aggregated and one or more events will be produced based on the configured groups. The available aggregators are count, sum, avg, max.

    • count|sum|avg|max (Required)
      • field (Required) the field name to aggregate its values
      • groups (Optional) list of field name to aggregate by
  • metric (Required)
    Defines the metric to produce from the stream documents

    • name (Required) the name of the metric. Example http/logs/total_bytes
    • value_path (Optional) the document field to use as the value for the metric. If aggregation is used, this setting is not used.
    • source_path (Optional) the document field to use as the source value for the metric. If empty the hostname where this plugin in running will be used
    • datatype (Optional) allows for type conversion of value from elasticsearch datatype. Available options: number, string, bool
    • dimensions (Optional) the dimensions for the metric. A list of dimension names with optional ES document paths
      • [string] (Required) the name for the dimension
        • path (Optional) the ES document path to read the dimension value from. If not provided or empty then dimension name is used
    • append_if_missing (Optional) appends missing metrics to stream. Allowed only when aggregation is used
      • value (Required) the metric value for the generated metrics
      • dimensions (Required) list of metric dimensions by name to append if missing stream

# Examples

# Multiple streams

url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    timestamp_field: "@timestamp"
    delay: "150s"
    late_documents_window: 480h
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 10000
      - terms:
          geo.src: [GR, IN, MG]
    metric:
      name: http/request/bytes
      value_path: "bytes"
      dimensions:
        geosrc:
          path: geo.src
        clientip:
  - id: stream-2
    delay: "150s"
    late_documents_window: 480h
    index:
      - "kibana_sample_data_logs"
    filters:
      - term:
          clientip:
            value: 69.141.33.21
    aggregation:
      sum:
        field: bytes
        groups:
        - clientip
    metric:
      name: http/request/total_bytes
      value_path: "bytes"
      dimensions:
        clientip:

# Aggregation

url:
  - http://127.0.0.1:9200
gzip: true
streams:
    - id: stream-1
      delay: "150s"
      late_documents_window: "0s"
      index:
        - "kibana_sample_data_logs"
      aggregation:
        count:
            groups:
             - "geo.src"
      metric:
        name: http/request/count
        dimensions:
          geosrc:
            path: geo.src

# Generate missing metrics by dimensions

url:
  - http://127.0.0.1:9200
gzip: true
streams:
    - id: stream-1
      delay: "150s"
      late_documents_window: "0s"
      index:
        - "kibana_sample_data_logs"
      aggregation:
        count:
            groups:
             - "geo.src"
      metric:
        name: http/request/count
        dimensions:
          geosrc:
            path: geo.src
        append_if_missing:
          value: 0
          dimensions:
            - {geosrc: "US"}
            - {geosrc: "GR"}

# Basic filtering

url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 20000
    metric:
      name: http/request/bytes
      value_path: "bytes"
      dimensions:
        ## no doc path mapping. uses clientip
        clientip:

# Basic filtering with aggregations

url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 20000
    aggregation:
       sum:
         field: "bytes"
         groups:
           - "clientip"
    metric:
      name: http/request/bytes
      dimensions:
        clientip:

# Validate Configuration

es-query-metrics --run-conf /path/to/config/file.yaml --validate

# Testing

Run the plugin from the command line and get any emitted metrics on stdout

  • state, the path to the directory that holds ES stream states
  • simulate, instructs the plugin to not commit any state information
es-query-metrics --run-conf /path/to/config/file.yaml --state /path/to/state/dir --simulate

# List the metrics emitted by a plugin configuration

es-query-metrics --run-conf /path/to/config/file.yaml --metrics
Last Updated: 3/31/2023, 12:34:01 PM