mirror of
https://github.com/Oxbian/SIDPS.git
synced 2025-05-17 22:18:13 +02:00
150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
import importlib.util
|
|
import os
|
|
import time
|
|
import tcp
|
|
import database
|
|
import json
|
|
import protection
|
|
|
|
from scapy.all import sniff, TCP, IP
|
|
from scapy.config import conf
|
|
conf.debug_dissector = 2
|
|
|
|
|
|
def check_frame_w_rules(packet, rules_functions, objets):
|
|
"""Appliquer chaque règle des fonctions au paquet capturé.
|
|
@param packet: Paquet actuel à analyser
|
|
@param rules_functions: liste de fonctions de règles
|
|
@param objets: Dictionnaire contenant le dictionnaire de config, la liste des paquets tcp précédents,
|
|
la db, et le gestionnaire de règles Iptables"""
|
|
|
|
for rule_func in rules_functions:
|
|
try:
|
|
rule_func(packet, objets)
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'exécution de la règle : {e}")
|
|
|
|
|
|
def packet_callback(packet, rules_functions, objets):
|
|
"""Callback réception d'un paquet
|
|
@param packet: Paquet actuel à classer
|
|
@param rules_functions: liste des fonctions de règles
|
|
@param objets: Dictionnaire contenant le dictionnaire de config, la liste des paquets tcp précédents,
|
|
la db, et le gestionnaire de règles Iptables"""
|
|
|
|
# Nettoyage des règles et paquets TCP dépassé
|
|
objets["iptables_manager"].del_rules()
|
|
objets["tcp_packets"].clean_old_packets()
|
|
|
|
if IP in packet and TCP in packet:
|
|
packet_origin = objets["tcp_packets"].add_packet(packet[IP].src, packet[TCP].sport, packet[IP].dst, packet[TCP].dport, packet[TCP].flags, time.time())
|
|
|
|
# Stockage du paquet originel lié à ce paquet pour identifier la provenance de l'attaque
|
|
objets['pkt_origin'] = packet_origin
|
|
|
|
check_frame_w_rules(packet, rules_functions['TCP'], objets)
|
|
|
|
|
|
def load_rules(rules_dirpath = "/app/idps/rules"):
|
|
"""Charger les fonctions de règles du répertoire de règles et des sous répertoires
|
|
@param rules_dirpath: Répertoire contenant les fichiers python de règles
|
|
"""
|
|
|
|
if not os.path.exists(rules_dirpath):
|
|
raise ValueError(f"Le chemin spécifié n'existe pas: {rules_dirpath}")
|
|
|
|
if not os.path.isdir(rules_dirpath):
|
|
raise ValueError(f"Le chemin spécifié n'est pas un répertoire: {rules_dirpath}")
|
|
|
|
rules_functions = {}
|
|
# Liste de répertoires à explorer
|
|
dirs_to_explore = [rules_dirpath]
|
|
|
|
# Explorer chaque répertoire / sous répertoire à la rechercher de fichier de règles
|
|
while dirs_to_explore:
|
|
current_dir = dirs_to_explore.pop()
|
|
|
|
try:
|
|
for entry in os.scandir(current_dir):
|
|
# Ignorer les liens symboliques
|
|
if entry.is_symlink():
|
|
continue
|
|
|
|
# Ajouter les répertoires dans la liste à explorer
|
|
if entry.is_dir():
|
|
dirs_to_explore.append(entry.path)
|
|
elif entry.is_file() and entry.name.endswith(".py"):
|
|
# Suppression de l'extension .py
|
|
module_name = entry.name[:-3]
|
|
|
|
# Déterminer le protocole à partir du répertoire parent
|
|
if "TCP" in entry.path:
|
|
parent_dir = "TCP"
|
|
else:
|
|
parent_dir = "WTF"
|
|
|
|
if parent_dir not in rules_functions:
|
|
rules_functions[parent_dir] = []
|
|
|
|
# Chargement du module
|
|
spec = importlib.util.spec_from_file_location(module_name, entry.path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
# Vérification que le module possède bien la fonction rule
|
|
if hasattr(module, "rule"):
|
|
rules_functions[parent_dir].append(module.rule)
|
|
|
|
except PermissionError:
|
|
raise PermissionError(f"Permission refusée pour accéder au répertoire: {current_dir}")
|
|
except OSError as e:
|
|
raise OSError(f"Erreur lors de l'accès au répertoire {current_dir}: {e}")
|
|
|
|
return rules_functions
|
|
|
|
|
|
def read_config(config_filepath='config.json'):
|
|
"""Charge les configurations depuis le fichier de config"""
|
|
|
|
try:
|
|
with open(config_filepath, 'r', encoding='utf-8') as file:
|
|
config = json.load(file)
|
|
return config
|
|
except FileNotFoundError:
|
|
raise FileNotFoundError(f"Le fichier JSON {config_filepath} n'a pas été trouvé.")
|
|
except json.JSONDecodeError:
|
|
raise json.JSONDecodeError("Erreur lors de la lecture du fichier JSON. Le format peut être incorrect.")
|
|
|
|
|
|
def start_idps():
|
|
"""Charge les règles et démarre l'IDPS"""
|
|
|
|
print("Récupération des configurations")
|
|
config = read_config()
|
|
print("Configurations chargées")
|
|
|
|
print("Chargement des règles...")
|
|
rules_functions = load_rules(config["rules_dirpath"])
|
|
print("Les règles sont chargées")
|
|
|
|
print("Connexion à la base de données")
|
|
db = database.Database(config)
|
|
print("Connexion réussite à la base de données")
|
|
|
|
tcp_packets = tcp.TCP(300)
|
|
protection_system = protection.Protection(config["protection"])
|
|
|
|
objets = {"config": config, "database": db, "tcp_packets": tcp_packets, "iptables_manager": protection_system}
|
|
|
|
# Lancer scapy & envoyer le paquet à chaque règle de l'IDPS
|
|
sniff(iface=config["ifaces"], prn=lambda packet: packet_callback(packet, rules_functions, objets), store=0)
|
|
|
|
|
|
def main():
|
|
print("Démarrage de l'IDPS")
|
|
start_idps()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|