SQL fetch

  • Last Updated 3/31/2023, 12:34:01 PM UTC
  • About 11 min read

Plugin info

name: sql-collector

Extracts metrics and/or events from data sources using SQL. Data sources can be predefined DSNs in the local agent's SQL gateway or inlined in the collector configuration file. The following data source types are supported:

  • PostgresQL
  • Oracle
  • MS SQL
  • MySQL

The plugin can reuse SQL statements defined under the assets/sql/ directory structure in your git repos or inlined inside the collector configuration file.

# Prerequisites

None

# Events

User defined

# Metrics

User defined

# Filtering and Aggregation

You can apply filters and aggregations on top of the base SQL used by an extractor. Filters are pushed down to the data sources by augmenting the base SQL. Aggregations are not pushed down and are calculated by the plugin using the un-aggregated record stream returned from the base SQL.

The following filter operators are available:

Operator Description
eq Equals
neq Not Equals
lt Less Than
lte Less Than or Equals
gt Greater Than
gte Greater Than or Equals
between Between
in In
not_in Not In
like Like
not_like Not Like
is_null Is Null
is_not_null Is Not Null

The following aggregations are supported:

Aggregate Function
count
sum
avg
max
min

# Run History tracking

The plugin can optionally record the time last time a query was run. To enable this behavior set the history_key parameter for a query to a unique value.

# Filters

You can apply filters to the return columns of a query. Filter expressions are evaluated by the plugin at run-time into values which are then pushed down to the base SQL as where clauses. For example:

sql: select count(errors) as num_errors from my_table
filters:
  - num_error:
      gte: 10
-- pushed down as:
select *
from (
    select count(errors) as num_errors from my_table
) as a 
where a.num_errors >= 10

Filter values can be literals (strings, numbers, times, etc) or complex expressions. Literals are converted into types compatible with the filtered column's data-type before push down. Complex expression use the syntax $[<expression>] and are evaluated by the plugin before the result is converted into a type compatible with the filtered column's data-type and pushed down to SQL. For example:

sql: select error_code, count(errors) as num_errors from my_table group by error_code
filters:
  - num_error:
      gte: "$[10 + 5]"
  - error_code:
      eq: "$['an_err_' + 'code']"
-- pushed down as:
select *
from (
    select error_code, count(errors) as num_errors from my_table group by error_code
) as a 
where a.num_errors >= 15 
and a.error_code = 'an_err_code'

# Filter Expression Variables and Functions

Filter expression become useful when combined with built-in variables and functions. This allows you to dynamically modify the collector SQL based on runtime context:

# give me all error codes since the last collection time
sql: select error_dt, error_code as num_errors from my_table
history_key: some/system/errors
filters:
  - error_dt: # expr_1
      gte: "$[default(last_run_time, subtract_duration(now(), '24h'))]"
  - error_dt: # expr_2
      lt: "$[now()]"
-- pushed down as:
select *
from (
    select error_dt, error_code as num_errors from my_table
) a
where a.error_dt >= :_result_of_expr_1
and a.error_dt < :_result_of_expr_2

# Built-in variables

Variable Name Description
last_run_time The last time this query was run if a history_key has been defined, or current time.

# Built-in Functions

The following functions are available for use in filters:

Function Description
now() Returns the current datetime
duration(string) Parses a duration string
add_duration(time datetime, duration string) add duration to time
subtract_duration(time datetime, duration string) subtract duration from time
default(value any, default_value any) returns value if non-empty or default_value

# Configuration

This section describes the configuration settings for this plugin

IMPORTANT

Sensitive data should be stored inside the agent secrets store and referenced in this file using ${secret key} notation.

Name Type Required Default Description
timeout duration string No 5s Global execution timeout for queries
local_dsns map[string]object No Defines DSNs locally scoped to this configuration file
queries []object Yes Configures the list of SQL queries to be executed

# Local DSNs

Key/value pairs of DSN name and DSN definition. The DSN is defined using the following properties:

Name Type Required Default Description
connection_str string Yes The connection string for the DSN in a format understood my the driver
driver string Yes The driver name to use for the connection. One of oracle, postgresql, postgres, mysql, mssql, sqlserver
max_conns int No 2 The maximum number of open connections to maintain with the data source

# Oracle Connection Strings

BEQ connection with ORACLE_HOME and ORACLE_SID environment variables inherited from agent:

connection_str: oracle://@

BEQ connection with ORACLE_HOME and ORACLE_SID set inside connection string:

connection_str: oracle://@(DESCRIPTION=(ADDRESS=(PROTOCOL=beq)(PROGRAM=<oracle_home_path>/bin/oracle)(ARGV0=<oracle_sid>)(ARGS='(DESCRIPTION=(LOCAL=YES)(ADDRESS=(PROTOCOL=beq)))')(ENVS='ORACLE_HOME=<oracle_home_path>,ORACLE_SID=<oracle_sid>')))

TCP connection:

connection_str: oracle://<user>:<password>@<easy connect string>

To connect as sysdba, sysoper or sysasm append the required role as a URL parameter with value 1 to the connection string:

# sysdba over BEQ
connection_str: oracle://@?sysdba=1

# sysasm over TCP
connection_str: oracle://<user>:<password>@<easy connect string>?sysasm=1

# PostgresQL Connection Strings

Provide the connection string as a URL:

# tcp connection
connection_str: postgres://<user>:<password>@<host>/<database>?<param1>=<val1>&<param2>=<val2>

# unix domain socket
connection_str: postgres:///<dbname>?host=/var/run/postgresql/

The following URL parameters are available:

  • host, The host to connect to. Values that start with / are for unix domain sockets
  • port, The port to bind to, default is 5432
  • sslmode, Whether or not to use SSL as follows
    • disable, do not use SSL
    • require, use SSL without certificate verification (default)
    • verify-ca, use SSL with certificate verification
    • verify-full, use SSL with with certificate and host verification
  • connect_timeout, maximum wait for connection, in seconds. 0 or not specified implies indefinite wait
  • sslcert, path to PEM encoded certificate to present to server
  • sslkey, path to PEM encoded SSL key
  • sslrootcert, path to PEM encoded root CA certificate to present to server

# MySQL Connection Strings

Provide the connection string as a URL:

connection_str: mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]

For more details see https://pkg.go.dev/github.com/go-sql-driver/mysql#readme-dsn-data-source-name (opens new window)

# MS SQL Connection Strings

Provide the connection string as a URL:

connection_str: sqlserver://<username>:<password>@<host>/<instance>?<param1>=<value>&<param2>=value

For supported URL parameters see https://pkg.go.dev/github.com/denisenkom/go-mssqldb (opens new window)

# Queries

A list of query configurations with the following properties:

Name Type Required Default Description
history_key string No A unique name within this configuration file to track the last runtime for the query
timeout string No Execution timeout this query. Defaults to global timeout
sql_file string No Path to SQL query file relative to assets/sql
sql string No Inline SQL statement
dsn string Yes Local DNS from local_dsns or sql gateway DSN to execute query against
source string No Overrides the generated metric or event source from the DSN name to this value
filters []object No List of filters to apply to base SQL
metrics []object No List of metrics to extract from the query
events []object No List of events to extract from the query
tags []object No List of tags to add the metrics/events generated

# Filter Configuration

A filter maps a query column to a filter operator and the right hand side operands:

# scalar operand
<column_name>:
  <operator>: <operand>
# list operands
<column_name>:
  <operator>:
    - <operand_1>
    - <operand_2>
    - <operand_n>

For example:

filters:
  - error_code:
      eq: "err_123"
  - error_code:
      in:
        - "err_123"
        - "err_456"
        - "err_789"
  - error_dt:
      between:
        - "$[subtract_duration(now(), '1h')]"
        - "$[now()]"

# Metric Configuration

Emits one metric for each result row by mapping a query column into the metric value and zero or more columns into the metric dimensions:

Name Type Required Default Description
name string Yes The name of the metric. for example some/system/num_errors
value string Yes The column to extract the value from
condition object No Conditionally emit metric based on row column values
source string No dsn The column to extract the metric source from. Default to the DSN name used by the query
timestamp string No now The column name to extract time timestamp for the metric. Default to current time
dimensions []object No Defines the dimensions for this metric
aggregation string No Aggregates the metric value by the metric dimensions. For example avg

A metric condition is specified using a column name and value over which a metric is emitted. For example, the metric configuration below will generate the metric CPU Usage Per Sec - Avg when a row has value CPU Usage Per Sec for column metric_name:

- name: oracle/system/CPU Usage Per Sec - Avg
  value: average
  condition:
    col: metric_name
    value: CPU Usage Per Sec

A metric dimension is defined by a name and a mapping to a column to extract the dimension value from, or a constant. If only the dimension name is configured then its value is extracted from the column with the same name.

# dimension "err" is extracted from column "error_code"
err:
  col: "error_code"  

# dimension "error_code" is extract from col "error_code"
error_code:

# dimension "err" is hard wired to "err-123"
err: "err-123"

Sample metric configuration:

# emit all job execution times since the last collection time
sql: select job_name, end_time, exec_seconds from my_jobs 
history_key: some/system/jobs
filters:
  - end_time:
      gte: "$[default(last_run_time, subtract_duration(now(), '24h'))]"
  - end_time:
      lt: "$[now()]"
metrics:
  - name: my_jobs/elapsed_sec
    value: exec_seconds
    timestamp: end_time
    dimensions:
      job-name:
        col: "job_name"

# emit average execution time by job over the last 24 hours
sql: select job_name, end_time, exec_seconds from my_jobs 
filters:
  - end_time:
      gte: "$[subtract_duration(now(), '24h')]"
metrics:
  - name: my_jobs/avg_elapsed_sec
    value: exec_seconds
    aggregation: avg
    dimensions:
      job-name:
        col: "job_name"

# Event Configuration

Same as metric configuration

# Tag Configuration

Enriches metrics or events with tags which you do not want to participate in the metric or event dimensions. They are configured exactly like dimensions. For example:

# emit all job execution times since the last collection time and tag with job priority
sql: select job_name, end_time, priority, exec_seconds from my_jobs 
history_key: some/system/jobs
filters:
  - end_time:
      gte: "$[default(last_run_time, subtract_duration(now(), '24h'))]"
  - end_time:
      lt: "$[now()]"
metrics:
  - name: my_jobs/elapsed_sec
    value: exec_seconds
    timestamp: end_time
    dimensions:
      job-name:
        col: "job_name"
    tags:
      priory:      

# Complete configuration Sample

queries:
  - sql_file: odi/mysql/etl_status.sql
    history_key: odi/etl/sess_times
    dsn: ODI-PROD
    filters:
      - sess_status:
          in:
            - E
            - D
            - M
            - A
      - end_dt:
          gte: "{{default(last_run_time, subtract_duration(now(), '24h'))}}"
      - end_dt:
          lt: "{{now()}}"
    metrics:
      - name: odi/etl/completed_session/elapsed_sec
        value: sess_dur_sec
        timestamp: end_dt
        dimensions:
          sess_name:
          sess_status:
  - sql_file: odi/mysql/etl_status.sql
    history_key: odi/etl/sess_faiures
    dsn: ODI-PROD
    filters:
      - sess_status:
          in:
            - E
      - end_dt:
          gte: "{{default(last_run_time, subtract_duration(now(), '24h'))}}"
      - end_dt:
          lt: "{{now()}}"
    events:
      - name: odi/etl/session_failure
        value: sess_status
        timestamp: end_dt
        dimensions:
          sess_name:
          return_code:
            col: sess_rc

# Validate Configuration

sql-collector --run-conf /path/to/config/file.yaml --validate

# Testing

Run the plugin from the command line to get emitted metrics or events on stdout

sql-collector --run-conf /path/to/config/file.yaml

# List the metrics and events emitted by a plugin configuration

sql-collector --run-conf /path/to/config/file.yaml --metrics
sql-collector --run-conf /path/to/config/file.yaml --events
Last Updated: 3/31/2023, 12:34:01 PM