from enum import Enum class IOC: def __init__(self, value: str, desc: str, c_type: int, cat: int): self.ioc_value = value self.ioc_description = desc self.ioc_type_id = c_type self.ioc_tlp_id = cat def to_dict(self): return { "ioc_value": self.ioc_value, "ioc_description": self.ioc_description, "ioc_type_id": self.ioc_type_id, "ioc_tlp_id": self.ioc_tlp_id } class Alert: def __init__(self, data): self.data = data self.iocs = [] def _level_convert(self, alert_level: int): if alert_level < 5: severity = 2 elif alert_level >= 5 and alert_level < 7: severity = 3 elif alert_level >= 7 and alert_level < 10: severity = 4 elif alert_level >= 10 and alert_level < 13: severity = 5 elif alert_level >= 13: severity = 6 else: severity = 1 return severity class WebAlert(Alert): def __init__(self, data): super().__init__(data) self.id: str = data.get("_id", "NONE") self.srcip: str = data.get("data", {}).get("srcip", "unknown") self.url: str = data.get("data", {}).get("url", "invalid") self.title: str = data.get("rule", {}).get("description", "No description provided") self.technique: str = ' '.join(data.get("rule", {}).get("technique", {})) self.timestamp: str = data.get("timestamp", "") self.protocol: str = data.get("data", {}).get("protocol", "invalid") self.full_log: str = data.get("full_log", "Full log unknown") self.severity: int = int(data.get("rule", {}).get("level", -1)) self.iocs = [] self._generateIOCs() def _generateIOCs(self): if self.srcip != "unknown": self.iocs.append(IOC(self.srcip, "Source IP", 79, 5)) if self.url != "invalid": self.iocs.append(IOC(self.url, "url", 141, 5)) def to_IRIS(self) -> dict: return { "alert_title": self.title, "alert_note": "Suspicious web activity was detected.", "alert_iocs": [i.to_dict() for i in self.iocs], "alert_source": "NGINX web activity", "alert_severity_id": self._level_convert(self.severity), "alert_status_id": 3, "alert_customer_id": 1, "alert_description": self.full_log, "alert_source_content": self.data, } class AlertProcessor: ALERT_TYPES = { "web-accesslog": WebAlert, } def process(self, body: dict) -> Alert: decoder_name = body.get("decoder", {}).get("name", "N/A") alert_class = self.ALERT_TYPES.get(decoder_name, Alert) return alert_class(body)