2019-03-20 10:11 AM
We cannot figure this out as the ESPER command of 'output every n' does not work for what we are looking for. Not sure if we are going to need to create a persistent table that keeps rewriting itself.
Issue:
We have a few alerts that trigger on multiple events, and we want to be alerted on them. An example would be a single source IP attempting to log in with multiple usernames over a short period of time. We want to alert on this, but if it is a high-velocity attack or a bad scan we could potentially be alerted thousands of times if the attacker or tool is trying that many users. We want to be able to limit the alerts coming out of ESA for that source IP for N period of time.
Using the output suppression built into ESA appears to stop ALL alerts from triggering, which is not what we want. The reason is that we want to be able to see if a new source IP starts doing the same activity within the same time frame. We also have a unique case with the clients we host on our SIEM: if each of their sites had the same activity happening (from a different source IP), we would need to see those as well. We do not want to create hundreds of copies of the same alert, so this needs to happen in one alert, if possible.
Example Data:
ip: 10.1.1.1 user: x1
ip: 10.1.1.1 user: x2
ip: 10.1.1.1 user: x3
ALERT on 10.1.1.1 and suppress with all 3 events shown to script/respond/email
ip: 10.1.1.1 user: x4
ip: 10.1.1.2 user: x1
ip: 10.1.1.2 user: x2
ip: 10.1.1.2 user: x3
ALERT on 10.1.1.2 and suppress with all 3 events shown to script/respond/email
Summary:
Alert on unique source IPs with multiple events and suppress unique source IP for N period of time.
Any thoughts?
2019-03-20 11:25 AM
Hey Sean,
OUTPUT FIRST EVERY n MINUTES fits this use case. Let's take the following events:
Event={ip_src='10.1.1.1', user_dst='user1'}
Event={ip_src='10.1.1.1', user_dst='user2'}
Event={ip_src='10.1.1.1', user_dst='user3'}
t=t.plus(2 minutes)
Event={ip_src='10.1.1.1', user_dst='user4'}
Event={ip_src='10.1.1.2', user_dst='user1'}
Event={ip_src='10.1.1.2', user_dst='user2'}
Event={ip_src='10.1.1.2', user_dst='user3'}
And the following EPL:
SELECT window(*) FROM Event(
ip_src IS NOT NULL
AND
user_dst IS NOT NULL
).std:groupwin(user_dst).std:unique(user_dst).win:time(5 min)
GROUP BY ip_src
HAVING COUNT(*) = 3
OUTPUT FIRST EVERY 5 min
The EPL above alerts when a single source IP is seen connecting to three unique user accounts within 5 minutes, and the alert contains ALL three events that caused the condition to be true. It then suppresses further alerts for that source IP for 5 minutes, but continues to alert for any other source IP; this is because the GROUP BY and OUTPUT work in tandem.
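As a rough illustration of the per-source-IP suppression described above, here is a small Python model. It is not Esper and does not reproduce the engine's exact window or output semantics; the function name `detect`, the constants, and the timestamps are hypothetical and chosen only to mirror the sample events. It also uses a ">= 3" threshold, which is an assumption that differs slightly from the "= 3" in the EPL.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60      # look-back window for counting unique users
SUPPRESS_SECONDS = 5 * 60    # quiet period per source IP after an alert
THRESHOLD = 3                # unique users needed to raise an alert (assumed >=)


def detect(events):
    """events: list of (timestamp_seconds, ip_src, user_dst) in time order.

    Returns a list of (ip_src, [events that made up the alert]).
    """
    recent = defaultdict(dict)   # ip_src -> {user_dst: latest event}
    quiet_until = {}             # ip_src -> time at which suppression ends
    alerts = []
    for event in events:
        ts, ip, user = event
        seen = recent[ip]
        seen[user] = event
        # Drop users whose latest event has aged out of the window.
        for old_user in [u for u, e in seen.items() if ts - e[0] >= WINDOW_SECONDS]:
            del seen[old_user]
        # Suppression is tracked per IP, so one noisy IP never hides another.
        if len(seen) >= THRESHOLD and ts >= quiet_until.get(ip, float("-inf")):
            alerts.append((ip, sorted(seen.values())))
            quiet_until[ip] = ts + SUPPRESS_SECONDS
    return alerts


events = [
    (0, "10.1.1.1", "user1"),
    (1, "10.1.1.1", "user2"),
    (2, "10.1.1.1", "user3"),
    (120, "10.1.1.1", "user4"),   # same IP, still inside its quiet period
    (121, "10.1.1.2", "user1"),
    (122, "10.1.1.2", "user2"),
    (123, "10.1.1.2", "user3"),
]

for ip, triggering in detect(events):
    print(ip, [user for _, _, user in triggering])
```

In this model the user4 event for 10.1.1.1 produces no second alert, while 10.1.1.2 still alerts with its own three events, because the quiet period is stored per source IP rather than globally.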
The following screen grab should demonstrate this. NOTE: The EPL Statements section is where I declare my EPL code, the Time And Event Sequence section is where I inject the events and move the engine time, the Scenario Results section is where I see the alerts (each Insert is an individual alert):
And here, I inject even more events, but because they are from the same IP, they will not trigger:
Unless of course, that IP changes:
Cheers,
Lee
2019-03-20 11:55 AM
Looks like the big change was the SELECT window(*). I had given up on OUTPUT FIRST EVERY n minutes because, with just SELECT *, it was only giving me one event.
It seems to be working as intended. I am going to test a lot more to be sure it covers all use cases, but this appears to be it.
Thanks so much!
2019-08-07 08:48 AM
Hello Lee,
Thank you for your post, it was very helpful! I was trying to achieve the same result in terms of output and suppression, but my pattern was different. Everything worked except that the 24-hour suppression was not working properly. My pattern for the exact same logic (>= 3 unique user_dst per ip_src, the 3 unique events grouped in one alert, 24-hour suppression):
For esper online:
create schema Event(ip_src string, user_dst string);
@Name('Alert')
SELECT * FROM Event(
ip_src IS NOT NULL
AND
user_dst IS NOT NULL
).std:groupwin(ip_src)
.std:unique(user_dst)
.win:time_length_batch(60 minutes, 3)
GROUP BY ip_src
HAVING count(*) >= 3;
Suppression I was trying to achieve with RSA functions (not testable in Esper online):
@Hint('reclaim_group_aged=86400,reclaim_group_freq=60')
@RSAAlert(oneInSeconds=86400, identifiers={"ip_src"})
Is my pattern wrong for the same use case (SELECT * FROM Event with .std:groupwin(ip_src).std:unique(user_dst).win:time_length_batch(60 minutes, 3)), or is my understanding of the @RSAAlert(oneInSeconds=86400, identifiers={"ip_src"}) suppression wrong?
2019-08-13 04:29 AM
Hey Nick,
I would make use of the OUTPUT term within Esper to control the rate of alerts, an example below:
@Name('Alert')
SELECT * FROM Event(
ip_src IS NOT NULL
AND
user_dst IS NOT NULL
).std:groupwin(ip_src)
.std:unique(user_dst)
.win:time_length_batch(60 minutes, 3)
GROUP BY ip_src
HAVING count(*) >= 3
OUTPUT FIRST EVERY 24 hours;
This will alert once every 24 hours per unique ip_src value, as the OUTPUT works in tandem with the GROUP BY.
Cheers,
Lee
2019-08-26 08:15 AM
Hello Lee,
Thanks for the tip, but OUTPUT FIRST with "select * from Event" will suppress all events, and the output will contain only the first event from the chain (somehow even when using .win:time_length_batch(60 minutes, 3)):
Esper rule output:
Insert
Event={ip_src='10.1.1.2', user_dst='user1'}
Using "select window(*) from Event" with OUTPUT FIRST, on the other hand, gets me the 3 (or however many I need) relevant events, which simplifies the analysts' validation steps:
Esper rule output:
Insert
Event={ip_src='10.1.1.2', user_dst='user1'}
Event={ip_src='10.1.1.2', user_dst='user2'}
Event={ip_src='10.1.1.2', user_dst='user3'}
So your previous post has the exact pattern I was searching for, thanks again!
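The difference between the two outputs above can be pictured with a short Python sketch. This only models the behaviour reported in this thread, not Esper's internals; `output_first` is a hypothetical helper standing in for "keep the first row and suppress the rest".

```python
def output_first(rows):
    """Keep only the first row of an output batch; the rest are suppressed."""
    return rows[:1]


batch = [
    {"ip_src": "10.1.1.2", "user_dst": "user1"},
    {"ip_src": "10.1.1.2", "user_dst": "user2"},
    {"ip_src": "10.1.1.2", "user_dst": "user3"},
]

# Modelled on SELECT *: each event is its own row, so only one survives.
select_star_rows = list(batch)
# Modelled on SELECT window(*): a single row that carries the whole window.
select_window_rows = [list(batch)]

star_alert = output_first(select_star_rows)
window_alert = output_first(select_window_rows)

print(len(star_alert), [e["user_dst"] for e in star_alert])
print(len(window_alert), [e["user_dst"] for e in window_alert[0]])
```

Both variants emit a single row, but in the second case that one row contains all three events, which matches the two rule outputs shown above.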
Regards,
Nick