# Elastic, Syslog, and Kafka

## Elastic

Vectra Stream creates its own indices in your Elasticsearch cluster so as not to conflict with existing indices. It creates a daily index for each metadata stream, making it easy to delete old indices from Elasticsearch based on your retention needs. The index names are as follows, where the date is in the format year.month.day (e.g. 2018.02.19).

| **Metadata Type** | **Index Name**                    |
| ----------------- | --------------------------------- |
| DCE/RPC           | metadata\_dcerpc-\<date>          |
| DHCP              | metadata\_dhcp-\<date>            |
| DNS               | metadata\_dns-\<date>             |
| HTTP              | metadata\_httpsessioninfo-\<date> |
| iSession          | metadata\_isession-\<date>        |
| Kerberos          | metadata\_kerberos\_txn-\<date>   |
| LDAP              | metadata\_ldap-\<date>            |
| NTLM              | metadata\_ntlm-\<date>            |
| RDP               | metadata\_rdp-\<date>             |
| SMB Files         | metadata\_smbfiles-\<date>        |
| SMB Mapping       | metadata\_smbmapping-\<date>      |
| SSL/TLS           | metadata\_ssl-\<date>             |
| X509              | metadata\_x509-\<date>            |
| Beacon            | metadata\_beacon-\<date>          |
| SSH               | metadata\_ssh-\<date>             |
| SMTP              | metadata\_smtp-\<date>            |

It is important to enable automatic index creation in your Elasticsearch cluster; otherwise, Elasticsearch will fail to create the new indices as it receives the metadata. You can enable automatic index creation by modifying the `action.auto_create_index` setting in Elasticsearch, either globally or for a specific index name pattern as shown below:

`"action.auto_create_index": "+metadata_*"`

Once automatic index creation is enabled, you can create index patterns in the Kibana UI if you are accessing the data through Kibana.
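The setting can also be applied dynamically through the Elasticsearch cluster settings API. A sketch (run against your own cluster endpoint, e.g. from the Kibana Dev Tools console):

```
PUT _cluster/settings
{
  "persistent": {
    "action.auto_create_index": "+metadata_*"
  }
}
```

Using `persistent` keeps the setting across cluster restarts; use `transient` if you only want it until the next restart.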

**Example Elasticsearch document**

```
{
    "ts": 1550784090826,
    "uid": "1hg0WNjNh5gtu6wo",
    "id": {
        "orig_h": "10.1.4.42",
        "orig_p": 34578,
        "resp_h": "192.168.7.195",
        "resp_p": 443
     },
     "method": "HEAD",
     "uri": "/",
     "request_body_len": 0,
     "response_body_len": 182,
     "status_code": 400,
     "status_msg": "Bad Request",
     "resp_mime_types": [
        "text/html"
     ],
     "request_header_count": 0,
     "response_header_count": 5,
     "orig_ip_bytes": "19",
     "resp_ip_bytes": "161",
     "orig_pkts": 1,
     "resp_pkts": 1,
     "orig_hostname": "IP-10.1.4.42",
     "resp_hostname": "test.com",
     "resp_huid": "YB82moJ4",
     "orig_sluid": "1YlUXzvv",
     "resp_sluid": "1YJUjBDb",
     "local_orig": true,
     "local_resp": true
}
```
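Note that `ts` is epoch milliseconds and the `id` object nests the connection 4-tuple (which surfaces in Kibana as dotted fields such as `id.orig_h`). An illustrative sketch parsing an abbreviated version of the document above:

```python
import json
from datetime import datetime, timezone

# Abbreviated version of the example document above
doc = json.loads("""{
  "ts": 1550784090826,
  "uid": "1hg0WNjNh5gtu6wo",
  "id": {"orig_h": "10.1.4.42", "orig_p": 34578,
         "resp_h": "192.168.7.195", "resp_p": 443},
  "status_code": 400
}""")

# The nested "id" object carries the connection 4-tuple
conn = (doc["id"]["orig_h"], doc["id"]["orig_p"],
        doc["id"]["resp_h"], doc["id"]["resp_p"])
print(conn)  # ('10.1.4.42', 34578, '192.168.7.195', 443)

# "ts" is epoch milliseconds, so divide by 1000 before converting
when = datetime.fromtimestamp(doc["ts"] / 1000, tz=timezone.utc)
print(when.isoformat())
```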

## Syslog

Vectra Stream sends metadata over syslog (TCP or SSL). On the receiver (e.g. Splunk), create a syslog listener on a specific port and protocol (TCP or SSL). Once created, enter the IP address and port of the syslog receiver in the Cognito Stream settings in the UI.

**Example Syslog Output**

```
2019-02-21T20:08:06Z COGNITO_STREAM COGNITO_STREAM 1 vectra_metadata_httpsessioninfo - METADATA_HTTPSESSIONINFO [ host="192.168.0.200:8080" id.orig_h="192.168.0.252" id.orig_p="37236" id.resp_h="192.168.0.200" id.resp_p="8080" local_orig="true" local_resp="true" method="POST" orig_hostname="test-vm" orig_huid="QGaS-FLs" orig_ip_bytes="180" orig_mime_types="application/x-binary" orig_pkts="1" orig_sluid="1YJU6Kwr" request_body_len="14808" request_header_count="6" resp_hostname="test.com" resp_huid="QkUR001n" resp_ip_bytes="17" resp_pkts="1" resp_sluid="1YJUK94N" response_body_len="0" response_header_count="0" status_code="100" status_msg="Continue" ts="1550779663394" uid=".jM-Oy2n16rnew1j" uri="/inform" user_agent="test user agent" ]
```
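The structured-data portion of each syslog message is a series of `key="value"` pairs, so it can be recovered with a simple pattern. A minimal sketch (the message is abbreviated from the example above):

```python
import re

msg = ('2019-02-21T20:08:06Z COGNITO_STREAM COGNITO_STREAM 1 '
       'vectra_metadata_httpsessioninfo - METADATA_HTTPSESSIONINFO '
       '[ id.orig_h="192.168.0.252" method="POST" status_code="100" '
       'status_msg="Continue" ts="1550779663394" uri="/inform" ]')

# Extract key="value" pairs from the bracketed structured-data section;
# keys may contain dots (e.g. id.orig_h)
fields = dict(re.findall(r'([\w.]+)="([^"]*)"', msg))
print(fields["method"])     # POST
print(fields["id.orig_h"])  # 192.168.0.252
```

Note that every value arrives as a string, so numeric fields such as `status_code` and `ts` need to be cast downstream.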

## Kafka

Vectra Stream publishes each metadata type to its own topic on the Kafka broker. Each message contains key-value pairs for all the attributes of that metadata type. Users can optionally specify a user-defined string (max 32 characters) that is prepended to all Kafka topic names. This allows for easy identification of topics downstream.

The following are the names of the topics per metadata type:

| **Metadata Type** | **Topic Name**            |
| ----------------- | ------------------------- |
| DCE/RPC           | metadata\_dcerpc          |
| DHCP              | metadata\_dhcp            |
| DNS               | metadata\_dns             |
| HTTP              | metadata\_httpsessioninfo |
| iSession          | metadata\_isession        |
| Kerberos          | metadata\_kerberos\_txn   |
| LDAP              | metadata\_ldap            |
| NTLM              | metadata\_ntlm            |
| RDP               | metadata\_rdp             |
| SMB Files         | metadata\_smbfiles        |
| SMB Mapping       | metadata\_smbmapping      |
| SSL/TLS           | metadata\_ssl             |
| X509              | metadata\_x509            |
| Beacon            | metadata\_beacon          |
| SSH               | metadata\_ssh             |
| SMTP              | metadata\_smtp            |


You need to either enable automatic topic creation in Kafka or manually create these topics before enabling forwarding to Kafka. To enable automatic topic creation, set `auto.create.topics.enable` to `true` in the Kafka broker properties file.
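The expected topic names, with the optional user-defined prefix described above, can be sketched as follows. Note that prepending the prefix verbatim (with no separator) is an assumption for illustration; verify the exact naming your deployment produces:

```python
STREAMS = ["dcerpc", "dhcp", "dns", "httpsessioninfo", "ssl", "x509"]

def topic_name(stream: str, prefix: str = "") -> str:
    """Build a Kafka topic name, optionally with a user-defined prefix.

    ASSUMPTION: the prefix is prepended verbatim; the 32-character
    limit comes from the documentation above.
    """
    if len(prefix) > 32:
        raise ValueError("topic prefix is limited to 32 characters")
    return f"{prefix}metadata_{stream}"

print(topic_name("dns"))            # metadata_dns
print(topic_name("dns", "siteA-"))  # siteA-metadata_dns
```

A list of all expected topics (e.g. for pre-creating them manually) is then `[topic_name(s, "siteA-") for s in STREAMS]`.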

**Example Kafka Message:**

```
{
  "ts": 1550784365369,
  "uid": "5nU.dqmGh5gtu6wo",
  "id": {
    "orig_h": "192.168.51.134",
    "orig_p": 59614,
    "resp_h": "192.168.7.195",
    "resp_p": 443
  },
  "method": "HEAD",
  "uri": "/",
  "request_body_len": 0,
  "response_body_len": 182,
  "status_code": 400,
  "status_msg": "Bad Request",
  "resp_mime_types": [
    "text/html"
  ],
  "request_header_count": 0,
  "response_header_count": 5,
  "orig_ip_bytes": "19",
  "resp_ip_bytes": "161",
  "orig_pkts": 1,
  "resp_pkts": 1,
  "orig_hostname": "IP-192.168.51.134",
  "resp_hostname": "test.com",
  "resp_huid": "YB82moJ4",
  "orig_sluid": "1YJUweYM",
  "resp_sluid": "1YJUjBDb",
  "local_orig": true,
  "local_resp": true
}
```

The Kafka publisher supports SASL authentication and both TCP and SSL transport:

![](https://4227135129-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FHJ1ltuWFvsArFWtevnRn%2Fuploads%2Fgit-blob-acda45943e662250bb293e37d39fc54b52284c2a%2Felastic-syslog-and-kafka-1.png?alt=media)
