Files
siem-integrations/alert.py

81 lines
2.8 KiB
Python
Raw Normal View History

from enum import Enum
class IOC:
def __init__(self, value: str, desc: str, c_type: int, cat: int):
self.ioc_value = value
self.ioc_description = desc
self.ioc_type_id = c_type
self.ioc_tlp_id = cat
def to_dict(self):
return {
"ioc_value": self.ioc_value,
"ioc_description": self.ioc_description,
"ioc_type_id": self.ioc_type_id,
"ioc_tlp_id": self.ioc_tlp_id
}
class Alert:
def __init__(self, data):
self.data = data
self.iocs = []
def _level_convert(self, alert_level: int):
if alert_level < 5:
severity = 2
elif alert_level >= 5 and alert_level < 7:
severity = 3
elif alert_level >= 7 and alert_level < 10:
severity = 4
elif alert_level >= 10 and alert_level < 13:
severity = 5
elif alert_level >= 13:
severity = 6
else:
severity = 1
return severity
class WebAlert(Alert):
def __init__(self, data):
super().__init__(data)
self.id: str = data.get("_id", "NONE")
self.srcip: str = data.get("data", {}).get("srcip", "unknown")
self.url: str = data.get("data", {}).get("url", "invalid")
self.title: str = data.get("rule", {}).get("description", "No description provided")
self.technique: str = ' '.join(data.get("rule", {}).get("technique", {}))
self.timestamp: str = data.get("timestamp", "")
self.protocol: str = data.get("data", {}).get("protocol", "invalid")
self.full_log: str = data.get("full_log", "Full log unknown")
self.severity: int = int(data.get("rule", {}).get("level", -1))
self.iocs = []
self._generateIOCs()
def _generateIOCs(self):
if self.srcip != "unknown":
self.iocs.append(IOC(self.srcip, "Source IP", 79, 5))
if self.url != "invalid":
self.iocs.append(IOC(self.url, "url", 141, 5))
def to_IRIS(self) -> dict:
return {
"alert_title": self.title,
"alert_note": "Suspicious web activity was detected.",
"alert_iocs": [i.to_dict() for i in self.iocs],
"alert_source": "NGINX web activity",
"alert_severity_id": self._level_convert(self.severity),
"alert_status_id": 3,
"alert_customer_id": 1,
"alert_description": self.full_log,
"alert_source_content": self.data,
}
class AlertProcessor:
ALERT_TYPES = {
"web-accesslog": WebAlert,
}
def process(self, body: dict) -> Alert:
decoder_name = body.get("decoder", {}).get("name", "N/A")
alert_class = self.ALERT_TYPES.get(decoder_name, Alert)
return alert_class(body)