Build Custom JSON Parser
This section is intended for advanced programmers who want to build their own JSON parser. It describes how to build a Logstash parser for a sample device. We use the Linux device as an example throughout.
Note: This section is optional: you do not need to build a custom JSON parser from scratch to input logs from Logstash to NetWitness.
Major sections in this document:
- Configure a filter by defining several required pieces of metadata
- Examine a sample log message from the Linux device
- Walk through creating the parser, based on the sample log message
- View the parsed meta from the sample log message, as it appears on the Log Decoder
Sample JSON Log Received on Log Decoder
Let's examine a sample log and discuss its contents.
<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"] {"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'", "user":{ "email":"john.deaux@test.com", "username":"CORP\\deauxj" }, "host": { "name": "Centos7", "hostname": "Centos7", "containerized": false, "architecture": "x86_64", "id": "d1059ac783b24eb7bbde70a41fa572c9", "os": { "name": "CentOS Linux", "kernel": "3.10.0-1062.el7.x86_64", "version": "7 (Core)", "codename": "Core", "platform": "centos", "family": "redhat" } }, "@timestamp": "2020-04-03T04:01:05.037Z", "files": [ "test1.log", "test2.log", "test3.log" ],"machine_details" : { "1" : { "hostname" : "USXXLinux" }, "2" : { "hostname" : "USXXWindows" }}}
The first portion of the log is the RFC-5424 header:
<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"]
This header contains the information that we used in setting our fields above:
- nw_source_host: Centos7 (Hostname)
- nw_type: linux (Device Type)
- nw_msgid: LOGSTASH001 (Message ID)
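For illustration only (this is not part of the parser), the header fields can be pulled apart with a short Python sketch; the positions follow the RFC 5424 layout: PRI, VERSION, TIMESTAMP, HOSTNAME, APP-NAME, PROCID, MSGID, then structured data.

```python
import re

# Illustrative only: split the sample RFC-5424 header into the fields the
# parser relies on. Field order per RFC 5424:
#   <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA]
header = ('<13>1 - Centos7 linux - LOGSTASH001 '
          '[lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"]')
m = re.match(r'<\d+>(\d+) (\S+) (\S+) (\S+) (\S+) (\S+) ', header)
version, timestamp, hostname, devtype, msgid_proc = m.groups()[0], m.group(2), m.group(3), m.group(4), m.group(5)
procid, msgid = m.group(5), m.group(6)
print(hostname, devtype, msgid)  # Centos7 linux LOGSTASH001
```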
The remainder of the log is the JSON Payload.
JSON Payload
{
"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'",
"user": {
"email": "john.deaux@test.com",
"username": "CORP\\deauxj"
},
"host": {
"name": "Centos7",
"hostname": "Centos7",
"containerized": false,
"architecture": "x86_64",
"id": "d1059ac783b24eb7bbde70a41fa572c9",
"os": {
"name": "CentOS Linux",
"kernel": "3.10.0-1062.el7.x86_64",
"version": "7 (Core)",
"codename": "Core",
"platform": "centos",
"family": "redhat"
}
},
"@timestamp": "2020-04-03T04:01:05.037Z",
"files": [
"test1.log",
"test2.log",
"test3.log"
],
"machine_details": {
"1": { "hostname": "USXXLinux"},
"2": { "hostname": "USXXWindows"}
}
}
Create the JSON Parser for a Linux Device
Now that we have the sample log from the Linux device, we can construct a filter plugin for this device.
Initial Parser to Match Message ID and Device Type
The parser name should match the device type. We call this initial parser v20_linuxmsg.xml, which matches the message ID from the event. We set content to a variable, logstash_json_payload, which represents the JSON payload. We will parse the payload later in the process.
Message ID and Device Type Parsing
<?xml version="1.0" encoding="ISO-8859-1"?>
<DEVICEMESSAGES
name="linux"
displayname="Linux"
group="Unix">
<VERSION device="2.0"/>
<MESSAGE
id1="LOGSTASH001"
id2="LOGSTASH001"
content="<logstash_json_payload>" />
<!-- Additional logic to parse JSON payload -->
</DEVICEMESSAGES>
Map Payload Contents to Datatypes
We create datatypes to map each element from the payload to meta that can be saved to the NetWitness database.
The entire payload is assigned to the FileBeatsEvent datatype.
<VARTYPE name="logstash_json_payload" dataType="FileBeatsEvent"/>
The timestamp is parsed and assigned to the InternetTime datatype.
<DataType name="InternetTime" dateTime="%W-%M-%DT%H:%T:%S.%V%E" />
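For comparison, the equivalent parse of the sample @timestamp value in Python's strptime notation; the NetWitness dateTime format string plays the same role in the parser (this block is illustrative, not part of the parser).

```python
from datetime import datetime

# Parse the sample @timestamp value from the payload. Python 3.7+ accepts
# the trailing "Z" (UTC) for the %z directive; %f reads the ".037" as
# 37000 microseconds.
ts = datetime.strptime("2020-04-03T04:01:05.037Z", "%Y-%m-%dT%H:%M:%S.%f%z")
print(ts.year, ts.month, ts.day, ts.microsecond)  # 2020 4 3 37000
```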
Parse the Message String
Next, we parse the message string from the log, using the FineParse type defined below.
"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=\"root\"
exe=\"/usr/sbin/crond\" hostname=? addr=? terminal=cron res=success'",
The following code extracts the values from the string and saves them to meta keys:
- op is saved to operation.id
- acct is saved to service.account
- res is saved to result
- exe is saved to process.src
Note that the search flag is set to true so that key/value pairs are parsed regardless of their order in the string.
Extract Values and Save to Meta
<DataType name="TagValParse" regex="(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+))" search="true">
<Capture index="1" meta="process.src" />
<Capture index="2" meta="service.account" />
<Capture index="3" meta="result" />
<Capture index="4" meta="operation.id" />
</DataType>
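A rough Python analogue of this DataType may make the behavior clearer: the same alternation plus a global scan mimics search="true", so the pairs match in any order. The dictionary keys below mirror the meta keys named in the captures.

```python
import re

msg = ("op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser "
       "acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success")

# Same alternation as the TagValParse regex; finditer scans the whole
# string, which is what search="true" does in the parser.
pattern = re.compile(r'(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+))')
meta = {}
for m in pattern.finditer(msg):
    exe, acct, res, op = m.groups()
    if exe: meta["process.src"] = exe
    if acct: meta["service.account"] = acct
    if res: meta["result"] = res
    if op: meta["operation.id"] = op
print(meta)
```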
The following code assigns the entire message value to the message meta key and maps it to the FineParse type, which in turn passes its capture to the TagValParse type.
<DataType name="FineParse" regex="msg='(.*)'">
<Capture index="1" type="TagValParse" />
</DataType>
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
<Capture key="/message" type="FineParse" meta="message"/>
<Capture key="/user/email" meta="email" />
<Capture key="/user/username" type="DomainUser"/>
<Capture key="/files/" meta="sourcefile" />
<Capture key="/machine_details//hostname" meta="host.dst"/>
</DataType>
Given the previous string and the code above, the output on the Log Decoder is as follows:
message: msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond
hostname=? addr=? terminal=cron res=success'
service.account: root
process.src: /usr/sbin/crond
result: success
operation.id: PAM:accounting
Parse an Array in JSON
In our sample JSON log file from earlier, one section contained an array object:
"files": [
"test1.log",
"test2.log",
"test3.log"
]
To fetch all the values of an array, define a capture key that ends with a forward slash, for example /files/.
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
<Capture key="/files/" meta="sourcefile" />
</DataType>
Using the code above on the sample array, the following would be the output on the Log Decoder:
sourcefile: test1.log
sourcefile: test2.log
sourcefile: test3.log
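One way to picture the trailing-slash semantics: an empty final path segment means "every value at this level". A minimal Python sketch of the idea (the helper name is ours, not a NetWitness API):

```python
import json

payload = json.loads('{"files": ["test1.log", "test2.log", "test3.log"]}')

def capture_all(node, key):
    # A path like "/files/" ends in an empty segment: return every value
    # under the named key rather than a single one.
    values = node[key]
    return list(values) if isinstance(values, list) else list(values.values())

print(capture_all(payload, "files"))  # ['test1.log', 'test2.log', 'test3.log']
```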
Parse a Nested JSON Object
Let's look at an example nested object from our sample log file from earlier:
"host": {
"name": "Centos7",
"hostname": "Centos7",
"containerized": false,
"architecture": "x86_64",
"id": "d1059ac783b24eb7bbde70a41fa572c9",
"os": {
"name": "CentOS Linux",
"kernel": "3.10.0-1062.el7.x86_64",
"version": "7 (Core)",
"codename": "Core",
"platform": "centos",
"family": "redhat"
}
}
To fetch nested values, you need to build a path that contains the keys from each nested level. For example, to fetch the OS name from our example, you use the following code:
<DataType name="ElasticCommonSchemaSubset">
<Capture key="/host/os/name" meta="OS" />
</DataType>
Using the code above on the sample nested object, the following would be the output on the Log Decoder:
OS: CentOS Linux
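In ordinary code, walking such a path is just a fold over its segments; a short Python sketch of the same lookup (the helper is illustrative only):

```python
import json
from functools import reduce

payload = json.loads(
    '{"host": {"name": "Centos7", "os": {"name": "CentOS Linux"}}}')

def capture_path(node, path):
    # "/host/os/name" -> descend one level per path segment
    return reduce(lambda n, key: n[key], path.strip("/").split("/"), node)

print(capture_path(payload, "/host/os/name"))  # CentOS Linux
```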
Capture Data That Has Varying Parent Key
When capturing structured data types like JSON, instead of a numbered capture index, you can provide a field name path that uses the key attribute. For example, assume we want to capture the hostname from machine_details and ignore the indexed key:
"machine_details": {
"1": { "hostname": "USXXLinux"},
"2": { "hostname": "USXXWindows"}
}
To fetch the required values, which have a varying parent key name, we leave the parent key empty in the path:
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
<Capture key="/machine_details//hostname" meta="host.dst" />
</DataType>
Using the code above on the sample, the following would be the output on the Log Decoder:
host.dst: USXXLinux
host.dst: USXXWindows
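The empty middle segment in /machine_details//hostname behaves like a wildcard over the unknown parent keys. A Python sketch of that lookup (variable names are ours):

```python
import json

payload = json.loads(
    '{"machine_details": {"1": {"hostname": "USXXLinux"},'
    ' "2": {"hostname": "USXXWindows"}}}')

# The empty segment in /machine_details//hostname means "any child":
# iterate over every child of machine_details and take hostname from each.
hosts = [child["hostname"] for child in payload["machine_details"].values()]
print(hosts)  # ['USXXLinux', 'USXXWindows']
```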
The Parsed Example Event on the Log Decoder
Assuming the sample log message from the beginning of this document, and using the parser that we have built, this section shows the event as it appears on the Log Decoder. For reference, here is the sample log message again:
<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"] {"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'", "user":{ "email":"john.deaux@test.com", "username":"CORP\\deauxj" }, "host": { "name": "Centos7", "hostname": "Centos7", "containerized": false, "architecture": "x86_64", "id": "d1059ac783b24eb7bbde70a41fa572c9", "os": { "name": "CentOS Linux", "kernel": "3.10.0-1062.el7.x86_64", "version": "7 (Core)", "codename": "Core", "platform": "centos", "family": "redhat" } }, "@timestamp": "2020-04-03T04:01:05.037Z", "files": [ "test1.log", "test2.log", "test3.log" ],"machine_details" : { "1" : { "hostname" : "USXXLinux" }, "2" : { "hostname" : "USXXWindows" }} }
Example Parser Listing
The following code represents the complete parser, including the components we built earlier in this document.
Example Parser Listing
<?xml version="1.0" encoding="ISO-8859-1"?>
<DEVICEMESSAGES
name="linux"
displayname="Linux"
group="Unix">
<VERSION device="2.0" />
<MESSAGE
id1="LOGSTASH001"
id2="LOGSTASH001"
content="<logstash_json_payload>" />
<VARTYPE name="logstash_json_payload" dataType="FileBeatsEvent"/>
<DataType name="InternetTime" dateTime="%W-%M-%DT%H:%T:%S.%V%E" />
<DataType name="CollectionTime" type="InternetTime" meta="lc.ctime"/>
<DataType name="ElasticCommonSchemaSubset" format="JSON">
<Capture key="/@timestamp" type="CollectionTime" />
<Capture key="/host/hostname" meta="alias.host" />
<Capture key="/host/id" meta="hardware.id" />
<Capture key="/host/os/name" meta="OS" />
</DataType>
<DataType name="DomainUser" regex="(?:(\w+)\\)?(\w+)">
<Capture index="0" meta="user" />
<Capture index="1" meta="domain" />
<Capture index="2" meta="username" />
</DataType>
<DataType name="TagValParse" regex="(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+))" search="true">
<Capture index="1" meta="process.src" />
<Capture index="2" meta="service.account" />
<Capture index="3" meta="result" />
<Capture index="4" meta="operation.id" />
</DataType>
<DataType name="FineParse" regex="msg='(.*)'">
<Capture index="1" type="TagValParse" />
</DataType>
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
<Capture key="/message" type="FineParse" meta="message"/>
<Capture key="/user/email" meta="email" />
<Capture key="/user/username" type="DomainUser"/>
<Capture key="/files/" meta="sourcefile" />
<Capture key="/machine_details//hostname" meta="host.dst" />
</DataType>
</DEVICEMESSAGES>
Deploy the JSON Parser
After you have built or changed a JSON parser, you need to upload it to the NetWitness Log Decoder.
- SSH to the Log Decoder system.
- Copy the custom parser file to the following folder:
/etc/netwitness/ng/envision/etc/devices/eventsource
where eventsource is the name of the event source. You may need to create the folder if it doesn't already exist. For example, for our Linux parser, create a linux folder under the /etc/netwitness/ng/envision/etc/devices directory and copy the v20_linuxmsg.xml parser file into the /etc/netwitness/ng/envision/etc/devices/linux directory.
- To load the new parser into memory, reload the parsers on the Log Decoder.
Reload Parsers from REST
From a browser, run the REST reload command by entering the following URL:
http://<logdecoder_ip>:50102/decoder/parsers?msg=reload
For example, if your Log Decoder IP address is 10.10.100.101, use the following string:
http://10.10.100.101:50102/decoder/parsers?msg=reload
If the call is successful, you should see a REST response, "The parsers have been reloaded."
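The reload call can also be scripted. The sketch below only builds the URL; actually sending it requires network access to the appliance and, typically, REST credentials, which we do not assume here.

```python
def reload_parsers_url(decoder_ip, port=50102):
    # 50102 is the Log Decoder REST port used throughout this document.
    return f"http://{decoder_ip}:{port}/decoder/parsers?msg=reload"

# e.g. urllib.request.urlopen(reload_parsers_url("10.10.100.101")) would
# issue the reload, given connectivity and credentials.
print(reload_parsers_url("10.10.100.101"))
```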
Reload Parsers from NetWitness UI
You can also reload your parsers from the UI as follows.
- In the NetWitness UI, navigate to (Admin) > Services.
The Services view is displayed.
- Select the Log Decoder for which you want to reload the parsers, and click View > Explore.
- In the left pane, navigate to decoder > parsers.
- Right-click parsers and select Properties.
- From the drop-down menu in the Properties panel, select reload.
- Click Send.