ELK streams
- Last Updated 3/31/2023, 12:34:01 PM UTC
Plugin info
name: es-query-metrics
Streams metrics from Elasticsearch queries. Because documents are not indexed in real time, there is a delay between the time a document is generated and the time it appears in index searches. To overcome this, the plugin maintains per-stream state so that each run searches for documents within time windows that overlap those of previous runs. Because the windows overlap, the plugin de-duplicates documents already seen during previous runs. The size and offset of the search window are controlled through these parameters:
- `delay`, an estimate of how far the index is behind the current time. Each time the plugin runs, the search window is offset by this amount. This lets you avoid searching a time period that is bound to be volatile, i.e. one in which you expect more documents to arrive.
- `late_documents_window`, how far back in time to look for documents that may not have been indexed during previous runs of this plugin. This causes the plugin to buffer the document `_id`s processed for this window of time, which increases memory and disk space requirements proportionally to the number of documents in the window.
Note: if the end time (`now - delay - late_documents_window`) of the search window is after the start time of the last run's search window, the plugin extends the end time to the start time of the last run's search window.
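The windowing and de-duplication described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the plugin's implementation: `search_window`, `dedupe` and `prune` are hypothetical names, the buffer is a plain dictionary, and the window-extension rule from the note is left out.

```python
from datetime import datetime, timedelta, timezone


def search_window(now, delay, late_documents_window):
    """Return the (oldest, newest) timestamps searched by a run at `now`."""
    newest = now - delay                      # skip the still-volatile recent period
    oldest = newest - late_documents_window   # look back for late-indexed documents
    return oldest, newest


def dedupe(hits, seen):
    """Return the hits not seen in earlier runs and record them in `seen`.

    `hits` is an iterable of (doc_id, timestamp) pairs; `seen` maps
    doc_id -> timestamp for documents that were already processed.
    """
    fresh = []
    for doc_id, ts in hits:
        if doc_id not in seen:
            seen[doc_id] = ts
            fresh.append((doc_id, ts))
    return fresh


def prune(seen, oldest):
    """Forget buffered ids older than the oldest timestamp still searched."""
    for doc_id in [d for d, ts in seen.items() if ts < oldest]:
        del seen[doc_id]


# Example run with the documented defaults (delay 3m, late_documents_window 30m).
now = datetime(2023, 3, 31, 12, 0, tzinfo=timezone.utc)
oldest, newest = search_window(now, timedelta(minutes=3), timedelta(minutes=30))
```

The buffer grows with the number of documents inside `late_documents_window`, which is why a larger window costs more memory and disk space.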
# Scraping requirements
- Must have a timestamp field (default `@timestamp`)
- Must have at least one tiebreaker field. When scrolling through large result sets, these fields are used to distinguish between documents with the same timestamp. For indexes populated by `filebeat`s, the fields `log.path`, `log.offset` are good candidates, but you should ensure that their values are always populated. Additionally, the tiebreaker fields cannot be of type `text`. If your documents do not contain any fields that can be used as tiebreakers, you can create one by cloning the document `_id` field into a new field using a `set` ingest processor.
- All of the above fields must be searchable (indexed) and sortable, and their type must not be `text`
- You cannot use the metadata field `_id` as a tiebreaker
# Commands
Is a document field indexed (searchable and sortable)? Execute the query below and check that your field is part of the index mappings and that its `doc_values` and `index` parameters are not set to `false`.
GET /<index_name>/_mapping
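If you prefer to check the mapping response programmatically, the following Python sketch applies the same rules to the JSON returned by the query above. It is a hypothetical helper (`field_problems` is not part of the plugin) and it assumes the usual response shape of index name, then `mappings`, then `properties`, with dotted field names stored as nested objects.

```python
def field_problems(mapping_response, field):
    """Return reasons why `field` cannot be used as a timestamp or tiebreaker."""
    problems = []
    for index_name, body in mapping_response.items():
        # Start from a synthetic root so every step can descend via "properties".
        node = {"properties": body.get("mappings", {}).get("properties", {})}
        for part in field.split("."):
            node = node.get("properties", {}).get(part)
            if node is None:
                problems.append(f"{index_name}: {field} is not mapped")
                break
        else:
            if node.get("type") == "text":
                problems.append(f"{index_name}: {field} is of type text")
            if node.get("index") is False:
                problems.append(f"{index_name}: {field} is not searchable (index: false)")
            if node.get("doc_values") is False:
                problems.append(f"{index_name}: {field} is not sortable (doc_values: false)")
    return problems
```

An empty list means the field passed all three checks in every index covered by the response.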
Create an `_id` clone field
PUT /<index_name>-*/_mapping
{
  "properties": {
    "_id_clone": { "type": "keyword" }
  }
}
Create a pipeline to clone `_id` into `_id_clone`
PUT _ingest/pipeline/set_id_clone
{
  "description": "clones _id into _id_clone",
  "processors": [
    {
      "set": {
        "field": "_id_clone",
        "value": "{{_id}}"
      }
    }
  ]
}
List index settings
GET /<index_name>-*/_settings
Set the `set_id_clone` pipeline as the index default pipeline
PUT /<index_name>-*/_settings
{
  "default_pipeline": "set_id_clone"
}
# Prerequisites
- Elasticsearch version `7.x`
- It is important that the field values stored in ES indexes have data types that are compatible with your query filters and with the metric values that you emit. That is, you cannot expect a `string` value in an index to work correctly with a numeric query filter or to be automatically converted to a number when aggregating. For more details see:
# Events
None
# Metrics
User defined
# Configuration
The plugin is configured with a `yaml` file. This section describes the configuration settings.
Important: Sensitive data should be stored inside the agent secrets store and referenced in this file using the notation `${secret key}`. For example `https://${my-api/user}:${my-api/passwd}@127.0.0.1:9200/v1`.
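To make the notation concrete, here is a small Python sketch of how such references could be expanded. How the agent resolves secrets is not described here, so this is purely an assumption for illustration: `expand_secrets` is a hypothetical function and the secrets store is modelled as a plain dictionary.

```python
import re

# Matches ${...} and captures the secret key between the braces.
SECRET_REF = re.compile(r"\$\{([^}]+)\}")


def expand_secrets(text, secrets):
    """Replace every ${key} in `text` with secrets[key].

    Raises KeyError if a referenced key is not in the store, so a typo in
    the configuration fails loudly instead of producing a broken URL.
    """
    return SECRET_REF.sub(lambda match: secrets[match.group(1)], text)


# Hypothetical secret values, for illustration only.
store = {"my-api/user": "elastic", "my-api/passwd": "s3cret"}
url = expand_secrets("https://${my-api/user}:${my-api/passwd}@127.0.0.1:9200/v1", store)
```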
- `url` (Required, supports secret references)
  List of one or more ES cluster URLs. For example `https://127.0.0.1:9200`.
- `gzip` (Optional, default `false`)
  Instructs ES to `gzip` responses.
- `username` (Optional, supports secret references)
  If using HTTP `basic-auth`, set this to the username.
- `password` (Optional, supports secret references)
  If using HTTP `basic-auth`, set this to the password.
- `tls_ca_path` (Optional)
  The path to the server TLS CA root certificate, if using self-signed certs.
- `tls_cert_path` (Optional)
  The path to the client certificate, if using mTLS.
- `tls_key_path` (Optional)
  The path to the client key, if using mTLS.
- `tls_key_passphrase` (Optional, supports secret references)
  The passphrase for the TLS key in `tls_key_path`.
- `tls_insecure_skip_verify` (Optional, default `false`)
  Set to `true` if the server TLS cert should always be trusted.
- `streams` (Required)
  List of stream configurations for the streams to pull from ES. See the next section for details.
# Stream Settings
A metric stream is controlled with the following parameters:
- `id` (Required)
  The unique id for this stream across all plugin configurations. The system will not verify this for you and will not warn you! Allows any characters except `/?*:;,{}\`. Max length is 128.
- `timestamp_field` (Optional, default `@timestamp`)
  The document field to use as the timestamp for the document.
- `delay` (Optional, default `3m`)
  The amount of time the stream is behind current time. Each time the plugin runs, it does not consider documents with a timestamp between `now - delay` and `now`.
- `late_documents_window` (Optional, default `30m`)
  How far back in time to look for documents that may not have been indexed during previous runs of this plugin. This causes the plugin to buffer the document `_id`s processed for this window of time, which increases memory and disk space requirements proportionally to the number of documents in the window.
- `index` (Required)
  List of index names or wildcards to stream from. For example `kibana_sample_data_logs`, `kibana_sample_data_*`. Only open indexes are searched.
- `scroll_size` (Optional, default `1000`)
  The maximum number of hits to fetch in each ES scroll batch.
- `filters` (Optional)
  List of filters in ES Query DSL format defining the qualifying stream documents. The resulting query will `AND` the supplied filters. See the ES query DSL reference guide for how to construct these filters. Examples:
  All requests with `bytes` between `10,000` and `15,000`
  filters:
    - range:
        bytes:
          gte: 10000
          lt: 15000
  All requests from `clientip` `69.141.33.21`
  filters:
    - term:
        clientip:
          value: "69.141.33.21"
  All requests from `clientip` `69.141.33.21` AND `bytes` between `10,000` and `15,000`
  filters:
    - term:
        clientip:
          value: "69.141.33.21"
    - range:
        bytes:
          gte: 10000
          lt: 15000
- `aggregation` (Optional)
  By default each document from the ES stream will produce one metric. If `aggregation` is configured, the documents in the stream are aggregated and one or more events are produced based on the configured groups. The available aggregators are `count`, `sum`, `avg`, `max`.
  - `count|sum|avg|max` (Required)
    - `field` (Required) the name of the field whose values are aggregated
    - `groups` (Optional) list of field names to aggregate by
- `metric` (Required)
  Defines the metric to produce from the stream documents.
  - `name` (Required) the name of the metric. Example `http/logs/total_bytes`
  - `value_path` (Optional) the document field to use as the value for the metric. If aggregation is used, this setting is not used.
  - `source_path` (Optional) the document field to use as the source value for the metric. If empty, the `hostname` where this plugin is running will be used
  - `datatype` (Optional) allows for type conversion of the value from the Elasticsearch datatype. Available options: `number`, `string`, `bool`
  - `dimensions` (Optional) the dimensions for the metric. A list of dimension names with optional ES document paths
    - `[string]` (Required) the name for the dimension
      - `path` (Optional) the ES document path to read the dimension value from. If not provided or empty, the dimension name is used
  - `append_if_missing` (Optional) appends missing metrics to the stream. Allowed only when `aggregation` is used
    - `value` (Required) the metric value for the generated metrics
    - `dimensions` (Required) list of metric dimensions by name to append if missing from the stream
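The way `value_path` and `dimensions` turn one document into one metric can be illustrated with a short Python sketch. It is not the plugin's code: `lookup` and `to_metric` are hypothetical helpers, documents are assumed to be nested dictionaries addressed by dotted paths such as `geo.src`, and the shape of the returned metric is invented for the example.

```python
def lookup(doc, path):
    """Resolve a dotted path such as "geo.src" in a nested document."""
    node = doc
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def to_metric(doc, name, value_path, dimensions):
    """Build one metric from one document.

    `dimensions` maps a dimension name to an optional {"path": ...} entry;
    when the path is missing or empty, the dimension name is used as the path.
    """
    dims = {}
    for dim_name, spec in dimensions.items():
        path = (spec or {}).get("path") or dim_name
        dims[dim_name] = lookup(doc, path)
    return {"name": name, "value": lookup(doc, value_path), "dimensions": dims}


doc = {"bytes": 25000, "clientip": "69.141.33.21", "geo": {"src": "GR"}}
metric = to_metric(
    doc,
    name="http/request/bytes",
    value_path="bytes",
    dimensions={"geosrc": {"path": "geo.src"}, "clientip": None},
)
```

Here `clientip` has no `path`, so its value is read from the document field of the same name.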
# Examples
# Multiple streams
url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    timestamp_field: "@timestamp"
    delay: "150s"
    late_documents_window: 480h
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 10000
      - terms:
          geo.src: [GR, IN, MG]
    metric:
      name: http/request/bytes
      value_path: "bytes"
      dimensions:
        geosrc:
          path: geo.src
        clientip:
  - id: stream-2
    delay: "150s"
    late_documents_window: 480h
    index:
      - "kibana_sample_data_logs"
    filters:
      - term:
          clientip:
            value: 69.141.33.21
    aggregation:
      sum:
        field: bytes
        groups:
          - clientip
    metric:
      name: http/request/total_bytes
      value_path: "bytes"
      dimensions:
        clientip:
# Aggregation
url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    aggregation:
      count:
        groups:
          - "geo.src"
    metric:
      name: http/request/count
      dimensions:
        geosrc:
          path: geo.src
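To show what grouping by `geo.src` and applying an aggregator amounts to, here is a minimal Python sketch of the four documented aggregators. It is an illustration only and not how the plugin computes results: `aggregate`, `lookup` and `REDUCERS` are hypothetical names, documents are assumed to be nested dictionaries, and every document is assumed to carry a numeric value in the aggregated field.

```python
from collections import defaultdict


def lookup(doc, path):
    """Resolve a dotted path such as "geo.src" in a nested document."""
    node = doc
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


REDUCERS = {
    "count": len,
    "sum": sum,
    "avg": lambda values: sum(values) / len(values),
    "max": max,
}


def aggregate(docs, op, field=None, groups=()):
    """Group `docs` by the `groups` fields and reduce each group with `op`."""
    buckets = defaultdict(list)
    for doc in docs:
        key = tuple(lookup(doc, group) for group in groups)
        # count needs one entry per document; the others need the field value.
        buckets[key].append(1 if op == "count" else lookup(doc, field))
    return {key: REDUCERS[op](values) for key, values in buckets.items()}


docs = [
    {"geo": {"src": "US"}, "bytes": 10},
    {"geo": {"src": "US"}, "bytes": 30},
    {"geo": {"src": "GR"}, "bytes": 5},
]
counts = aggregate(docs, "count", groups=["geo.src"])
totals = aggregate(docs, "sum", field="bytes", groups=["geo.src"])
```

Each key of the result corresponds to one combination of group values, which is why an aggregated stream can emit more than one metric per run.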
# Generate missing metrics by dimensions
url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    aggregation:
      count:
        groups:
          - "geo.src"
    metric:
      name: http/request/count
      dimensions:
        geosrc:
          path: geo.src
      append_if_missing:
        value: 0
        dimensions:
          - {geosrc: "US"}
          - {geosrc: "GR"}
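The effect of `append_if_missing` can be pictured with the following Python sketch. It is an assumption-laden illustration rather than the plugin's logic: `append_if_missing` here is a hypothetical function, metrics are modelled as dictionaries with `value` and `dimensions` keys, and a metric counts as present only when its dimensions match a configured entry exactly.

```python
def append_if_missing(metrics, value, dimensions):
    """Return `metrics` plus one default metric per missing dimension set."""
    present = {frozenset(m["dimensions"].items()) for m in metrics}
    result = list(metrics)
    for dims in dimensions:
        if frozenset(dims.items()) not in present:
            result.append({"value": value, "dimensions": dict(dims)})
    return result


# The aggregation produced a count for US only; GR is filled in with 0.
aggregated = [{"value": 7, "dimensions": {"geosrc": "US"}}]
completed = append_if_missing(
    aggregated,
    value=0,
    dimensions=[{"geosrc": "US"}, {"geosrc": "GR"}],
)
```

Emitting an explicit zero keeps a series continuous when a group has no documents in a given run.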
# Basic filtering
url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 20000
    metric:
      name: http/request/bytes
      value_path: "bytes"
      dimensions:
        ## no doc path mapping. uses clientip
        clientip:
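When several entries are listed under `filters`, the supplied clauses are ANDed. In the ES Query DSL, one standard way to express that is a `bool` query with a `filter` array, sketched below in Python. The exact request the plugin sends is not documented here, so treat `combine_filters` as a hypothetical helper that only shows the equivalent query shape.

```python
import json


def combine_filters(filters):
    """Wrap Query DSL filter clauses so that a document must match all of them."""
    return {"query": {"bool": {"filter": list(filters)}}}


# The same clauses as in the YAML configuration, written as Python dictionaries.
filters = [
    {"range": {"bytes": {"gte": 10000, "lt": 15000}}},
    {"term": {"clientip": {"value": "69.141.33.21"}}},
]
body = combine_filters(filters)
print(json.dumps(body, indent=2))
```

Running the printed body against `GET /<index_name>/_search` is a quick way to preview which documents a stream would select before adding the filters to the plugin configuration.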
# Basic filtering with aggregations
url:
  - http://127.0.0.1:9200
gzip: true
streams:
  - id: stream-1
    delay: "150s"
    late_documents_window: "0s"
    index:
      - "kibana_sample_data_logs"
    filters:
      - range:
          bytes:
            gte: 20000
    aggregation:
      sum:
        field: "bytes"
        groups:
          - "clientip"
    metric:
      name: http/request/bytes
      dimensions:
        clientip:
# Validate Configuration
es-query-metrics --run-conf /path/to/config/file.yaml --validate
# Testing
Run the plugin from the command line and get any emitted metrics on stdout.
- `state`, the path to the directory that holds ES stream states
- `simulate`, instructs the plugin to not commit any state information
es-query-metrics --run-conf /path/to/config/file.yaml --state /path/to/state/dir --simulate
# List the metrics emitted by a plugin configuration
es-query-metrics --run-conf /path/to/config/file.yaml --metrics