NAME

dstore-dist - daemon to route/distribute protobuf messages from recursor and dnsdist

SYNOPSIS

dstore-dist [-config file] [-debug]

DESCRIPTION

dstore-dist acts as a router/distributor of the protobuf messages that are generated by recursor and dnsdist. It is configured using a YAML-based configuration file.

dstore-dist is configured with a set of destinations, which indicate all the possible destinations for a message. It is also configured with a set of routes; each route can send messages to one or more destinations, and can also be configured to perform filtering on the messages.

OPTIONS

-config file: Load configuration from file

-debug: Generate debug logging

-help: Display a help message and exit.

FILES

/etc/dstore-dist/dstore-dist.yml: Default location of the config file

CONFIGURATION FILE FORMAT

The following YAML fields are supported for configuration:

  • listen: The addresses that dstore-dist will listen on for new protobuf messages. The value is a list of address:port strings, in either v4 or v6 format. IPv6 addresses must be placed in square brackets like this [::1]. You can omit the address to listen on all local addresses.
listen:
    - ":2000"
    - "10.2.3.4:2001"
    - "[::1]:2000"
  • tls_listen: The addresses that dstore-dist will listen on for new protobuf messages using TLS. For each address, a tlsconfig struct consisting of the same fields as the ‘tlsconfig’ map of the kafka config (see below) can be configured. At a minimum, these must include the certificate and key to use.
tls_listen:
  - addr: ":2500"
    tlsconfig:
      cert_file: "/etc/tls/distcert.pem"
      key_file: "/etc/tls/distkey.pem"
  • http_address: The address on which dstore-dist serves HTTP requests, for example the history API described below. The value is an address:port string, using the same format as for listen addresses.
http_address: ":8987"
  • history_num_batches: Number of batches to keep in memory for the history API. A batch is capped at 1 MB by default; see batch_buffer_size for the actual size. If this is unset or set to 0, the history API is disabled.

    The history API can be accessed at /api/history. By default this URL will print additional usage information, including how to switch to Protobuf or JSON output format, and how to filter on certain fields.

history_num_batches: 10
  • http_api_key: If set, an X-API-Key header is required to access the optional HTTP history API.
http_api_key: change-me
  • log: Configure logging
  • text: The default output format is JSON; setting this to true changes the output to text-based key-value pairs.
  • level: The default log level is Info. You can change this to Debug, which generates more logging, or to Warn or Error, which generate less. Note that for debug logging to be generated, you will also need to use the -debug command-line flag.
  • stdout: The default output is to stderr; set this to true to log to stdout instead.

Example:

log:
  level: Error
  text: true
  stdout: true
  • conn_timeout: The idle timeout for incoming connections to dstore-dist. Defaults to no timeout, i.e. it only applies if explicitly configured.
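For example, with an illustrative value of 30 seconds:
conn_timeout: 30s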
  • batch_buffer_size: The size in bytes of the buffer(s) used to batch messages internally. Normally the default of 1048576 (1MB) will suffice, but when using Kafka, making this value around 90% of the value of the kafka max_msg_size is a good idea, to avoid fragmentation of the messages when sending to Kafka.
batch_buffer_size: 800000
  • ipsets: The value is a map of ipsets, each consisting of a filename and an optional poll interval (which defaults to, and cannot be less than, 5s). The files contain newline-separated IP prefixes (v4 or v6) which can be used for filtering events with various filters. The following gives an example of the configuration, followed by an example file.
ipsets:
  foo:
    file: /etc/foo_ips.txt
    poll_interval: 10s
  bar:
    file: /etc/bar_ips.txt

127.0.0.1/16
128.243.0.0/16
# Comments beginning with # are allowed
fe80::1cc0:3e8c:119f:c2e1/18
  • destinations: The value is a map of destinations, thus each item in the map must have a unique key (the destination name), followed by the destination fields. The following are the possible destination fields:
  • type: This can be pdns (the default), storage, or kafka.

The following fields apply to any destination type:

  • blackhole: If true, all messages to this destination will be discarded. Defaults to false.
  • sample: Apply sampling to the protobuf messages; for example, a value of 10 will send only one in 10 messages to the destination.
  • rate: Rate-limit protobuf messages to the specified number of messages per second. Messages that exceed the rate limit will be dropped.
  • burst: When rate limiting, allow bursts of up to this many messages.

The following fields apply only to destinations of type pdns:

  • distribute: This determines how messages are distributed between the specified addresses. Possible values are “roundrobin” (the default) or “sharded”. When “sharded” is specified, the query name is hashed to determine which address to send the message to.
  • addresses: A list of addresses in the same format as the top-level listen field.
  • framing: How to frame protobuf messages. Possible values are “16bit” (the default), “32bit” or “repeated” (which uses the protobuf repeated field type).
  • connect_timeout: How long to wait before timing out the connection, specified as a duration string. Defaults to 5s.
  • write_timeout: How long to wait before timing out writes, specified as a duration string. Defaults to 5s.
  • use_tls: Whether to use TLS to connect to the destination.
  • tlsconfig: If use_tls is true, the options to use for the TLS connection, the same as for the ‘tlsconfig’ map in the kafka config, for example:
use_tls: true
tlsconfig:
  insecure_skip_verify: true

The following fields apply only to destinations of type storage:

  • storage: (storage) If type is “storage” this field must be present. The value is a map consisting of the following fields:

    • type: (Mandatory) Either “s3” or “fs”. Depending on the type, the options field is different.

    • options: (for type fs) The value is a map consisting of the following fields:

      • root_path: The directory where files will be stored

    • options: (for type s3) The value is a map consisting of the following fields:

      • access_key: S3 access key
      • secret_key: S3 secret key
      • access_key_file: Path to the file containing the access key
      • secret_key_file: Path to the file containing the secret key
      • secrets_refresh_interval: Time between each secrets retrieval. Minimum is 1s, and defaults to 15s.
      • region: S3 region (default: “us-east-1”)
      • bucket: Name of S3 bucket
      • create_bucket: (bool) Create bucket if it does not exist
      • global_prefix: Transparently apply a global prefix to all names before storage
      • endpoint_url: Use a custom endpoint URL, e.g. for Minio
      • tls: A map of options to use for the TLS connection, the same as for the ‘tlsconfig’ map in the kafka config
      • init_timeout: (duration) Time allowed for initialisation (default: “20s”)
    • encoding: The encoding to use for the file, one of “protobuf” (default), “json”, “bind” (referring to the bind query log format)

    • max_size: Maximum size of the file to be stored (before any compression)

    • use_compression: (bool) Whether to compress the file using gzip compression

    • num_workers: Number of workers used to write files to storage

    • flush_interval: (duration) Time before flushing to storage if max size isn’t reached (default 300 seconds)
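
The following is an illustrative sketch of a destination of type storage; the destination name, bucket name, file paths, sizes and interval are placeholder values:

destinations:
  example_s3_dest:
    type: storage
    storage:
      type: s3
      options:
        region: "us-east-1"
        bucket: "pdns-events"                  # placeholder bucket name
        access_key_file: "/etc/s3/access_key"  # placeholder credential paths
        secret_key_file: "/etc/s3/secret_key"
        create_bucket: true
      encoding: json
      max_size: 10000000   # placeholder maximum size (before compression)
      use_compression: true
      flush_interval: 60s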

The following fields apply only to destinations of type kafka:

  • kafka: (kafka) If type is “kafka” this field must be present. The value is a map consisting of the following fields:
    • addresses: (Mandatory) A list of addresses in the same format as the top-level listen field.
    • topic: (Mandatory) The name of a kafka topic which will be used to store the protobuf messages.
    • sasl: If kafka is configured to use authentication, then this field value is a map of sasl configuration parameters.
      • type: The type of SASL authentication to use, one of plain (the default), scram256 or scram512
      • username: The username to use for authentication
      • password: The password to use for authentication
      • password_file: A file to read the password from. Used in preference to password if both are specified.
    • single_msgs: A boolean (default false) which indicates whether each Kafka Message contains only one protobuf message (true) or potentially multiple protobuf messages (false).
    • max_msg_size: The maximum size of a Kafka Message in bytes; dstore-dist will not send Kafka Messages larger than this value. Defaults to 900,000 bytes, but cannot be set lower than 65536 bytes, as that is the maximum size of a single protobuf message.
    • balancer: Kafka Messages can be distributed between the available partitions. The default balancer is roundrobin, but leastbytes, fnv-1a, crc32 and murmur2 can also be specified.
    • max_attempts: The number of times that dstore-dist will attempt to write a message to Kafka. The default is 2, meaning a single retry.
    • batch_size: This specifies how many messages constitute a batch. It defaults to 10,000. The Kafka client used will wait for new messages until either the batch size is reached, or the batch_timeout is exceeded.
    • batch_timeout: The timeout before an incomplete batch is written to Kafka. Different units can be used, e.g. “10ms” or “100s”. Defaults to 1ms.
    • read_timeout: The timeout for reads from Kafka. Defaults to 10s.
    • write_timeout: The timeout for writes to Kafka. Defaults to 10s.
    • required_acks: How many acks are required from Kafka before a write returns a result. Can be “one” (only the primary needs to ack, and the default), “all” (the primary and all replicas need to ack), or “none”.
    • async: If true, Kafka writes never block and all responses from Kafka are ignored. Defaults to false.
    • num_workers: The number of goroutines that will be started to process protobuf messages and send them to Kafka. Defaults to 2.
    • json_encode: If true, JSON-encode the data before sending to Kafka.
    • compression: The compression codec to use, one of “gzip”, “snappy”, “lz4”, or “zstd”. Defaults to no compression.
    • instance_name: If specified, a Header will be added to each Kafka Message with this value.
    • use_tls: If true, dstore-dist will attempt to use TLS to connect to Kafka
    • tlsconfig: If use_tls is true, then this field value is a map consisting of the tls configuration parameters. It is optional.
      • insecure_skip_verify: Controls whether a client verifies the server’s certificate chain and hostname. Defaults to false.
      • ca_file: Optional CA file to use (PEM).
      • ca: Optional CA to use specified as a string in PEM format.
      • add_system_ca_pool: If set, adds the system CA pool in addition to any configured private CAs. Defaults to false.
      • cert_file: Optional certificate file to use (PEM).
      • cert: Optional certificate to use specified as a string in PEM format.
      • key_file: Optional key file to use (PEM).
      • key: Optional key to use specified as a string in PEM format.
      • watch_certs: If true, enables background reloading of certificate files. Defaults to false.
      • watch_certs_poll_interval: If watch_certs is true, how often to check for changes. Defaults to 5 seconds.
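
The following is an illustrative sketch of a kafka destination using SASL authentication and TLS; the broker address, topic, username and file paths are placeholder values:

destinations:
  example_kafka_auth:
    type: kafka
    kafka:
      addresses:
        - "broker:9093"
      topic: pdns
      use_tls: true
      tlsconfig:
        ca_file: "/etc/tls/kafka_ca.pem"     # placeholder CA file
      sasl:
        type: scram256
        username: dstore                     # placeholder credentials
        password_file: "/etc/kafka/password"
      compression: snappy
      required_acks: all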

The following is an example of destinations configuration:

destinations:
  # Arbitrary names that can be referred to in routes
  sample_pdns_dest_1:
    distribute: roundrobin
    sample: 10 # only distribute one out of 10 messages
    rate: 200 # rate limit to 200 msg/s
    burst: 10 # when rate limiting, allows burst size of 10 msg
    addresses:
      - "127.0.0.1:12000"
  sample_pdns_dest_2:
    type: pdns
    distribute: sharded
    addresses:
      - "127.0.0.1:12020"
      - "127.0.0.1:12021"
      - "127.0.0.1:12022"
  sample_kafka_dest_1:
    type: kafka
    kafka:
      addresses:
        - "127.0.0.1:9092"
      topic: pdns
  sample_kafka_dest_2:
    type: kafka
    kafka:
      addresses:
        - "broker:9093"
      topic: pdnssecure
      use_tls: true
  sample_pdns_dest_3:
    blackhole: true
  • routes: The value is a map of routes, thus each item in the map must have a unique key (the route name), followed by the route fields. The following are the possible route fields:
  • destinations: A list of destination names; these must match a destination specified in the top-level destinations field.
  • append_tags: A list of tags that will be added to the message for this route.
  • filters: A list of filters, all of which have to pass for the message to be sent (i.e. there is an implicit and of all the top-level filter fields). The possible filter fields are as follows:
  • not: The not filter inverts the match. The value is any filter field. For example:

    not:
      is_tcp: true

    not:
      and:
        - tag: FOO
        - tag: BAR

  • and: The and filter combines multiple filter fields with a logical AND. The value is a list (or map) of filter fields. For example:

    and:
      - has_policy: true
      - qname: foo.com

    and:
      not:
        qname: foo.com
      is_ipv4: true
      or:
        - has_policy: false
        - has_deviceid: true
    
  • or: The or filter combines multiple filter fields with a logical OR. The value is a list (or map) of filter fields. For example:

    or:
      - has_policy: true
      - qname: foo.com

    or:
      not:
        qname: foo.com
      is_ipv4: true
      and:
        - has_policy: false
        - has_deviceid: true
    
  • is_response: Boolean, true if the message is a response message (as opposed to a request message).

  • has_policy: Boolean, true if the message has an appliedPolicy field in the response

  • has_tags: Boolean, true if the message has any tags set

  • is_tcp: Boolean, true if the DNS query was received over TCP

  • is_udp: Boolean, true if the DNS query was received over UDP

  • is_ipv4: Boolean, true if the DNS query was received over IPv4

  • is_ipv6: Boolean, true if the DNS query was received over IPv6

  • has_deviceid: Boolean, true if the message has a deviceid field

  • has_requestorid: Boolean, true if the message has a requestorid field

  • qname: The value is a domain name; the filter matches if the query qname is an exact match (case-insensitive) for the specified domain name.

  • qname_sub: The value is a domain name; the filter matches if the query qname is a subdomain of the specified domain name. Matches are case-insensitive.
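
    For example, using a placeholder domain:

    qname: example.com      # matches example.com exactly
    qname_sub: example.com  # also matches www.example.com, a.b.example.com, etc.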

  • qtype: Matches the query resource record type of the request. It can be specified as a string or an integer; any string will be converted to an integer using the mapping specified in https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml. If the type is very new, you may need to use the integer version. For example, the following qtypes are equivalent:

    qtype: AAAA
    qtype: 28
    
  • qtype_gte, qtype_gt, qtype_lte, qtype_lt: These perform integer comparisons on qtype, for example

    qtype_gte: A
    qtype_gte: 1 # This is equivalent to the above, since RR 'A' is 1
    qtype_lt: MX
    
  • qtype_not: This is simply the inverse of qtype

  • latency_msec: Match the latency of the DNS request, in milliseconds

  • latency_msec_gte, latency_msec_gt, latency_msec_lte, latency_msec_lt: These perform integer comparisons on latency_msec.

  • latency_msec_not: This is simply the inverse of latency_msec
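
    For example, to match requests that took at least 100 milliseconds (an illustrative threshold):

    latency_msec_gte: 100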

  • tag: Match a specific tag, for example to match all messages with the “gambling” tag:

    tag: gambling
    
  • tag_prefix: Match the start of a tag, for example to match all messages with a tag beginning with “social”:

    tag_prefix: social
    
  • from_in_ipset: Match all messages where the ‘from’ IP is in a range contained in the named IP set. For example to match all messages with a source IP in the range specified by the ipset “foo”:

    from_in_ipset: foo
    
  • reqsubnet_in_ipset: Match all messages where the ‘origRequestedSubnet’ IP is in a range contained in the named IP set. For example:

    reqsubnet_in_ipset: foo
    

The following is an example of routes configuration:

routes:
  filtered_route_1:
    destinations:
      - sample_pdns_dest_1
      - sample_kafka_dest_1
      - sample_pdns_dest_3

    filters:
      # All of these checks have to pass
      - is_response: true
      - has_deviceid: true
      - tag: REQUIRED_TAG
      - or:
        - tag: GAMBLING
        - tag: FASHION
      - not:
          and:
            - tag: FOO
            - tag: BAR

  route_2:
    destinations:
      - sample_pdns_dest_2
    filters:
      - is_response: true

The following is a valid simple configuration file:

http_address: ":8987"
listen:
  - ":2000"

destinations:
  filtered:
    addresses:
      - "127.0.0.1:12010"
      - "127.0.0.1:12002"
  kafka:
    type: kafka
    kafka:
      addresses:
        - "127.0.0.1:9092"
      topic: pdns

# Routing and filtering.
routes:
  filtered_example:
    destinations:
      - filtered

    filters:
      - is_response: true
      - tag: REQUIRED_TAG
      - or:
        - tag: GAMBLING
        - tag: FASHION

  kafka_example:
    destinations:
      - kafka

NETWORK PROTOCOL/ENCODING

When sending messages to destinations, the protocol/encoding used is slightly different depending on the destination.

For destinations of type “pdns”, dstore-dist will send messages over a TCP stream as serialised protobuf messages, each preceded by a 16-bit network-byte-order length (the default framing; see the framing destination field). The destination does not send any responses.

For destinations of type “kafka”, the protocol used is the Kafka protocol, and each Kafka Message is encoded as follows:

  • Key: This can be either “mm” or “sm”.

    • If the Key is “mm” then each Kafka Message contains potentially multiple protobuf messages, which are encoded as repeated Protobuf fields.
    • If the Key is “sm” then each Kafka Message contains only one protobuf message.
  • Value: The protobuf message(s), encoded as described above.

  • Headers: The following Kafka headers are set:

    • msgType: DNSMessage
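
For example, the following illustrative destination sets single_msgs, so each Kafka Message will carry a single protobuf message and have Key “sm”:

destinations:
  kafka_single:              # illustrative name
    type: kafka
    kafka:
      addresses:
        - "127.0.0.1:9092"
      topic: pdns
      single_msgs: true      # one protobuf message per Kafka Message (Key "sm")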