When working with RSA Live ESA or the ESA Rule Builder, you should not need to know the EPL syntax used within the rules. However, if your use case exceeds either of these capabilities, you should become familiar with at least the basics of the EsperTech EPL language used with ESA.
Note: NetWitness Platform 11.3 uses Esper 7.1. Earlier releases use Esper 5.3.
Inefficiently written EPL rules can have a detrimental impact on how the ESA appliance functions. Therefore, it is important to write effective EPL rules. See the following topics in the Alerting Using ESA Guide for details:
This document contains hints, use cases, and FAQs related to developing statements using the Esper event processing language (EPL) with NetWitness ESA.
If you use any custom Esper Java libraries, and are having issues with accessing them in the NetWitness Platform, see the following Knowledge Base article that applies to your version of NetWitness:
In ESA rules that do not select every piece of meta from the session (that is, rules that do not use select *), you may see that data privacy (if enabled) and the Pivot to Investigate > Navigate link accessed from a context tooltip in the Respond Incident Details view do not work. For details on how to fix this, see the "Update any ESA Rule that Selects Only Certain Meta Keys from the Session to Include event_source_id" section in the Alerting with ESA Correlation Rules User Guide.
The following table outlines some areas where you can make performance optimizations to the most common statement syntax.
Recommendation | Example Rule | Esper Documentation |
Use group by for aggregation | Successful Logins | Use only group by for aggregation. Avoid using the std:groupwin view. Documentation links: |
Use hint with view .std:groupwin | HTTP Uploads - Office Documents Volume – High | If you must use the std:groupwin view for aggregation, add a Hint to reclaim groups. If you are also using a time batch window view, the time in seconds should be double the time window value. Documentation links: |
Avoid the creation of many grouping windows | Excessive upload to File Cloud Services – High | Avoid creating many grouping windows (for example, hundreds of thousands) over a long period if many unique values are expected. |
statements- Avoid | - | Avoid alerts or general filter conditions that could cause hundreds of thousands of events to be added to a window. You can run the filter in Investigation over the proposed time window to gauge how many events may match. For health and wellness type alerts, see: Health and Wellness Alarms View and Health and Wellness Monitoring View. |
Write strict MATCH RECOGNIZE patterns | Account Created And Deleted In Short Period Of Time | Avoid using the + or * quantifiers within MATCH RECOGNIZE statements if possible, so that the system needs to track fewer states. Consider writing a PATTERN-based statement if your requirements call for a loose pattern. Documentation links: |
Suppress Duplicate Matches | Detecting Dyreza infections | Use every-distinct for pattern matches, or the @SuppressOverlappingMatches annotation. |
Use output rate limiting | Sources Communicating With Known Bad Hosts – High | Use output rate limiting such as OUTPUT FIRST EVERY X MINUTES to suppress generation of alerts if the use case allows. This saves database space. Keep in mind that .win:time() behaves differently than .win:time_length_batch: events are not removed from the window after the logic is triggered, and they may create additional alerts with the same events. There are different ways to control generation of alerts within ESA. 'output first every x minutes' only alerts on the first set of events that match the statement. Subsequent alerts during that suppression timeframe are not stored. |
Add functions such as .toLowerCase to meta keys only as needed | | Using functions adds processing overhead: use them only when needed. For example, you do not need case-insensitive matching for meta keys storing IP addresses. Also, there are keys such as device.type with known, static case, where using a function would not produce different results. |
For more performance tips, see the following Esper Reference documentation:
Consider the following statements: one using groupwin, and the other without groupwin.
Using view groupwin:
Without view groupwin:
The second statement (without groupwin) works the same as the first, but uses less memory.
The @Hint("reclaim_group_aged=age_in_seconds") hint instructs the engine to discard an aggregation state that has not been updated for age_in_seconds seconds. The age_in_seconds value should match the time window added to the statement.
SELECT * FROM Event( medium = 32 |
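The effect of reclaiming aged groups can be pictured with a small model. The following is a minimal Python sketch of the idea only, not EPL and not how the Esper engine is implemented; the function name `reclaim_aged_groups` and the sample timestamps are assumptions made for illustration.

```python
def reclaim_aged_groups(last_update, now, age_in_seconds):
    """Drop group state that has not been updated within age_in_seconds.

    last_update: dict mapping a group key to the time (in seconds) of its
    most recent update. Returns the state that is kept.
    """
    return {
        key: ts
        for key, ts in last_update.items()
        if now - ts <= age_in_seconds
    }


# Hypothetical per-user aggregation state: "alice" was last updated at
# t=0, "bob" at t=250. At t=400 with a 300-second age, only "bob" remains.
remaining = reclaim_aged_groups({"alice": 0, "bob": 250}, now=400, age_in_seconds=300)
```

Without this kind of cleanup, state for every group key ever seen would stay in memory, which is why the hint matters when many unique values are expected.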
The pattern defined for the match recognize statement should be strict. This means you should try to eliminate repetition operators such as + or *.
SELECT * FROM Event( medium = 32 AND ).win:time(7200 seconds) match_recognize ( partition by ); |
SELECT * FROM Event( medium = 32 AND ).win:time(7200 seconds) match_recognize ( partition by ); |
In the following statement, a new thread is created for every a event. This means multiple a events will match with the same b event. This could result in an unexpected and undesirable number of alerts for the same user during the time window.
SELECT * FROM PATTERN [ every a = AND host_dst = 'icanhazip.com') |
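To see why a single b event can produce several matches, it can help to model the behavior described above. This is a simplified Python sketch under stated assumptions, not EPL: the function name `every_a_then_b` and the (kind, id) tuples are hypothetical, and filters and time limits are ignored.

```python
def every_a_then_b(events):
    """Model `every a -> b`: each 'a' starts its own wait for the next 'b'.

    events: iterable of (kind, event_id) tuples in arrival order.
    Returns one (a_id, b_id) match per pending 'a' when a 'b' arrives.
    """
    pending = []  # 'a' events still waiting for a 'b'
    matches = []
    for kind, event_id in events:
        if kind == "a":
            pending.append(event_id)
        elif kind == "b":
            matches.extend((a_id, event_id) for a_id in pending)
            pending = []  # each wait completes once its 'b' has arrived
    return matches


# Three 'a' events followed by one 'b' event yield three matches,
# all of them sharing the same 'b'.
matches = every_a_then_b([("a", 1), ("a", 2), ("a", 3), ("b", 4)])
```

In this model every match would become its own alert, which is the duplication the text warns about.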
Instead, we recommend using the @SuppressOverlappingMatches annotation with the PATTERN syntax using every.
SELECT * FROM PATTERN @SuppressOverlappingMatches [ every AND host_dst = 'icanhazip.com') |
If a rule is triggering frequently, you may only want to store the first occurrence within a time period per unique meta value. This means that subsequent alerts are not stored and do not take up space in the database. This is separate from notification suppression, which can be done within the UI and does not influence alert storage.
The alerts below are suppressed per user_dst using output first every syntax. Only the first alert within the 60 minute time window will be stored in the database and alerted. Allow constituent events to be retained within the alert by using window aggregation window(*). The result without window aggregation would be only the first of the 20 events per user_dst. If you do not need to maintain all events for analysis, then use select * instead of window aggregation.
@Hint('reclaim_group_aged=300') SELECT * FROM Event( medium = 32 |
SELECT window(*) FROM Event( medium = 32 |
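The per-key suppression described above can be modeled outside ESA. The following is a minimal Python sketch of the idea, not EPL; the function name `first_per_interval`, the tuple layout, and the sample data are assumptions, and the sketch assumes the suppression interval starts when an alert is kept.

```python
def first_per_interval(alerts, interval_seconds):
    """Keep only the first alert per key in each suppression interval.

    alerts: iterable of (timestamp_seconds, key) tuples sorted by time.
    """
    last_kept = {}  # key -> timestamp of the last alert that was kept
    kept = []
    for ts, key in alerts:
        previous = last_kept.get(key)
        if previous is None or ts - previous >= interval_seconds:
            kept.append((ts, key))
            last_kept[key] = ts
    return kept


# Hypothetical alerts as (seconds, user_dst) pairs with a 60-minute interval.
sample = [(0, "alice"), (600, "alice"), (900, "bob"), (3700, "alice")]
kept = first_per_interval(sample, interval_seconds=3600)
```

The second alert for "alice" falls inside her 60-minute interval and is dropped, while "bob" is tracked independently because suppression is applied per key.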
The solutions are explained through a series of comments within the solution boxes. Comments are surrounded by /* */.
Statement Summary:
Alert after receiving 10 different IDS events from the same source within 10 minutes, but only if within those 10 minutes, we do not see a TCP RST sent from the destination IP.
This is an example of correlating packet and log data. Our F5s send a TCP RST on inbound web requests for unknown paths, so in this instance, we only want to be alerted when a source sends 10 unique attacks to a single destination and that destination has not reset the web requests.
/* Required annotation to trigger an alert with the advanced statement */ |
@RSAAlert |
/* Instruct the engine to remove duplicate matches to the first 'a' event */ |
SELECT * FROM pattern @SuppressOverlappingMatches |
/* Intrushield policy event */ |
every a=Event ( |
/* The 10 minute time window following the first event */ |
timer:interval(600 seconds) AND |
/* 9 more Intrushield policy violations, each with a unique policy_name and the same ip_src and ip_dst */ |
[9] b= Event ( |
/* Both the statement for event b and event c must evaluate to true for the syntax to match. In other words, no TCP RST can occur to match the pattern. */ |
AND NOT |
/* Ensure all b events have unique policy names between them */ |
where b.distinctOf(i => i.policy_name).countOf() = 9; |
Statement Summary:
Correlate 3 events that populate the same ip_dst and occur within 30 minutes of each other, in any order.
For details on inner joins, see the following Esper Reference documentation:
/* Create a window to store the IPS, nonstandard traffic and ECAT alerts */ |
@Name('create') |
/* Insert into the window the IPS, nonstandard traffic and ECAT alerts */ |
@Name('insert') |
/* Alert to the combination of all three events: IPS, nonstandard traffic and ECAT alerts */ |
@RSAAlert |
INSERT INTO HttpsIntrusionTrigger |
/* Delete all events from the joined window that caused the alert so they won't be reused */ |
@Name('delete') |
Statement Summary:
We want a rule to only fire if the event occurs within business hours.
For details on inner joins, see the following Esper Reference documentation:
Define non-working hours:
create context NotWorkingHours start (0, 18, *, *, *) end (0, 9, *, *, *); context |
Solution: Added parentheses around the second and third events to require both the 2nd and 3rd events to occur after the 1st event.
For details, see the following Esper Reference documentation:
In the following example, we:
SELECT * from pattern @SuppressOverlappingMatches [ |
every e1=Event(device_class IN ('Windows Hosts') AND reference_id IN ('4720', '4722', '4724','4738') ) |
-> ( |
timer:interval(1 Seconds) AND e2=Event(event_cat_name LIKE 'Config.Changes%' AND user_dst=e1.user_src) |
-> timer:interval(1 Seconds) AND e3=Event(device_class IN ('Windows Hosts') AND reference_id IN ('4726') AND user_src=e2.user_dst) |
) |
] |
The NetWitness Platform has an Enrichments feature. After you define an enrichment, you can use it within a rule condition using the Rule Builder or through an Advanced rule. Below is a sample rule that is a part of the base ESA installation. This demonstrates using the Rule Builder to whitelist German sources.
You can create a Named Window to store IPs and update the window based on matching filter criteria. The rule only triggers if a second event occurs and the IP is on the watchlist. An IP is kept on the watchlist for only 15 minutes. You can also delete from a named window based on a triggering event.
For details, see the following Esper Reference documentation:
create window WatchListIPs.win:time(15 min) (ip_src string); |
insert into WatchListIPs select ip_src from Event(category LIKE '%scan%'); |
@RSAAlert |
select * from Event(category LIKE '%malicious%') WHERE ip_src in (SELECT ip_src from WatchListIPs ); |
This section describes how to compute percentages, ratios, averages, counts, min, and max within a given time window.
Note: Computations over a large number of events and/or time periods are performance- and memory-intensive. Use caution when deploying such rules.
One way is to use named windows. However, this stores events in memory, which may cause issues if storing over a long period of time or a large number of events.
CREATE WINDOW SizePerIP.win:length(100) (ip_src string,size long); INSERT INTO SizePerIP SELECT ip_src AS ip_src, sum(size) AS size FROM Event.win:time_batch(1 minute) GROUP BY ip_src; @RSAAlert(oneInSeconds=0) |
Using a non-overlapping context does not retain events in memory, and thus is the preferred solution.
/* Create a non-overlapping context to store data by second */ |
create context PerSecond start @now end after 1 second; |
/* Sum session size per second */ |
context PerSecond insert into OneSecondBucket select |
/* Alert if the total size for one second within an hour is two times greater than average */ |
@RSAAlert |
Question: What Regex filters are supported?
Answer: See: The 'regexp' Keyword (Esper 7.1) or The 'regexp' Keyword (Esper 5.3).
The regexp function matches the entire region against the pattern via the java.util.regex.Matcher.matches() method. For more details, consult the Java API documentation or refer to Regular Expression Flavors (http://www.regular-expressions.info/refflavors.html).
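Because the whole value must match, a pattern that describes only part of the value fails. The Python sketch below illustrates the same distinction using `re.fullmatch` (whole-string matching, analogous to `Matcher.matches()`) and `re.search` (substring matching). It is an illustration in Python rather than EPL, and the sample host name is made up.

```python
import re

value = "www.example.com"  # hypothetical meta value

# Whole-value match: the pattern must cover the entire string.
whole_fails = re.fullmatch(r"example", value) is None
whole_ok = re.fullmatch(r".*example.*", value) is not None

# Substring search: succeeds if the pattern occurs anywhere in the string.
partial_ok = re.search(r"example", value) is not None
```

In practice this means a pattern intended as a "contains" check needs leading and trailing `.*` when matching is done against the entire value.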
Question: What is the difference between the count vs. time length batch?
Answer: Consider the following example.
SELECT * FROM Event(filter_criteria) .win:time_batch(1 minute) GROUP BY ip_src HAVING count(*) > 10; |
When the time window of 1 minute is reached, the system outputs everything that matches an ip_src.
The HAVING count clause instructs the engine to only output after the time window if the count of events is greater than 10.
The GROUP BY ip_src aggregation clause instructs the count to apply to only a single ip_src, rather than across all ip_src values that match the filter criteria.
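The interaction of the batch window, GROUP BY, and HAVING can be modeled as follows. This is a minimal Python sketch, not EPL; the function name `batches_over_threshold` and the sample events are assumptions, and batches are aligned to multiples of the batch length for simplicity, which may differ from how the engine aligns them.

```python
from collections import Counter, defaultdict


def batches_over_threshold(events, batch_seconds=60, threshold=10):
    """Return {batch_index: [ip_src, ...]} for groups with count > threshold.

    events: iterable of (timestamp_seconds, ip_src) tuples.
    """
    counts = defaultdict(Counter)  # batch index -> per-ip_src event count
    for ts, ip_src in events:
        counts[int(ts // batch_seconds)][ip_src] += 1
    return {
        batch: sorted(ip for ip, n in per_ip.items() if n > threshold)
        for batch, per_ip in counts.items()
        if any(n > threshold for n in per_ip.values())
    }


# 11 events from one source in the first minute, 10 from another.
sample = [(i, "10.0.0.1") for i in range(11)] + [(i, "10.0.0.2") for i in range(10)]
result = batches_over_threshold(sample)
```

Only 10.0.0.1 is reported: the count is kept per ip_src, and exactly 10 events does not satisfy "greater than 10".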
Question: Can the parameters of ESA Live rules be changed?
Answer: You can't change the ESA rules that are provided on Live. However, you can view a rule's syntax, copy it to a new rule, and use this source to tune the rule for your needs.
Question: How can I export alerts from the ESA Mongo Database?
Answer: You can use the following command.
mongoexport -d respond-server --authenticationDatabase admin -u deploy_admin -p netwitness -c alert --out alerts_by_rule_time.csv
Question: I have a few ESA Rules where I filter many source IP addresses. I want to create one list and use it in all my ESA Rules. Can I do that, and how?
Answer: From 11.1 onwards, you can use Context Hub Lists in ESA Rules. Details are provided in the Configure Context Hub List as an Enrichment Source topic.
Question: How can I exclude a subnet in an ESA Rule? Example: ip_src IS NOT ""
Answer: Use ip_src NOT REGEXP "10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}"
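The pattern in the answer above can be tried out quickly before it goes into a rule. The following Python sketch applies the same regular expression with whole-value matching; the helper name `outside_10_subnet` and the test addresses are assumptions for illustration.

```python
import re

# Same pattern as in the answer above; it must match the whole value.
SUBNET_10 = re.compile(r"10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}")


def outside_10_subnet(ip_src):
    """True when ip_src does not fully match the 10.x.x.x pattern."""
    return SUBNET_10.fullmatch(ip_src) is None


in_subnet = outside_10_subnet("10.1.2.3")        # False: excluded by the rule
other_net = outside_10_subnet("192.168.1.10")    # True: kept by the rule
lookalike = outside_10_subnet("110.1.2.3")       # True: "110" is not "10"
```

Note that the pattern checks digit counts rather than numeric ranges, so a string such as 10.999.0.1 would also match it.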
Question: How can I specify an IP range in a list to be used in an ESA Rule?
Answer: This has been answered in a blog post on: Interpreting Regex for IP range. The contents of that blog post are reproduced here:
{1,3} means that the preceding element repeats one to three times.
[0-9] represents a single digit in the range 0 to 9.
[0-9]{1,3} represents a number of one to three digits, where each digit is in the range 0 to 9.
Example: 000,001,002,....,997,998,999
Consider the regex below. The last two octets are highlighted in red, and the first two octets are highlighted in black.
Thus, this example regex covers the following private IP ranges:
RSA NetWitness supports multi-valued meta keys. A multi-valued meta key is a meta field with an array type.
Note the following:
To see if a particular key is typed as multi-valued, go to Configure > ESA Rules > Settings > Meta Key References, then filter by String Array:
For more details about strings and string arrays in ESA, see the ESA Configuration Guide.
To use standard Esper syntax within EPL, you could convert the ESA Vector to an array. This is especially helpful when working with lambda expressions or contained event selection.
Note: This functionality is supported on RSA NetWitness versions 11.3 and higher.
Available functions:
(asStringArray(alias_host)).anyOf(v => v = 'bad.malwaredomain.com')
You can do an exact match for values in multi-valued meta keys:
Use ANY(meta) if you want at least one of the values in the meta key to match the value:
@RSAAlert SELECT * FROM Event (
'Bo' = ANY(username) OR 'Joe' = ANY(username)
Use ALL(meta) if you want every value within the meta key to exactly match:
@RSAAlert SELECT * FROM Event (
'Bo' = ALL(username) OR 'Joe' = ALL(username)
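The difference between ANY and ALL is easiest to see with a key that holds more than one value. The following Python sketch models the two comparisons as described above; it is not EPL, and the helper names and sample values are assumptions for illustration.

```python
def equals_any(value, meta_values):
    """True if at least one element equals value (models value = ANY(meta))."""
    return any(v == value for v in meta_values)


def equals_all(value, meta_values):
    """True if every element equals value (models value = ALL(meta))."""
    return all(v == value for v in meta_values)


username = ["Bo", "Joe"]  # hypothetical multi-valued meta key

any_bo = equals_any("Bo", username)   # one of the values is "Bo"
all_bo = equals_all("Bo", username)   # "Joe" is present, so not all are "Bo"
```

With these values, the ANY form of the condition is satisfied, while the ALL form is not, because neither 'Bo' nor 'Joe' equals every value in the key.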
For ESA version 10.6 and later, the following custom functions are available:
isOneOfIgnoreCase: triggers when at least one of the values in the lower-case list is contained within the values of the meta key, no matter the case.
@RSAAlert SELECT * from Event(
So, for example, if the username meta key contains the value Beta, the alert is triggered.
isNotOneOfIgnoreCase: triggers if none of the lower-case list of values are contained within the values of the meta key, no matter the case.
@RSAAlert SELECT * from Event(
For example, if the username meta key does not contain alpha, beta or gamma—no matter the case for any of those strings—the alert is triggered. So, if the username meta key contains Alpha or BETA or gAmmA, the alert is not triggered.
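The behavior described for the two functions can be summarized in a few lines. This is a Python model of the described semantics only, not RSA's implementation; the snake_case function names are hypothetical stand-ins for isOneOfIgnoreCase and isNotOneOfIgnoreCase.

```python
def is_one_of_ignore_case(meta_values, candidates):
    """True if any candidate appears among the meta values, ignoring case."""
    lowered = {v.lower() for v in meta_values}
    return any(c.lower() in lowered for c in candidates)


def is_not_one_of_ignore_case(meta_values, candidates):
    """True if none of the candidates appear among the meta values, ignoring case."""
    return not is_one_of_ignore_case(meta_values, candidates)


names = ["alpha", "beta", "gamma"]

hit = is_one_of_ignore_case(["Beta"], names)           # "Beta" matches "beta"
blocked = is_not_one_of_ignore_case(["gAmmA"], names)  # "gAmmA" matches "gamma"
clear = is_not_one_of_ignore_case(["delta"], names)    # no candidate is present
```

The two functions are exact opposites in this model, so for any given event only one of the two corresponding rules would trigger.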
RSA NetWitness contains custom Esper functions developed by RSA that you can use for fuzzy matching.
matchLike: Performs a LIKE comparison against multi-valued meta keys.
@RSAAlert SELECT * from Event(
matchLike(alias_host, '%www.xn-%')
The above example triggers if any of the values in the alias_host meta key contain a sub-string of www.xn-
matchRegex: Performs regular expression comparison against multi-valued meta keys.
@RSAAlert SELECT * from Event(
matchRegex(alias_host, '.*www.xn-.*')
The above example functions the same as the matchLike example: it triggers if any of the values in the alias_host meta key contain a sub-string of www.xn-
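The two functions can be compared side by side with a small model. The Python sketch below is not RSA's implementation: the helper names are hypothetical, LIKE is modeled with the usual SQL wildcards (% and _), and matchRegex is assumed to require a whole-value match, as regexp does.

```python
import re


def like_to_regex(pattern):
    """Translate SQL LIKE wildcards: % -> any run of characters, _ -> one character."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))  # everything else is literal
    return "".join(parts)


def match_like(meta_values, pattern):
    """True if any value of the multi-valued key matches the LIKE pattern."""
    regex = re.compile(like_to_regex(pattern), re.DOTALL)
    return any(regex.fullmatch(v) for v in meta_values)


def match_regex(meta_values, pattern):
    """True if any value of the multi-valued key fully matches the regex."""
    return any(re.fullmatch(pattern, v) for v in meta_values)


alias_host = ["intranet.local", "www.xn--example.test"]  # hypothetical values
like_hit = match_like(alias_host, "%www.xn-%")
regex_hit = match_regex(alias_host, ".*www.xn-.*")
```

One subtlety under these assumptions: the unescaped `.` in the regex matches any character, so a value such as wwwaxn-b would satisfy the regex pattern but not the LIKE pattern. Escaping the dot (`\.`) makes the two equivalent.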
Intersection is an expression comparing two multi-valued meta keys and returning true if there is at least one element common between them.
RSA NetWitness includes a custom intersection function. Additionally, you can use other Java-supported syntax, such as getIntersection(alias_host, e1.alias_host).contains("abc")
/* Statement: FireEye WebMPS Exploit.Kit */
e1=Event(policy_name .toLowerCase() LIKE 'exploit.kit%'
AND user_agent .toLowerCase() NOT IN ( 'sam proxy check' )
/* Statement: followed by Proxy Log NOT 403 */
e2=Event(device_type .toLowerCase() IN ( 'mcafeewg' )
AND result_code .toLowerCase() NOT IN ( '403' )
AND ip_src=e1.ip_src AND getIntersection(url, e1.url).size() > 0)
where time:within(60 Minutes)
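The `getIntersection(url, e1.url).size() > 0` condition asks whether the two events share at least one URL. A minimal Python model of that check is shown below; the function name `get_intersection` and the sample URLs are assumptions, and the real function's return type may differ.

```python
def get_intersection(left, right):
    """Return the values common to two multi-valued keys (order not preserved)."""
    return set(left) & set(right)


# Hypothetical url values from the two correlated events.
e1_url = ["/landing", "/exploit/kit.js"]
e2_url = ["/exploit/kit.js", "/favicon.ico"]

shared = get_intersection(e2_url, e1_url)
correlated = len(shared) > 0  # mirrors getIntersection(...).size() > 0
```

If the two events have no URL in common, the intersection is empty and the second event does not complete the pattern.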
The following examples illustrate various concepts to consider and use when you write or extend ESA Rules in the RSA NetWitness Platform.
If you need to aggregate individual values stored within a multi-valued meta key, you should use contained event selection.
/* |
Load previously created and enabled context hub lists with policy names |
*/ |
@UsesEnrichment(name="policy1") |
@UsesEnrichment(name="policy2") |
/* |
The multi-valued meta for email_src will be broken into individual strings and the type must be defined |
*/ |
CREATE SCHEMA EmailContainer(email_str string); |
/* |
Create the window to hold the policy names by type (policy1 or policy2) |
*/ |
@Name('Create') |
CREATE WINDOW PolicyViolations.win:time(1 hour) (policy1 string, policy2 string, policy_name string, email_str string); |
/* |
Insert into the window the policy names, email source string and whether the event matches policy1 or policy2 |
*/ |
@Name('Filter') |
ON Event ( |
device_type = 'symantecdlp' AND |
email_src IS NOT NULL AND |
( |
EXISTS (SELECT * FROM policy1 WHERE (LIST = Event.policy_name)) |
OR EXISTS (SELECT * FROM policy2 WHERE (LIST = Event.policy_name)) |
) |
) [select policy_name, email_str from asStringArray(email_src)@type(EmailContainer)] as e |
INSERT INTO PolicyViolations select policy_name, "policy1" as policy1, NULL as policy2, email_str where EXISTS (SELECT * FROM policy1 WHERE (LIST = e.policy_name)) |
INSERT INTO PolicyViolations select policy_name, NULL as policy1, "policy2" as policy2, email_str where EXISTS (SELECT * FROM policy2 WHERE (LIST = e.policy_name)); |
/* |
Alert when there is at least 1 policy1 violation and at least 3 policy2 violations |
*/ |
@RSAAlert |
@Name('Alert') |
INSERT INTO PolicyAlert |
FROM PolicyViolations as pv |
GROUP BY email_str |
HAVING COUNT(policy1) >= 1 AND |
COUNT(policy2) >= 3; |
/* |
Delete all events from the window that caused the alert so they won't be reused in subsequent alerts |
*/ |
@Name('Delete') |
on PolicyAlert as pa delete from PolicyViolations as pv where pa.email_str =pv.email_str; |
The above example shows the individual string for the email_src meta key being split into a named window. The filter criteria for matching policy_name to either of the Context Hub lists for policy1 and policy2 must be met before the insertion into the named window occurs. On the alert statement, if at least one event matches policy1 and 3 or more events match policy2 for the individual email_str, then an alert is generated.
The contained event selection statement uses a custom RSA function of asStringArray() since multi-valued meta in ESA is typed as a vector. It must first be converted to an array in order to use this syntax.
Note: The custom function is only available in RSA NetWitness 11.3 and higher.
For details on contained-event selection, see the following Esper Reference documentation:
For more details on using Context Hub Lists in ESA Rules, see Context Hub Lists in ESA Rules.
If you need a timed data window that holds many events over a long time frame, it is better, in terms of memory usage, to use a named window instead.
The following example rewrites a rule that stored 12 hours of events in memory so that it uses a named window instead.
The example creates a window and stores email_src from the event stream matching the filter criteria for 12 hours. It uses the syntax for contained event selection, and splits the multi-valued meta key email_src into individual strings for analysis within the named window.
/* |
The multi-valued meta for email_src will be broken into individual strings and the type must be defined |
*/ |
CREATE SCHEMA PersonalEmailContainer(email_str string); |
/* |
Create the window to hold the email addresses violating the policy |
*/ |
@Name('Create') |
CREATE WINDOW PersonalEmail.win:time(12 hours) (email_str string); |
/* |
Insert the email addresses that match the filter into the window |
*/ |
@Name('Filter') |
INSERT INTO PersonalEmail |
SELECT * FROM Event ( |
device_type = 'symantecdlp' AND |
policy_name = 'personal_email_smtp_monitor' AND |
(isNotOneOfIgnoreCase(email_src,{ 'postmaster@abc.com', 'mail delivery subsystem', 'donotreply@abc.com' })) |
) [asStringArray(email_src)@type(PersonalEmailContainer)] ; |
/* |
Alert when the email address is reported at least 10 times within a 12 hour period |
*/ |
@RSAAlert |
@Name('Alert') |
INSERT INTO PersonalEmailAlert |
FROM PersonalEmail as pe |
GROUP BY email_str |
HAVING COUNT(*) >= 10; |
/* |
Delete all events from the window that caused the alert so they won't be reused in subsequent alerts |
*/ |
@Name('Delete') |
on PersonalEmailAlert as pea delete from PersonalEmail as pe where pea.email_str = pe.email_str; |
This rule filters for internal log events from Intrushield and Sourcefire event sources, over ports 80, 443 or 8080, that are non-informational. After the events are filtered from the stream, they are stored in a data window for 15 minutes and aggregated by ip_src. When there is a count of 5 or more unique msg_ids across a single ip_src, then an alert is generated.
Alerts are suppressed with the syntax output first every 12 hours. This means that only the first alert per ip_src is generated within the specified 12-hour time period. The SELECT window(*) syntax selects all events within the data window; otherwise, only the last event within the window is returned.
@RSAAlert |
SELECT window(*) FROM |
Event ( |
device_type IN ('intrushield', 'sourcefire') AND |
direction = 'lateral' AND |
msg_id IS NOT NULL AND |
severity != 'Informational' AND |
(ip_dstport in (80, 443, 8080) OR ip_srcport in (80, 443, 8080)) |
).win:time_batch(15 minutes) |
GROUP BY ip_src |
HAVING count(distinct msg_id) >= 5 |
OUTPUT first every 12 hours; |
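The core of this rule is the count of distinct msg_id values per ip_src within one batch. The following Python sketch models only that aggregation step, not the filter, the batching, or the output suppression; the function name `sources_with_distinct_msgs` and the sample events are assumptions for illustration.

```python
from collections import defaultdict


def sources_with_distinct_msgs(events, minimum=5):
    """Return ip_src values with at least `minimum` distinct msg_id values.

    events: iterable of (ip_src, msg_id) tuples from one 15-minute batch.
    """
    seen = defaultdict(set)  # ip_src -> distinct msg_id values
    for ip_src, msg_id in events:
        seen[ip_src].add(msg_id)
    return sorted(ip for ip, ids in seen.items() if len(ids) >= minimum)


# One source triggers five different signatures; another repeats one signature.
batch = [("10.0.0.5", f"sig{i}") for i in range(5)] + [("10.0.0.6", "sig0")] * 8
alerting = sources_with_distinct_msgs(batch)
```

The second source generates more events but only one distinct msg_id, so it does not meet the threshold. This is the difference between count(distinct msg_id) and count(*).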
Documentation Links:
This alert is very general, which means that depending on the amount of traffic, there could be a performance impact. If so, adding to a report or report alert may be a better option. The view for unique seems unnecessary, because only the first event per ip_src is returned over the 24-hour period.
The following example removes the view from the alert.
SELECT * FROM Event ( |
device_type = 'bitsight' |
) |
GROUP BY ip_src |
OUTPUT first every 24 hours; |
The custom function isOneOfIgnoreCase removes case sensitivity from a list of values.
In the following example, we have the following structures:
The following code performs a case-insensitive comparison of the values in the array and the list. It is selecting from the Context Hub list, mylist, and returning values from the list that match with values in alias_host. If the value does not exist (that is, it is not whitelisted), then the alert may be generated.
SELECT * FROM mylist |
where isOneOfIgnoreCase(Event.alias_host,{LIST} |
) |