feat: adding cooldown to alerts + more precise pattern matching for TCP

This commit is contained in:
2024-11-19 16:42:08 -05:00
parent 48e9554d38
commit 3e75cb8927
3 changed files with 77 additions and 40 deletions

View File

@ -1,4 +1,6 @@
from datetime import datetime from datetime import datetime
import time
def rule(packet, tcp_packets, db): def rule(packet, tcp_packets, db):
"""Règle SYNScan: """Règle SYNScan:
@ -6,10 +8,21 @@ def rule(packet, tcp_packets, db):
Si le port est ouvert alors le serveur répondra: Syn ACK, puis le client Reset la connexion Si le port est ouvert alors le serveur répondra: Syn ACK, puis le client Reset la connexion
Sinon le port est fermé et le serveur répondera: Reset ACK Sinon le port est fermé et le serveur répondera: Reset ACK
""" """
if (rule.cooldown + rule.time_window > time.time()):
return
time_window = db.get_key("synscan_time", 180) # Vérification si nécessaire de récupérer les variables depuis la config
seuil = db.get_key("synscan_count", 5) if (rule.seuil == 0 and rule.time_window == 0):
rule.time_window = db.get_key("synscan_time", 180)
rule.seuil = db.get_key("synscan_count", 5)
if (tcp_packets.count_packet_of_type("RA", time_window) + tcp_packets.count_packet_of_type("SA", time_window)) + tcp_packets.count_packet_of_type("R", time_window) >= seuil: if tcp_packets.count_packet_of_type(["S", "RA"], rule.time_window, True) + tcp_packets.count_packet_of_type(["S", "SA", "RA"], rule.time_window, True) >= rule.seuil:
db.send_alert(datetime.now(), 5, None, "Syn scan", packet['IP'].src, packet['IP'].dst, proto="TCP", reason="Détection de nombreux patterns de Syn->SynACK->Reset ACK et Syn->Reset ACK", act="Alerte") db.send_alert(datetime.now(), 5, None, "Syn scan", packet['IP'].src, packet['IP'].dst, proto="TCP", reason="Détection de nombreux patterns de Syn->SynACK->Reset ACK et Syn->Reset ACK", act="Alerte")
print(f"Alerte, seuil dépassés, risque de SynScan") print(f"Alerte, seuil dépassés, risque de SynScan")
rule.cooldown = time.time()
# Variables statiques
rule.cooldown = 0
rule.time_window = 0
rule.seuil = 0

View File

@ -1,4 +1,6 @@
from datetime import datetime from datetime import datetime
import time
def rule(packet, tcp_packets, db): def rule(packet, tcp_packets, db):
"""Règle TCPConnect Scan: """Règle TCPConnect Scan:
@ -6,9 +8,22 @@ def rule(packet, tcp_packets, db):
Si le port est ouvert le serveur acceptera la connexion SYN -> SYN ACK -> ACK -> Reset -> ACK Si le port est ouvert le serveur acceptera la connexion SYN -> SYN ACK -> ACK -> Reset -> ACK
Sinon le port est fermé et le serveur refusera la connexion SYN -> Reset ACK Sinon le port est fermé et le serveur refusera la connexion SYN -> Reset ACK
""" """
time_window = db.get_key("tcpconnectscan_time", 180)
seuil = db.get_key("tcpconnectscan_count", 5)
if (tcp_packets.count_packet_of_type("A", time_window) + tcp_packets.count_packet_of_type("RA", time_window)) >= seuil: if (rule.cooldown + rule.time_window > time.time()):
return
# Vérification si nécessaire de récupérer les variables depuis la config
if (rule.seuil == 0 and rule.time_window == 0):
rule.time_window = db.get_key("tcpconnectscan_time", 180)
rule.seuil = db.get_key("tcpconnectscan_count", 5)
if tcp_packets.count_packet_of_type(["S", "SA", "A", "RA"], rule.time_window, True) + tcp_packets.count_packet_of_type(["S", "RA"], rule.time_window, True) >= rule.seuil:
db.send_alert(datetime.now(), 5, None, "TCPConnect Scan", packet['IP'].src, packet['IP'].dst, proto="TCP", reason="Détection de nombreux patterns de Syn->SynACK->ACK->Reset->ACK et Syn->Reset ACK", act="Alerte") db.send_alert(datetime.now(), 5, None, "TCPConnect Scan", packet['IP'].src, packet['IP'].dst, proto="TCP", reason="Détection de nombreux patterns de Syn->SynACK->ACK->Reset->ACK et Syn->Reset ACK", act="Alerte")
print(f"Alerte, seuils dépassés, risque de TCPConnectScan") print(f"Alerte, seuils dépassés, risque de TCPConnectScan")
rule.cooldown = time.time()
# Variables statiques
rule.cooldown = 0
rule.time_window = 180
rule.seuil = 5

View File

@ -19,74 +19,81 @@ class TCP:
self.packets[ip_src] = [] self.packets[ip_src] = []
if flags == "S": if flags == "S":
self.packets[ip_src].append([port_src, ip_dst, port_dst, [flags], timestamp]) self.packets[ip_src].append([port_src, ip_dst, port_dst, ["S"], timestamp])
return return
elif flags == "SA": elif flags == "SA":
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S", True) i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S")
if i is not None: if i is not None:
self.packets[ip_dst][i][3].append("SA") print(f"i: {i}, {ip_src}:{port_src}->{ip_dst}:{port_dst}, paquets: \n{self.packets}")
self.packets[ip_dst][i][4] = timestamp self.packets[ip][i][3].append("SA")
self.packets[ip][i][4] = timestamp
return return
else: else:
self.packets[ip_src].append([port_src, ip_dst, port_dst, [flags], timestamp]) self.packets[ip_src].append([port_src, ip_dst, port_dst, ["SA"], timestamp])
return return
elif flags == "A": elif flags == "A":
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "SA") i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "SA")
if i is None: if i is None:
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "R", True) i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "R")
if i is not None: if i is not None:
self.packets[ip_src][i][3].append("A") print(f"i: {i}, {ip_src}:{port_src}->{ip_dst}:{port_dst}, paquets: \n{self.packets}")
self.packets[ip_src][i][4] = timestamp self.packets[ip][i][3].append("A")
self.packets[ip][i][4] = timestamp
return return
else: else:
self.packets[ip_src].append([port_src, ip_dst, port_dst, [flags], timestamp]) self.packets[ip_src].append([port_src, ip_dst, port_dst, ["A"], timestamp])
return return
elif flags == "RA": elif flags == "RA":
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "A") i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "A")
if i is None: if i is None:
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S") i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S")
if i is not None: if i is not None:
self.packets[ip_src][i][3].append("RA") print(f"i: {i}, {ip_src}:{port_src}->{ip_dst}:{port_dst}, paquets: \n{self.packets}")
self.packets[ip_src][i][4] = timestamp self.packets[ip][i][3].append("RA")
self.packets[ip][i][4] = timestamp
return return
else: else:
self.packets[ip_src].append([port_src, ip_dst, port_dst, [flags], timestamp]) self.packets[ip_src].append([port_src, ip_dst, port_dst, ["RA"], timestamp])
return return
elif flags == "R": elif flags == "R":
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "A") i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "A")
if i is None: if i is None:
i = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S") i, ip = self.find_packet_to_replace(ip_src, port_src, ip_dst, port_dst, "S")
if i is not None: if i is not None:
self.packets[ip_src][i][3].append("R") print(f"i: {i}, {ip_src}:{port_src}->{ip_dst}:{port_dst}, paquets: \n{self.packets}")
self.packets[ip_src][i][4] = timestamp self.packets[ip][i][3].append("R")
self.packets[ip][i][4] = timestamp
return return
else: else:
self.packets[ip_src].append([port_src, ip_dst, port_dst, [flags], timestamp]) self.packets[ip_src].append([port_src, ip_dst, port_dst, ["R"], timestamp])
return return
def find_packet_to_replace(self, ip_src, port_src, ip_dst, port_dst, flags, reverse=False): def find_packet_to_replace(self, ip_src, port_src, ip_dst, port_dst, flags):
"""Cherche l'indice du paquet dont le flag doit être remplacé""" """Cherche l'indice et le port de source du paquet dont le flag doit être remplacé"""
if reverse is True:
ip_src, ip_dst = ip_dst, ip_src
port_src, port_dst = port_dst, port_src
if ip_src not in self.packets.keys(): # Recherche dans le sens src->dst
return None if ip_src in self.packets.keys():
for i, [p_s, ip_d, p_d, f, stamp] in enumerate(self.packets[ip_src]):
if p_s == port_src and ip_d == ip_dst and p_d == port_dst and flags in f:
return i, ip_src
for i, [p_s, ip_d, p_d, f, stamp] in enumerate(self.packets[ip_src]): # Recherche dans le sens dst->src
if p_s == port_src and ip_d == ip_dst and p_d == port_dst and flags in f: if ip_dst in self.packets.keys():
return i for i, [p_d, ip_s, p_s, f, stamp] in enumerate(self.packets[ip_dst]):
return None if p_s == port_src and ip_s == ip_src and p_d == port_dst and flags in f:
return i, ip_dst
return None, None
def clean_old_packets(self): def clean_old_packets(self):
"""Supprime les paquets qui date de plus longtemps que le temps de clean""" """Supprime les paquets qui date de plus longtemps que le temps de clean"""
@ -108,14 +115,16 @@ class TCP:
if not self.packets[ip_src]: if not self.packets[ip_src]:
del self.packets[ip_src] del self.packets[ip_src]
def count_packet_of_type(self, flag, time_treshold): def count_packet_of_type(self, flag, time_treshold, isList = False):
"""Compte les paquets qui ont le flag choisi et qui sont dans la fenêtre de temps""" """Compte les paquets qui ont le flag choisi et qui sont dans la fenêtre de temps"""
count = 0 count = 0
current_timestamp = time.time() current_timestamp = time.time()
for ip in list(self.packets.keys()): for ip in list(self.packets.keys()):
for packet in self.packets[ip]: for packet in self.packets[ip]:
if flag in packet[3] and packet[4] >= current_timestamp - time_treshold: if isList and set(flag) == set(packet[3]) and packet[4] >= current_timestamp - time_treshold:
count += 1
elif not isList and flag in packet[3] and packet[4] >= current_timestamp - time_treshold:
count += 1 count += 1
return count return count