آموزش یکپارچگی TheHive و Wazuh
Wazuh یک راه حل امنیتی انعطاف پذیر است که به خوبی با راه حلهای دیگر یکپارچه میشود. Wazuh یک SIEM و XDR منبع باز است و به کاربران آزادی ساخت و استفاده از اسکریپت برای یکپارچه سازی سفارشی را میدهد. این پست نشان میدهد که Wazuh به خوبی با TheHive با کمک اسکریپتهای سفارشی ادغام میشود.
این ادغام بر روی Wazuh نسخه 4.2.5 و TheHive نسخه 4.1.18-1 تست شده است.
شما همانند تصویر زیر باید به کاربر تحت وب THE Hive خود وارد شوید، بر روی NEW Organisation کلیک کنید و یک سازمان جدید با نام و توضیحات مخصوص به آن سازمان درست کنید و آن را سیو کنید. که در این قسمت یک سازمان با نام Test Organization با سطح دسترسی Admin ساخته شده است.
این کاربر دارای مجوزهایی برای مدیریت Organisation ، از جمله ایجاد کاربران جدید، مدیریت پروندهها، و هشدارها و … است. همچنین یک رمز عبور برای کاربر ایجاد کنید تا بتوانید برای مشاهده داشبورد و مدیریت موارد وارد شوید. همان طور که در تصویر زیر مشاهده میکنید این کار با کلیک بر روی “رمز عبور جدید” در کنار حساب کاربری و وارد کردن رمز عبور مورد نظر انجام میشود .
یکپارچگی با Wazuh با کمک TheHive REST API امکان پذیر است. بنابراین، به کاربرساخته شده در TheHive باید بتواند از طریق API هشدار ایجاد کند. برای همین منظور یک حساب کاربری با امتیاز “تحلیلگر” برای این منظور ایجاد کنید.
در گام بعدی باید یک کلید همان طور که تصویر زیر آمده است برای API تعریف کنید.
شما میتوانید به منظور استخراج کلید API، کلید را برای مشاهده و کپی کردن آن برای استفاده در آینده آشکار کنید.
پیکر بندی Wazuh
ابتدا شما باید ماژول TheHive Python را نصب کنید برای اینکار شما میتوانید از دستور زیر استفاده کنید
sudo /var/ossec/framework/python/bin/pip3 install thehive4py==1.8.1
اسکریپت سفارشی یکپارچه شدن را با چسباندن کد پایتون custom-w2thive.py درمسیر /var/ossec/integrations/custom-w2thive.py ایجاد میکنیم کد پایتون custom-w2thive.py را شما میتوانید مشاهده کنید.
#!/var/ossec/framework/python/bin/python3 import json import sys import os import re import logging import uuid from thehive4py.api import TheHiveApi from thehive4py.models import Alert, AlertArtifact #start user config # Global vars #threshold for wazuh rules level lvl_threshold=0 #threshold for suricata rules level suricata_lvl_threshold=3 debug_enabled = False #info about created alert info_enabled = True #end user config # Set paths pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) log_file = '{0}/logs/integrations.log'.format(pwd) logger = logging.getLogger(__name__) #set logging level logger.setLevel(logging.WARNING) if info_enabled: logger.setLevel(logging.INFO) if debug_enabled: logger.setLevel(logging.DEBUG) # create the logging file handler fh = logging.FileHandler(log_file) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) def main(args): logger.debug('#start main') logger.debug('#get alert file location') alert_file_location = args[1] logger.debug('#get TheHive url') thive = args[3] logger.debug('#get TheHive api key') thive_api_key = args[2] thive_api = TheHiveApi(thive, thive_api_key ) logger.debug('#open alert file') w_alert = json.load(open(alert_file_location)) logger.debug('#alert data') logger.debug(str(w_alert)) logger.debug('#gen json to dot-key-text') alt = pr(w_alert,'',[]) logger.debug('#formatting description') format_alt = md_format(alt) logger.debug('#search artifacts') artifacts_dict = artifact_detect(format_alt) alert = generate_alert(format_alt, artifacts_dict, w_alert) logger.debug('#threshold filtering') if w_alert['rule']['groups']==['ids','suricata']: #checking the existence of the data.alert.severity field if 'data' in w_alert.keys(): if 'alert' in w_alert['data']: #checking the level of the source event if int(w_alert['data']['alert']['severity'])<=suricata_lvl_threshold: send_alert(alert, thive_api) elif int(w_alert['rule']['level'])>=lvl_threshold: #if the event is different from suricata AND suricata-event-type: alert check lvl_threshold send_alert(alert, thive_api) def pr(data,prefix, alt): for key,value in data.items(): if hasattr(value,'keys'): pr(value,prefix+'.'+str(key),alt=alt) else: alt.append((prefix+'.'+str(key)+'|||'+str(value))) return alt def md_format(alt,format_alt=''): md_title_dict = {} #sorted with first key for now in alt: now = now[1:] #fix first key last symbol dot = now.split('|||')[0].find('.') if dot==-1: md_title_dict[now.split('|||')[0]] =[now] else: if now[0:dot] in md_title_dict.keys(): (md_title_dict[now[0:dot]]).append(now) else: md_title_dict[now[0:dot]]=[now] for now in md_title_dict.keys(): format_alt+='### '+now.capitalize()+'\n'+'| key | val |\n| ------ | ------ |\n' for let in md_title_dict[now]: key,val = let.split('|||')[0],let.split('|||')[1] format_alt+='| **' + key + '** | ' + val + ' |\n' return format_alt def artifact_detect(format_alt): artifacts_dict = {} artifacts_dict['ip'] = re.findall(r'\d+\.\d+\.\d+\.\d+',format_alt) artifacts_dict['url'] = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',format_alt) artifacts_dict['domain'] = [] for now in artifacts_dict['url']: artifacts_dict['domain'].append(now.split('//')[1].split('/')[0]) return artifacts_dict def generate_alert(format_alt, artifacts_dict,w_alert): #generate alert sourceRef sourceRef = str(uuid.uuid4())[0:6] artifacts = [] if 'agent' in w_alert.keys(): if 'ip' not in w_alert['agent'].keys(): w_alert['agent']['ip']='no agent ip' else: w_alert['agent'] = {'id':'no agent id', 'name':'no agent name'} for key,value in artifacts_dict.items(): for val in value: artifacts.append(AlertArtifact(dataType=key, data=val)) alert = Alert(title=w_alert['rule']['description'], tlp=2, tags=['wazuh', 'rule='+w_alert['rule']['id'], 'agent_name='+w_alert['agent']['name'], 'agent_id='+w_alert['agent']['id'], 'agent_ip='+w_alert['agent']['ip'],], description=format_alt , type='wazuh_alert', source='wazuh', sourceRef=sourceRef, artifacts=artifacts,) return alert def send_alert(alert, thive_api): response = thive_api.create_alert(alert) if response.status_code == 201: logger.info('Create TheHive alert: '+ str(response.json()['id'])) else: logger.error('Error create TheHive alert: {}/{}'.format(response.status_code, response.text)) if __name__ == "__main__": try: logger.debug('debug mode') # if debug enabled # Main function main(sys.argv) except Exception: logger.exception('EGOR')
متغیر lvl_threshold در اسکریپت زیر نشان دهنده حداقل سطح هشداری است که به TheHive ارسال میشود. متغیر را میتوان به گونه ای سفارشی کنید که فقط هشدارهای مربوطه به TheHive ارسال شوند.
حال شما یک bash scrip درمسیر /var/ossec/integrations/custom-w2thive ایجاد کنید
#!/bin/sh # Copyright (C) 2015-2020, Wazuh Inc. # Created by Wazuh, Inc. <info@wazuh.com>. # This program is free software; you can redistribute it and/or modify it under the terms of GP> WPYTHON_BIN="framework/python/bin/python3" SCRIPT_PATH_NAME="$0" DIR_NAME="$(cd $(dirname ${SCRIPT_PATH_NAME}); pwd -P)" SCRIPT_NAME="$(basename ${SCRIPT_PATH_NAME})" case ${DIR_NAME} in */active-response/bin | */wodles*) if [ -z "${WAZUH_PATH}" ]; then WAZUH_PATH="$(cd ${DIR_NAME}/../..; pwd)" fi PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py" ;; */bin) if [ -z "${WAZUH_PATH}" ]; then WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)" fi PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/${SCRIPT_NAME}.py" ;; */integrations) if [ -z "${WAZUH_PATH}" ]; then WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)" fi PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py" ;; esac ${WAZUH_PATH}/${WPYTHON_BIN} ${PYTHON_SCRIPT} $@
شما باید دسترسی و مالکیت فایلها را تغییر میدهید تا مطمئن شوید Wazuh مجوزهای کافی برای دسترسی و اجرای آنها را دارد برای اینکار میتوانید از دستورات زیر استفاده کنید.
sudo chmod 755 /var/ossec/integrations/custom-w2thive.py sudo chmod 755 /var/ossec/integrations/custom-w2thive sudo chown root:ossec /var/ossec/integrations/custom-w2thive.py sudo chown root:ossec /var/ossec/integrations/custom-w2thive
برای اینکه به Wazuh اجازه دهید اسکریپت یکپارچه سازی را اجرا کند، خطوط زیر را به فایل پیکربندی مدیر واقع در /var/ossec/etc/ossec.conf اضافه کنید. آدرس IP سرور TheHive را به همراه کلید API که قبلاً ایجاد شده بود وارد وارد کنید.
… custom-w2thive http://TheHive_Server_IP:9000 RWw/Ii0yE6l+Nnd3nv3o3Uz+5UuHQYTM json …
کار یکپارچه سازی به پایان رسیده و فقط برای اعمال تغییرات Wazuh را باید ریستارت کنید.
sudo systemctl restart wazuh-manager
با حساب کاربری Test وارد TheHive شوید و میتوانیم هشدارهای تولید شده Wazuh را در برگه “Alerts” مشاهده کنید.