#!/usr/bin/env python3
"""
DIGITAL GENIE PROJECT GUARDIAN
Advanced Multi-Layer Security Protection System
Inspired by Indian Cyber Security Excellence
Author: Digital Genie Security Team
Version: 2.0 Professional Edition
"""
import os
import sys
import hashlib
import json
import logging
import sqlite3
import requests
import subprocess
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
import psutil
import socket
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import secrets
import re
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Beautiful Indian-Style ASCII Art
# Startup banner; DigitalGenieGuardian.__init__ prints it once on construction.
# The Hindi lines read "Digital Genie Project Guardian" and
# "We keep your project safe".
BANNER = """
╔══════════════════════════════════════════════════════════════════╗
║ डिजिटल जिनी प्रोजेक्ट गार्जियन ║
║ ║
║ ♂️ "हम आपके प्रोजेक्ट को सुरक्षित रखते हैं" ♂️ ║
║ ║
║ ⚡ Multi-Layer Security System ⚡ ║
║ ️ Real-time Threat Detection ️ ║
║ Advanced Encryption Protection ║
║ Intelligent Monitoring System ║
╚══════════════════════════════════════════════════════════════════╝
"""
class DigitalGenieGuardian:
"""
️ Main Security Guardian Class
Implementing Indian-style robust security architecture
"""
def __init__(self, project_path: str):
    """Bind the guardian to *project_path* and bring up its state.

    All guardian state (log, SQLite database, encryption key) is kept in a
    ``.genie_guard`` directory inside the project itself.

    NOTE(review): because that directory lives in the tree being protected,
    anything able to modify the project can presumably also modify the
    integrity baseline, the log and the key. Confirm this matches the
    intended threat model.
    """
    root = Path(project_path)
    guard_dir = root / ".genie_guard"

    self.project_path = root
    self.config_dir = guard_dir
    self.log_file = guard_dir / "security.log"
    self.db_file = guard_dir / "security.db"
    self.key_file = guard_dir / ".security_key"

    # Order matters: the directory must exist before the log file and the
    # database can be created inside it.
    for bootstrap_step in (
        self._setup_security_infrastructure,
        self._setup_logging,
        self._setup_database,
        self._load_or_generate_encryption_key,
    ):
        bootstrap_step()

    # Counters are per-process; they are not read back from the database.
    self.threat_count = 0
    self.scan_count = 0
    self.protection_status = "ACTIVE"

    print(BANNER)
    self.log_security_event(" Digital Genie Guardian ACTIVATED!", "INFO")
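def _setup_security_infrastructure(self):
    """Create the guardian's private state directory with owner-only access.

    The directory holds the event database, the log and the encryption key.
    Failures are printed and otherwise ignored (best effort); the later
    setup steps will then fail and report on their own.
    """
    try:
        self.config_dir.mkdir(exist_ok=True, mode=0o700)
        # mkdir() applies `mode` only when it actually creates the directory
        # (and filtered through the umask). A directory that already exists
        # keeps whatever permissions it had, so tighten it explicitly.
        # NOTE(review): nothing here checks that the path is a real
        # directory rather than a symlink placed in the project tree.
        os.chmod(self.config_dir, 0o700)
        # Create hidden security files
        (self.config_dir / ".gitkeep").touch()
        # __init__ calls this before logging and the database are set up, so
        # on a first run this event cannot be stored yet and
        # log_security_event() reports its own failure on stdout.
        self.log_security_event("✅ Security infrastructure initialized", "INFO")
    except Exception as e:
        print(f"❌ Failed to setup infrastructure: {e}")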
def _setup_logging(self):
    """Send log records to the security log file and stdout; set self.logger.

    Configuration goes through logging.basicConfig(), i.e. it configures the
    process-wide root logger, and it is a no-op when the root logger already
    has handlers. The log file is created with the process umask; only the
    permissions of the enclosing directory restrict who can read it.
    """
    file_sink = logging.FileHandler(self.log_file, encoding='utf-8')
    console_sink = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)s | ️ %(message)s',
        handlers=[file_sink, console_sink],
    )
    self.logger = logging.getLogger('DigitalGuardian')
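def _setup_database(self):
    """Create the SQLite tables for security events and file baselines.

    Both statements use ``IF NOT EXISTS``, so calling this against an
    existing database leaves its contents untouched. Errors are logged and
    swallowed.
    """
    try:
        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.cursor()
            # Create security events table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS security_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    event_type TEXT NOT NULL,
                    severity TEXT NOT NULL,
                    description TEXT NOT NULL,
                    file_path TEXT,
                    threat_level INTEGER DEFAULT 0,
                    action_taken TEXT,
                    resolved BOOLEAN DEFAULT FALSE
                )
            ''')
            # Create file integrity table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS file_integrity (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    file_path TEXT UNIQUE NOT NULL,
                    file_hash TEXT NOT NULL,
                    last_modified DATETIME DEFAULT CURRENT_TIMESTAMP,
                    size INTEGER,
                    permissions TEXT
                )
            ''')
            conn.commit()
        finally:
            # Close on the error path too, so a failed statement does not
            # leave an open handle (and a lock) on the database file.
            conn.close()
        # The database stores the integrity baseline; keep it owner-only,
        # the same way the key file is handled.
        os.chmod(self.db_file, 0o600)
        self.log_security_event("✅ Security database initialized", "INFO")
    except Exception as e:
        self.log_security_event(f"❌ Database setup failed: {e}", "ERROR")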
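def _load_or_generate_encryption_key(self):
    """Load the Fernet key from disk, or create and persist a new one.

    Sets ``self.encryption_key`` and ``self.cipher_suite``. Errors are logged
    and swallowed, in which case ``self.cipher_suite`` is left unset.

    The key is stored unencrypted inside the guardian directory, next to the
    data it protects: anyone who can read that directory can decrypt what
    was encrypted with it.
    """
    salt_len = 16
    fernet_key_len = 44  # urlsafe base64 encoding of a 32-byte key
    try:
        if self.key_file.exists():
            with open(self.key_file, 'rb') as f:
                stored = f.read()
            # The file is written as salt + key (see below). Passing the
            # whole 60 bytes to Fernet is rejected as an invalid key, which
            # made every run after the first fail, so strip the salt.
            if len(stored) == salt_len + fernet_key_len:
                stored = stored[salt_len:]
            self.encryption_key = stored
        else:
            # Generate new key.
            # The password is random and never stored, so this KDF step adds
            # nothing over a random key and the saved salt is not needed to
            # use the key; it is kept to preserve the on-disk layout.
            password = f"DigitalGenie_{secrets.token_hex(16)}".encode()
            salt = os.urandom(16)
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(password))
            # Create the file owner-only from the start. Writing first and
            # chmod-ing afterwards leaves a window in which the key is
            # readable under the default umask. O_EXCL refuses to reuse a
            # file that appeared after the exists() check.
            fd = os.open(
                self.key_file,
                os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                0o600,
            )
            with os.fdopen(fd, 'wb') as f:
                f.write(salt + key)
            self.encryption_key = key
        self.cipher_suite = Fernet(self.encryption_key)
        self.log_security_event("✅ Encryption system ready", "INFO")
    except Exception as e:
        self.log_security_event(f"❌ Encryption setup failed: {e}", "ERROR")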
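def log_security_event(self, message: str, level: str = "INFO",
                       file_path: str = None, threat_level: int = 0):
    """Record a security event in the text log and in the events table.

    Args:
        message: Human-readable description. It may embed file names taken
            from the monitored tree, so it is treated as untrusted text.
        level: Severity label. Levels without a matching logger method
            (e.g. "SUCCESS") are written at INFO.
        file_path: Optional path the event refers to.
        threat_level: Optional numeric score stored with the event.

    Database errors are printed and swallowed so that logging never raises
    into the caller.
    """
    # Add beautiful emojis based on level
    emoji_map = {
        "INFO": "ℹ️",
        "WARNING": "⚠️",
        "ERROR": "❌",
        "CRITICAL": "",
        "SUCCESS": "✅"
    }
    formatted_message = f"{emoji_map.get(level, '')} {message}"
    if hasattr(self, 'logger'):
        # The text log is line oriented. Escape CR/LF so that a file name or
        # error text containing a newline cannot start what looks like a
        # separate log record.
        log_line = formatted_message.replace("\r", "\\r").replace("\n", "\\n")
        # Dispatch only to the standard level methods instead of resolving an
        # arbitrary attribute of the logger from the level string.
        method_name = level.lower()
        if method_name not in ("debug", "info", "warning", "error", "critical"):
            method_name = "info"
        getattr(self.logger, method_name)(log_line)
    # Store in database (parameterized; the raw message is kept as given)
    try:
        conn = sqlite3.connect(self.db_file)
        try:
            conn.execute('''
                INSERT INTO security_events
                (event_type, severity, description, file_path, threat_level)
                VALUES (?, ?, ?, ?, ?)
            ''', ("SECURITY_LOG", level, message, file_path, threat_level))
            conn.commit()
        finally:
            # Release the handle even when the INSERT fails.
            conn.close()
    except Exception as e:
        print(f"❌ Failed to log to database: {e}")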
def calculate_file_hash(self, file_path: Path) -> str:
    """Return the SHA-256 hex digest of *file_path*, read in 4 KiB blocks.

    Returns an empty string (after logging an ERROR event) when the file
    cannot be read. Callers must treat "" as "no digest available", not as
    a digest value.
    """
    try:
        digest = hashlib.sha256()
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(4096)
                if not block:
                    break
                digest.update(block)
        return digest.hexdigest()
    except Exception as e:
        self.log_security_event(f"❌ Hash calculation failed for {file_path}: {e}", "ERROR")
        return ""
def scan_for_malicious_patterns(self, file_path: Path) -> list:
    """Flag script files whose bytes match a fixed list of regexes.

    Only files with a .py/.js/.sh/.bat/.ps1 suffix are read; every other
    file yields an empty list. Each match produces a dict with the keys
    'pattern', 'file' and 'severity' (always 'HIGH').

    This is a plain text heuristic. Several signatures (subprocess use,
    os.remove, sockets) are common in legitimate code, so expect false
    positives, and content that is encoded or assembled at run time is not
    matched at all. Treat results as leads for review, not as a verdict.
    The whole file is read into memory.
    """
    signatures = (
        # Common malware indicators
        rb'eval\s*\(',
        rb'exec\s*\(',
        rb'__import__\s*\(',
        rb'subprocess\.',
        rb'os\.system',
        rb'shell=True',
        # Crypto mining indicators
        rb'stratum\+tcp://',
        rb'mining\.pool',
        rb'cryptonight',
        # Network suspicious patterns
        rb'socket\.socket',
        rb'urllib\.request',
        rb'requests\.get.*http://\d+\.\d+\.\d+\.\d+',
        # File system threats
        rb'shutil\.rmtree',
        rb'os\.remove',
        rb'\.encode\(\s*[\'"]base64[\'"]\s*\)',
    )
    scannable_suffixes = ('.py', '.js', '.sh', '.bat', '.ps1')
    findings = []
    try:
        if file_path.suffix.lower() not in scannable_suffixes:
            return findings
        with open(file_path, 'rb') as handle:
            blob = handle.read()
        findings.extend(
            {
                'pattern': signature.decode('utf-8', errors='ignore'),
                'file': str(file_path),
                'severity': 'HIGH',
            }
            for signature in signatures
            if re.search(signature, blob, re.IGNORECASE)
        )
    except Exception as e:
        self.log_security_event(f"❌ Pattern scan failed for {file_path}: {e}", "ERROR")
    return findings
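def check_file_integrity(self, file_path: Path) -> bool:
    """Compare a file's SHA-256 with its stored baseline.

    Returns True when the hash matches the baseline, or when the file had no
    baseline and one was recorded now. Returns False on a mismatch (logged
    as CRITICAL, ``threat_count`` incremented), when a new file cannot be
    hashed, or on any error.

    Limits worth knowing when reading the results:
    - Trust on first use: a file without a baseline is accepted as-is, so a
      file that was already altered before its first scan becomes the
      baseline.
    - Size and permissions are stored but only the hash is compared, and
      ``oct(mode)[-3:]`` keeps just the rwx digits (setuid/setgid/sticky
      bits are not recorded).
    """
    try:
        current_hash = self.calculate_file_hash(file_path)
        file_stat = file_path.stat()
        current_size = file_stat.st_size
        current_perms = oct(file_stat.st_mode)[-3:]

        registered = False
        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT file_hash, size, permissions FROM file_integrity
                WHERE file_path = ?
            ''', (str(file_path),))
            result = cursor.fetchone()
            # calculate_file_hash() returns "" when the file could not be
            # read. Never store that as a baseline: the next successful read
            # would then be reported as tampering.
            if not result and current_hash:
                # New file - add to database
                cursor.execute('''
                    INSERT OR REPLACE INTO file_integrity
                    (file_path, file_hash, size, permissions)
                    VALUES (?, ?, ?, ?)
                ''', (str(file_path), current_hash, current_size, current_perms))
                conn.commit()
                registered = True
        finally:
            # Always release the connection, including on the mismatch path,
            # and do it before logging: log_security_event() opens its own
            # connection, and writing through it while this one still holds
            # an uncommitted INSERT fails with "database is locked".
            conn.close()

        if result:
            stored_hash = result[0]
            if current_hash != stored_hash:
                self.log_security_event(
                    f" File integrity violation detected: {file_path}",
                    "CRITICAL", str(file_path), threat_level=9
                )
                self.threat_count += 1
                return False
            return True
        if not registered:
            # Unreadable new file; the hash failure was already logged.
            return False
        self.log_security_event(f"✅ New file registered: {file_path.name}", "INFO")
        return True
    except Exception as e:
        self.log_security_event(f"❌ Integrity check failed for {file_path}: {e}", "ERROR")
        return False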
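def perform_deep_security_scan(self) -> dict:
    """Walk the project tree, checking integrity and patterns per file.

    Every regular file below ``project_path`` is passed to
    check_file_integrity() and scan_for_malicious_patterns(), except files
    inside the guardian's own state directory.

    Returns:
        dict with 'files_scanned', 'threats_found', 'integrity_violations',
        'suspicious_files', 'scan_duration' (seconds, 2 decimals) and
        'recommendations'.
    """
    self.log_security_event(" Starting comprehensive security scan...", "INFO")
    scan_start_time = time.time()
    scan_results = {
        'files_scanned': 0,
        'threats_found': [],
        'integrity_violations': 0,
        'suspicious_files': [],
        'scan_duration': 0,
        'recommendations': []
    }
    # Scan all project files
    for file_path in self.project_path.rglob('*'):
        if not file_path.is_file():
            continue
        # Skip only what is really inside the guardian directory. A string
        # prefix test also skips siblings whose name merely starts with
        # ".genie_guard", which would leave such a directory unscanned.
        if self.config_dir in file_path.parents:
            continue
        scan_results['files_scanned'] += 1
        # Check file integrity
        if not self.check_file_integrity(file_path):
            scan_results['integrity_violations'] += 1
        # Scan for malicious patterns
        threats = self.scan_for_malicious_patterns(file_path)
        if threats:
            scan_results['threats_found'].extend(threats)
            scan_results['suspicious_files'].append(str(file_path))
        # Progress indicator (Indian style!)
        if scan_results['files_scanned'] % 50 == 0:
            print(f" स्कैन प्रगति: {scan_results['files_scanned']} files scanned...")
    scan_results['scan_duration'] = round(time.time() - scan_start_time, 2)
    self.scan_count += 1
    # Generate recommendations
    if scan_results['threats_found']:
        scan_results['recommendations'].append(" Immediate action required: Malicious patterns detected!")
        scan_results['recommendations'].append(" Run quarantine procedure for suspicious files")
        scan_results['recommendations'].append("️ Enable real-time monitoring")
    if scan_results['integrity_violations'] > 0:
        scan_results['recommendations'].append(" Review file integrity violations")
        scan_results['recommendations'].append(" Update authorized file signatures")
    self.log_security_event(
        f"✅ Security scan completed: {scan_results['files_scanned']} files, "
        f"{len(scan_results['threats_found'])} threats found",
        "SUCCESS"
    )
    return scan_results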
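def quarantine_suspicious_file(self, file_path: str):
    """Encrypt a file into the quarantine directory and delete the original.

    The encrypted copy is written to ``<config_dir>/quarantine`` as
    ``<name>.quarantined``; if that name is taken, a numeric part is
    inserted (``<name>.1.quarantined``, ...). Errors are logged and
    swallowed; the original is only removed after the copy was written.

    NOTE(review): *file_path* is used as given. Nothing restricts it to the
    project tree, and the original location is kept only in the event log,
    so callers should pass paths they have validated themselves.
    """
    try:
        quarantine_dir = self.config_dir / "quarantine"
        quarantine_dir.mkdir(exist_ok=True, mode=0o700)
        source_file = Path(file_path)
        # Encrypt and move to quarantine
        with open(source_file, 'rb') as f:
            encrypted_data = self.cipher_suite.encrypt(f.read())
        # Two files with the same base name from different directories used
        # to map to one quarantine file, so the second silently replaced the
        # first after its original had been deleted. Mode 'xb' refuses to
        # overwrite, and the counter picks the next free name.
        attempt = 0
        while True:
            marker = "" if attempt == 0 else f".{attempt}"
            quarantine_file = quarantine_dir / f"{source_file.name}{marker}.quarantined"
            try:
                with open(quarantine_file, 'xb') as f:
                    f.write(encrypted_data)
                break
            except FileExistsError:
                attempt += 1
        # Remove original
        source_file.unlink()
        self.log_security_event(
            f" File quarantined successfully: {file_path}",
            "SUCCESS", file_path, threat_level=8
        )
    except Exception as e:
        self.log_security_event(f"❌ Quarantine failed for {file_path}: {e}", "ERROR")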
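def setup_real_time_monitoring(self):
    """Start a recursive watchdog observer on the project directory.

    Modified files are logged and re-scanned with
    scan_for_malicious_patterns(); created files are only logged. Events
    for the guardian's own state directory are ignored.

    Returns:
        The started Observer, or None when setup failed. The caller owns
        the observer and is responsible for stopping it.
    """
    class SecurityEventHandler(FileSystemEventHandler):
        def __init__(self, guardian):
            super().__init__()
            self.guardian = guardian

        def _is_guardian_state(self, event):
            """True when the event path is the state directory or inside it."""
            guard_dir = self.guardian.config_dir
            changed = Path(os.fsdecode(event.src_path))
            # Compare path components, not string prefixes, so a sibling
            # whose name merely starts with ".genie_guard" is still watched.
            return changed == guard_dir or guard_dir in changed.parents

        def on_modified(self, event):
            if event.is_directory or self._is_guardian_state(event):
                return
            self.guardian.log_security_event(
                f" File modified: {event.src_path}", "INFO", event.src_path
            )
            # Quick security check
            file_path = Path(event.src_path)
            threats = self.guardian.scan_for_malicious_patterns(file_path)
            if threats:
                self.guardian.log_security_event(
                    f" Threat detected in modified file: {event.src_path}",
                    "CRITICAL", event.src_path, threat_level=9
                )

        def on_created(self, event):
            # The state directory must be excluded here as well: logging an
            # event writes to the database in that directory, and SQLite's
            # rollback journal is created next to the database on writes,
            # so each logged creation would trigger another one.
            if event.is_directory or self._is_guardian_state(event):
                return
            self.guardian.log_security_event(
                f"➕ New file created: {event.src_path}", "INFO", event.src_path
            )

    try:
        event_handler = SecurityEventHandler(self)
        observer = Observer()
        observer.schedule(event_handler, str(self.project_path), recursive=True)
        observer.start()
        self.log_security_event("️ Real-time monitoring activated!", "SUCCESS")
        return observer
    except Exception as e:
        self.log_security_event(f"❌ Monitoring setup failed: {e}", "ERROR")
        return None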
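def generate_security_report(self) -> str:
    """Build a text report from the event database and in-memory counters.

    Returns the report, or a fixed error string when the database cannot be
    read (the failure is logged).

    Event descriptions are included verbatim and may contain file names
    from the monitored tree; treat the report as untrusted text if it is
    rendered anywhere other than a plain terminal or log.
    """
    try:
        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.cursor()
            # Get security statistics.
            # Bind the severity as a parameter: a double-quoted "CRITICAL"
            # is an identifier in SQL and only works as a string through a
            # SQLite compatibility fallback that can be switched off.
            count_by_severity = 'SELECT COUNT(*) FROM security_events WHERE severity = ?'
            cursor.execute(count_by_severity, ("CRITICAL",))
            critical_events = cursor.fetchone()[0]
            cursor.execute(count_by_severity, ("WARNING",))
            warning_events = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM file_integrity')
            monitored_files = cursor.fetchone()[0]
            cursor.execute('''
                SELECT event_type, severity, description, timestamp
                FROM security_events
                ORDER BY timestamp DESC LIMIT 10
            ''')
            recent_events = cursor.fetchall()
        finally:
            # Release the handle even when a query fails.
            conn.close()
        # Generate beautiful report
        report = f"""
╔══════════════════════════════════════════════════════════════════╗
║ ️ SECURITY STATUS REPORT ️ ║
╠══════════════════════════════════════════════════════════════════╣
║ ║
║ Project: {self.project_path.name} ║
║ Report Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ║
║ Guardian Status: {self.protection_status} ║
║ ║
╠══════════════════════════════════════════════════════════════════╣
║ SECURITY METRICS ║
╠══════════════════════════════════════════════════════════════════╣
║ ║
║ Critical Events: {critical_events:>6} ║
║ ⚠️ Warning Events: {warning_events:>6} ║
║ Monitored Files: {monitored_files:>6} ║
║ Total Scans: {self.scan_count:>10} ║
║ ️ Threats Blocked: {self.threat_count:>8} ║
║ ║
╚══════════════════════════════════════════════════════════════════╝
RECENT SECURITY EVENTS:
"""
        severity_icons = {"CRITICAL": "", "WARNING": "⚠️", "INFO": "ℹ️", "SUCCESS": "✅"}
        for event_type, severity, description, timestamp in recent_events:
            emoji = severity_icons.get(severity, "")
            report += f" {emoji} [{timestamp}] {description}\n"
        report += f"""
SECURITY RECOMMENDATIONS:
✅ Keep running regular security scans
Update security signatures weekly
️ Monitor real-time alerts actively
Review quarantined files periodically
Maintain security event logs
♂️ "आपका प्रोजेक्ट सुरक्षित है - Digital Genie Guardian"
"""
        return report
    except Exception as e:
        self.log_security_event(f"❌ Report generation failed: {e}", "ERROR")
        return "❌ Could not generate security report"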
def backup_project_securely(self):
""" Create encrypted backup of project"""
try:
backup_dir = self.config_dir / "backups"