feat: redis fake data script

This commit is contained in:
Oxbian 2024-11-04 17:47:31 -05:00
parent 0cd9fc7adf
commit ed2ce209df
Signed by: Oxbian
GPG Key ID: 0E8F319FD43747E5
4 changed files with 136 additions and 3 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.venv
.venv/*

View File

@ -30,9 +30,36 @@ docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:lat
*Installation interface web & noyau de l'IDS*
## Structure de la base de données Redis
La base de données Redis est structurée de la façon suivante:
- stream `logs:alertes` contenant toutes les alertes envoyés par le noyau de l'IDPS
- stream `logs:correlations` contenant toutes les corrélations d'alertes, avec un message de corrélation et les alertes corrélées
Ces deux streams suivent la norme CEF, mais sont structurées sous la forme d'objet et non d'une seule chaine de caractère. Afin de faciliter le parsing par la suite.
## Interface de tests d'alertes
Un script python `tests/cef-generator.py` permet de générer des alertes CEF dans la base de données Redis.
Ce script peut être utile pour le développement d'interface d'affichage des alertes. Pour l'utiliser il faut une base de donnée redis, et mettre les identifiants dans le script.
De plus, ce script à besoin de la librairie `redis` pour pouvoir ajouter / faire des requêtes à la base de données Redis.
Pour cela, utiliser les commandes suivantes:
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Puis executer le script `tests/cef-generator.py` avec le python3 du l'environnement virtuel.
```bash
.venv/bin/python3 tests/cef-generator.py
```
## TODO
- Choix du format d'enregistrement des données dans Redis
- Noyau d'analyse de l'IDS
- Interface web pour visualiser les alertes / rechercher dedans
- Noyau d'analyse de l'IDS
- Interface web pour visualiser les alertes / rechercher dedans
- Moteur de corrélation des alertes (récupération + renvoi dans Redis).

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
hiredis==3.0.0
redis==5.2.0

102
tests/cef-generator.py Normal file
View File

@ -0,0 +1,102 @@
# Générateur d'alertes CEF (Common Event Format)
# Pratique pour le moteur de corrélation et le site web
# Une alerte CEF est formattée de cette façon:
# CEF:Version|Device Vendor|Device Product|Device Version|Device Event Class ID|Name|Severity|[Extension]
import redis
import time
import random
def generate_alert(alert_type):
# Dictionnaire pour différents types d'alertes réseau et fichiers
alert_templates = {
"network": {
"Syn Flood": {
"Device_event_class_id": "1001",
"name": "Syn Flood Detected",
"src" : f"{generate_ip()}",
"dst" : f"{generate_ip()}",
"agent_severity": "8"
},
"Port Scanning": {
"Device_event_class_id": "1002",
"name": "Port Scanning Activity",
"src" : f"{generate_ip()}",
"dst" : f"{generate_ip()}",
"cs1" : f"{generate_ports()}",
"agent_severity": "5"
}
},
"file": {
"Suspicious File Creation": {
"Device_event_class_id": "2001",
"name": "Suspicious File Created",
"fname" : f"{generate_filename()}",
"fsize" : f"{random.randint(10,1000)}kb",
"agent_severity": "7"
},
"Critical File Deletion Attempt": {
"Device_event_class_id": "2002",
"name": "Critical File Deletion Attempt",
"fname" : f"{generate_filename()}",
"agent_severity": "9"
}
}
}
# Sélectionner le bon template en fonction du type d'alerte
category = "network" if alert_type in alert_templates["network"] else "file"
alert_info = alert_templates[category].get(alert_type, {})
if not alert_info:
raise ValueError(f"Unknown alert type: {alert_type}")
return alert_info
def generate_ip():
# Générer une adresse IP aléatoire
return ".".join(str(random.randint(0, 255)) for _ in range(4))
def generate_ports():
# Générer une liste de ports scannés
return ",".join(str(random.randint(20, 1024)) for _ in range(5))
def generate_filename():
# Générer un nom de fichier aléatoire
filenames = ["config.txt", "database.db", "system32.dll", "passwd", "shadow", "sensitive_data.doc"]
return random.choice(filenames)
def generate_alerts(db, main_headers):
# Récupérer ces données depuis une fonction
alertes = ["Syn Flood", "Port Scanning", "Suspicious File Creation", "Critical File Deletion Attempt"]
while (1):
data = generate_alert(random.choice(alertes))
merged = main_headers.copy()
merged.update(data)
# Ajout dans redis
response = db.xadd("logs:alertes", merged)
time.sleep(random.randint(1, 10))
def main():
# Connexion à Redis (si besoin changer l'host et le port)
db = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Pour une db en production (https://redis.io/docs/latest/operate/oss_and_stack/management/security/acl/)
#db = redis.Redis(host="my-redis.cloud.redislabs.com", port=6379, username="default", password="secret", ssl=True, ssl_certfile="./redis_user.crt", ssl_keyfile="./redis_user_private.key", ssl_ca_certs="./redis_ca.pem")
CEF_version=1
Device_vendor="ArKa"
Device_product="SIDPS"
Device_version="vAlpha"
main_headers = {"CEF": CEF_version, "Device Vendor" : Device_vendor, "Device Product" : Device_product, "Device_version" : Device_version}
generate_alerts(db, main_headers)
if __name__ == "__main__":
main()