Build a Python-Based Intrusion Detection System (IDS):
Thinking Like a Hacker to Protect Your Code
In modern software development, writing code that just "works" is no longer enough. Developers must practice Secure Coding from day one. To defend an application, you must first understand how an attacker thinks and what signatures their exploits leave behind.
This article introduces a lightweight, highly efficient Signature-Based Intrusion Detection System (IDS) and Web Application Firewall (WAF) simulator written in Python. It is designed to intercept, analyze, and neutralize the most critical web vulnerabilities.
Key Cybersecurity Concepts Implemented
1. Signature-Based Detection (Regex)
The core engine utilizes Python's Regular Expressions (re module) to scan incoming payloads for known malicious patterns—such as database manipulation syntax (UNION SELECT) or unauthorized directory navigation (../).
2. State-Based Rate Limiting
Security is not just about analyzing single payloads; it is also about monitoring behavior. The script maintains a live state tracking system (self.ip_attempts) to calculate request frequencies, allowing it to dynamically flags Brute Force and Denial of Service (DoS) patterns.
3. Dynamic Threat Scoring Matrix
Modeled after enterprise web application firewalls like ModSecurity, every detected vulnerability injects risk points into a calculation matrix. If an IP accumulates critical points, the engine shifts its defensive posture and triggers a permanent IP ban.
The Python Core Engine: cyber_detector.py
Here is the complete production-ready source code for the analyzer. It relies entirely on Python standard libraries, making it fully compatible with desktop environments and mobile terminals like Termux:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Professional Intrusion Detection & Attack Analysis System (IDS/WAF)
Purpose: Educational Security Framework & Input Validation Tool
"""
import re
import socket
import hashlib
import time
from datetime import datetime
from collections import Counter
class CyberAttackDetector:
"""️ Advanced Cyber Attack Analyzer & Mitigator"""
def __init__(self):
self.attacks_db = {
'SQL_INJECTION': {
'name': ' SQL Injection',
'patterns': [
r"(\bor\b|\band\b).*?=.*?",
r"union.*?select",
r"drop.*?table",
r"exec(\s|\+)+(s|x)p\w+",
r"';.*?--",
r"1=1",
r"admin'--"
],
'severity': ' Critical',
'level': 10,
'description': 'Attempt to inject malicious SQL commands into the database query engine.'
},
'XSS': {
'name': '⚡ Cross-Site Scripting',
'patterns': [
r"<script.*?>.*?</script>",
r"javascript:",
r"onerror\s*=",
r"onload\s*=",
r"<iframe.*?>",
r"alert\(.*?\)"
],
'severity': ' Medium-High',
'level': 7,
'description': 'Attempt to inject malicious JavaScript code into user-facing web pages.'
},
'PATH_TRAVERSAL': {
'name': ' Path Traversal',
'patterns': [
r"\.\./",
r"\.\.\\",
r"%2e%2e",
r"etc/passwd",
r"windows/system32"
],
'severity': ' High',
'level': 9,
'description': 'Attempt to navigate out of the web root folder to read sensitive system files.'
},
'COMMAND_INJECTION': {
'name': ' Command Injection',
'patterns': [
r";\s*ls",
r";\s*cat",
r"\|\s*whoami",
r"&&.*?rm",
r"`.*?`",
r"\$\(.*?\)"
],
'severity': ' Critical',
'level': 10,
'description': 'Attempt to execute unauthorized operating system commands directly on the host.'
},
'XXE': {
'name': ' XML External Entity',
'patterns': [
r"<!ENTITY.*?>",
r"<!DOCTYPE.*?\[",
r"SYSTEM.*?file://",
r"<!ELEMENT"
],
'severity': ' High',
'level': 9,
'description': 'Attempt to abuse weakly configured XML parsers to leak internal data.'
},
'SSRF': {
'name': ' Server-Side Request Forgery',
'patterns': [
r"localhost",
r"127\.0\.0\.1",
r"169\.254\.",
r"metadata\.google",
r"file:///",
r"dict://"
],
'severity': ' Medium',
'level': 8,
'description': 'Attempt to manipulate the server into initiating unauthorized requests to backend services.'
},
'BRUTE_FORCE': {
'name': ' Brute Force Attack',
'patterns': [],
'severity': ' Warning',
'level': 6,
'description': 'Multiple failed authentication or endpoint discovery attempts detected.'
},
'DOS': {
'name': ' Denial of Service',
'patterns': [],
'severity': ' High',
'level': 9,
'description': 'High frequency request patterns indicating an attempt to exhaust system resources.'
}
}
self.attack_log = []
self.ip_attempts = {}
self.blocked_ips = set()
def analyze_payload(self, payload, source_ip='Unknown', method='GET'):
""" Scans raw incoming payloads and calculates immediate security risk scores"""
print(f"\n{'='*80}")
print(f" INCOMING TRAFFIC ANALYSIS ENGINE")
print(f"{'='*80}")
print(f" Source IP: {source_ip} | Method: {method}")
print(f" Data Stream: {payload[:100]}..." if len(payload) > 100 else f" Data Stream: {payload}")
print(f"⏰ Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
detected_attacks = []
threat_score = 0
# 1. Signature Inspection Loop
for attack_type, attack_info in self.attacks_db.items():
if attack_info['patterns']:
for pattern in attack_info['patterns']:
if re.search(pattern, payload, re.IGNORECASE):
detected_attacks.append({
'type': attack_type,
'info': attack_info,
'pattern': pattern,
'matched': re.findall(pattern, payload, re.IGNORECASE)
})
threat_score += attack_info['level']
break
# 2. Behavioral Tracking (Brute Force / DoS)
if self._check_brute_force(source_ip):
detected_attacks.append({
'type': 'BRUTE_FORCE',
'info': self.attacks_db['BRUTE_FORCE'],
'pattern': 'Rate Limit Exceeded',
'matched': [f"{self.ip_attempts[source_ip]['count']} requests flagged"]
})
threat_score += 6
if self._check_dos_pattern(source_ip):
detected_attacks.append({
'type': 'DOS',
'info': self.attacks_db['DOS'],
'pattern': 'High frequency traffic anomalies',
'matched': ['DoS threshold exceeded']
})
threat_score += 9
# Log findings
self._log_attack(payload, source_ip, detected_attacks, threat_score)
# Render mitigation output
return self._display_results(detected_attacks, threat_score, source_ip)
def _check_brute_force(self, ip):
current_time = time.time()
if ip not in self.ip_attempts:
self.ip_attempts[ip] = {'count': 1, 'first_time': current_time}
return False
self.ip_attempts[ip]['count'] += 1
time_diff = current_time - self.ip_attempts[ip]['first_time']
if time_diff < 60 and self.ip_attempts[ip]['count'] > 10:
return True
if time_diff > 60:
self.ip_attempts[ip] = {'count': 1, 'first_time': current_time}
return False
def _check_dos_pattern(self, ip):
if ip in self.ip_attempts:
return self.ip_attempts[ip]['count'] > 50
return False
def _log_attack(self, payload, ip, attacks, score):
self.attack_log.append({
'timestamp': datetime.now().isoformat(),
'ip': ip,
'payload': payload,
'attacks': [a['type'] for a in attacks],
'threat_score': score
})
def _display_results(self, attacks, score, ip):
if not attacks:
print("✅ " + "="*78)
print("✅ TRAFFIC CLEAN: Payload matches standard baseline profiles.")
print("✅ " + "="*78)
return {'status': 'safe', 'threat_level': 0}
print(" " + "="*78)
print(" ALERT: CYBER ATTACK SIGNATURE DETECTED!")
print(" " + "="*78 + "\n")
if score >= 15:
threat_level = " Critical Threat Vector"
action = "⛔ Drop packet immediately & Permanent IP Block"
self.blocked_ips.add(ip)
elif score >= 10:
threat_level = " High Risk Vector"
action = "⚠️ Quarantine session & Route to honeypot"
elif score >= 5:
threat_level = " Medium Risk Vector"
action = " Passive auditing active"
else:
threat_level = " Low Risk Informational"
action = " Log profile updates only"
print(f" Global Threat Classification: {threat_level}")
print(f" Severity Risk Score: {score}/100")
print(f" Automated Action Triggered: {action}\n")
print(" Signature Extraction Logs:")
print("-" * 80)
for i, attack in enumerate(attacks, 1):
info = attack['info']
print(f"\n Exploit Context #{i}:")
print(f" Class: {info['name']}")
print(f" ⚠️ Inherent Severity: {info['severity']} (Base Level: {info['level']}/10)")
print(f" Threat Vectors: {info['description']}")
print(f" Matched Regex rule: {attack['pattern']}")
print(f" ✅ Raw Byte Matches: {attack['matched'][:3]}")
print("\n" + "="*80)
self._display_attacker_info(ip)
return {'status': 'mitigated', 'threat_level': score}
def _display_attacker_info(self, ip):
print("\n Target Host Intelligence Profile:")
print("-" * 80)
print(f" Host Address IP: {ip}")
print(f" Session Interaction Count: {self.ip_attempts.get(ip, {}).get('count', 1)}")
print(f" Active Firewall Block status: {'BLOCKED ❌' if ip in self.blocked_ips else 'ALLOWED ✅'}")
fingerprint = hashlib.md5(ip.encode()).hexdigest()[:16]
print(f" Immutable Host Fingerprint: {fingerprint}\n")
def get_attack_statistics(self):
if not self.attack_log:
return
print("\n" + "="*80)
print(" COMPREHENSIVE CYBER ATTACK METRICS REPORT")
print("="*80)
all_attack_types = []
for log in self.attack_log:
all_attack_types.extend(log['attacks'])
attack_counts = Counter(all_attack_types)
print(f" Total Exploit Signatures Intercepted: {len(self.attack_log)}")
print(f" Total Malicious Intruders Contained: {len(self.blocked_ips)}")
print(f"\n Core Threat Distribution Chart:")
for attack_type, count in attack_counts.most_common():
attack_name = self.attacks_db[attack_type]['name']
percentage = (count / len(all_attack_types)) * 100
bar = "█" * int(percentage / 2)
print(f" {attack_name}: {count} ({percentage:.1f}%) {bar}")
print("\n" + "="*80)
def main():
detector = CyberAttackDetector()
test_cases = [
{'payload': "SELECT * FROM users WHERE username='admin' OR '1'='1' --", 'ip': '192.168.1.100', 'method': 'POST'},
{'payload': "<script>alert('XSS!')</script>", 'ip': '10.0.0.50', 'method': 'GET'},
{'payload': "../../etc/passwd", 'ip': '172.16.0.20', 'method': 'GET'},
{'payload': "normal_user_profile_data_field", 'ip': '192.168.1.200', 'method': 'GET'}
]
print(" Simulating Live Production Traffic Stream...\n")
for test in test_cases:
detector.analyze_payload(test['payload'], test['ip'], test['method'])
time.sleep(0.1)
detector.get_attack_statistics()
if __name__ == "__main__":
main()
Breaking Down the Mitigation: Secure Coding Best Practices
This framework provides an excellent visual aid for developers to see exactly why defensive coding principles are necessary. Here are the core defensive takeaways for developers review:
Protecting Against SQL Injection (SQLi)
- The Vulnerability: Attackers insert parameters like ' OR '1'='1 to fool the SQL parser into returning unauthorized database records.
- The Secure Solution: Never concatenate input directly into database queries. Always use Parameterized Queries (Prepared Statements).
Preventing Cross-Site Scripting (XSS)
- The Vulnerability: Malicious payloads are stored or reflected back to other users, compromising their session tokens.
- The Secure Solution: Implement strict Context-Aware Output Encoding (escaping tags into HTML entities) and declare a robust Content Security Policy (CSP) header.
Blocking Path Traversal
- The Vulnerability: Input structures containing ../ allow an attacker to bypass file path restrictions and extract configuration files or passwords directly from the server OS.
- The Secure Solution: Use built-in framework utility functions to validate file paths against a strict whitelist directory, ensuring user input never directly names files on disk.
Conclusion
Integrating basic security logic inside your codebase dramatically reduces your attack surface before traffic ever reaches deeper layers of your system. Build defensively, sanitize aggressively, and never trust raw user input!