From 6e4ef51973c097fdacd70dd5d60fecdb3cddb538 Mon Sep 17 00:00:00 2001 From: 0x221E Date: Tue, 27 Jan 2026 20:48:02 +0100 Subject: [PATCH] Initial commit: Wazuh-IRIS integration scripts --- alert.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++ custom-wazuh.py | 51 ++++++++++++++++++++++++++++++ iris_api.py | 83 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+) create mode 100644 alert.py create mode 100644 custom-wazuh.py create mode 100644 iris_api.py diff --git a/alert.py b/alert.py new file mode 100644 index 0000000..94da350 --- /dev/null +++ b/alert.py @@ -0,0 +1,80 @@ +from enum import Enum + +class IOC: + def __init__(self, value: str, desc: str, c_type: int, cat: int): + self.ioc_value = value + self.ioc_description = desc + self.ioc_type_id = c_type + self.ioc_tlp_id = cat + + def to_dict(self): + return { + "ioc_value": self.ioc_value, + "ioc_description": self.ioc_description, + "ioc_type_id": self.ioc_type_id, + "ioc_tlp_id": self.ioc_tlp_id + } + +class Alert: + def __init__(self, data): + self.data = data + self.iocs = [] + + def _level_convert(self, alert_level: int): + if alert_level < 5: + severity = 2 + elif alert_level >= 5 and alert_level < 7: + severity = 3 + elif alert_level >= 7 and alert_level < 10: + severity = 4 + elif alert_level >= 10 and alert_level < 13: + severity = 5 + elif alert_level >= 13: + severity = 6 + else: + severity = 1 + return severity + +class WebAlert(Alert): + def __init__(self, data): + super().__init__(data) + self.id: str = data.get("_id", "NONE") + self.srcip: str = data.get("data", {}).get("srcip", "unknown") + self.url: str = data.get("data", {}).get("url", "invalid") + self.title: str = data.get("rule", {}).get("description", "No description provided") + self.technique: str = ' '.join(data.get("rule", {}).get("technique", {})) + self.timestamp: str = data.get("timestamp", "") + self.protocol: str = data.get("data", {}).get("protocol", "invalid") + self.full_log: str = data.get("full_log", "Full log unknown") + self.severity: int = int(data.get("rule", {}).get("level", -1)) + self.iocs = [] + self._generateIOCs() + + def _generateIOCs(self): + if self.srcip != "unknown": + self.iocs.append(IOC(self.srcip, "Source IP", 79, 5)) + if self.url != "invalid": + self.iocs.append(IOC(self.url, "url", 141, 5)) + + def to_IRIS(self) -> dict: + return { + "alert_title": self.title, + "alert_note": "Suspicious web activity was detected.", + "alert_iocs": [i.to_dict() for i in self.iocs], + "alert_source": "NGINX web activity", + "alert_severity_id": self._level_convert(self.severity), + "alert_status_id": 3, + "alert_customer_id": 1, + "alert_description": self.full_log, + "alert_source_content": self.data, + } + +class AlertProcessor: + ALERT_TYPES = { + "web-accesslog": WebAlert, + } + + def process(self, body: dict) -> Alert: + decoder_name = body.get("decoder", {}).get("name", "N/A") + alert_class = self.ALERT_TYPES.get(decoder_name, Alert) + return alert_class(body) diff --git a/custom-wazuh.py b/custom-wazuh.py new file mode 100644 index 0000000..c9dbfb4 --- /dev/null +++ b/custom-wazuh.py @@ -0,0 +1,51 @@ +import sys +import alert +import logging +import iris_api +from datetime import datetime + +logging.basicConfig(filename='/var/ossec/logs/integrations.log', level=logging.INFO, + format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + +def main(): + if len(sys.argv) < 4: + logging.error("Insufficient arguments provided. Exiting.") + sys.exit(1) + + alert_file = sys.argv[1] + api_key = sys.argv[2] + hook_url = sys.argv[3] + + try: + with open(alert_file) as f: + alert_json = json.load(f) + except Exception as e: + logging.error(f"Failed to read alert file: {e}") + sys.exit(1) + + client = iris_api.IrisClient(hook_url, api_key) + + processor = alert.AlertProcessor() + + formatted_alert = processor.process(alert_json) + + alert_result = client.alert(a.to_IRIS()) + + match = None + + for case in client.cases_list(): + if a.srcip in case["case_name"]: + match = case + + if match == None: + client.case_new(a.srcip, a.title) + else: + iocs = [] + + for ioc in alert_result.get("iocs", {}): + iocs.append(ioc.get("ioc_uuid", "N/A")) + + client.merge_alert_to_case(alert_result.get("alert_id", -1), match.get("case_id", -1), iocs) + +if __name__ == "__main__": + main() diff --git a/iris_api.py b/iris_api.py new file mode 100644 index 0000000..bb90122 --- /dev/null +++ b/iris_api.py @@ -0,0 +1,83 @@ +import requests + +class IrisClient: + def __init__(self, url: str, token: str): + self.url = url + self.token = token + self.post_headers = {"Authorization": f"Bearer {self.token}", "content-type": "application/json"} + + def alert(self, body: dict): + resp = requests.post(f"{self.url}/alerts/add", headers=self.post_headers, json=body, verify=False) + + if resp.status_code != 200: + print(resp) + return -1 + + resp = resp.json() + + if resp["status"] != "success": + print(resp["message"]) + return -1 + + print(f"Success: {resp}") + return resp["data"] + + def case_new(self, ip: str, brief_desc: str): + body = { + "case_soc_id": "SOC_1", + "case_customer": 1, + "case_name": f"{ip} - WEB", + "case_description": f"Case trigger: {brief_desc}" + } + + resp = requests.post(f"{self.url}/manage/cases/add", headers=self.post_headers, json=body, verify=False) + if resp.status_code != 200: + print(resp) + return -1 + + resp = resp.json() + + if resp["status"] != "success": + print(resp["message"]) + return -1 + + print(f"Success: {resp}") + return resp["data"] + + def cases_list(self): + resp = requests.get(f"{self.url}/manage/cases/list", headers=self.post_headers, verify=False) + + if resp.status_code != 200: + print(resp) + return -1 + + resp = resp.json() + + if resp["status"] != "success": + print(f"Not successful: {resp}") + return -1 + + return resp["data"] + + def merge_alert_to_case(self, alert: int, case: int, iocs: list): + body = { + "iocs_import_list": iocs.copy(), + "assets_import_list": [], + "note": "auto-triggered event.", + "import_as_event": True, + "target_case_id": str(case), + } + + resp = requests.post(f"{self.url}/alerts/merge/{alert}", headers=self.post_headers, verify=False, json=body) + + if resp.status_code != 200: + print(resp.text) + return -1 + + resp = resp.json() + + if resp["status"] != "success": + print(f"Not successful: {resp}") + return -1 + + return resp["data"]