DIGITAL GENIE PROJECT GUARDIAN

DIGITAL GENIE PROJECT GUARDIAN

BackerLeader posted 7 min read

!/usr/bin/env python3

"""

♂️ DIGITAL GENIE PROJECT GUARDIAN ♂️

Advanced Multi-Layer Security Protection System
Inspired by Indian Cyber Security Excellence
Author: Digital Genie Security Team
Version: 2.0 Professional Edition
"""

import os
import sys
import hashlib
import json
import logging
import sqlite3
import requests
import subprocess
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
import psutil
import socket
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import secrets
import re
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

Beautiful Indian-Style ASCII Art

BANNER = """
╔══════════════════════════════════════════════════════════════════╗
║ डिजिटल जिनी प्रोजेक्ट गार्जियन ║
║ ║
║ ♂️ "हम आपके प्रोजेक्ट को सुरक्षित रखते हैं" ♂️ ║
║ ║
║ ⚡ Multi-Layer Security System ⚡ ║
║ ️ Real-time Threat Detection ️ ║
║ Advanced Encryption Protection ║
║ Intelligent Monitoring System ║
╚══════════════════════════════════════════════════════════════════╝
"""

class DigitalGenieGuardian:

"""
️ Main Security Guardian Class
Implementing Indian-style robust security architecture
"""

def __init__(self, project_path: str):
    self.project_path = Path(project_path)
    self.config_dir = self.project_path / ".genie_guard"
    self.log_file = self.config_dir / "security.log"
    self.db_file = self.config_dir / "security.db"
    self.key_file = self.config_dir / ".security_key"
    
    # ️ Initialize Security Infrastructure
    self._setup_security_infrastructure()
    self._setup_logging()
    self._setup_database()
    self._load_or_generate_encryption_key()
    
    #  Security Metrics
    self.threat_count = 0
    self.scan_count = 0
    self.protection_status = "ACTIVE"
    
    print(BANNER)
    self.log_security_event(" Digital Genie Guardian ACTIVATED!", "INFO")

def _setup_security_infrastructure(self):
    """️ Setup secure directory structure"""
    try:
        self.config_dir.mkdir(exist_ok=True, mode=0o700)
        # Create hidden security files
        (self.config_dir / ".gitkeep").touch()
        self.log_security_event("✅ Security infrastructure initialized", "INFO")
    except Exception as e:
        print(f"❌ Failed to setup infrastructure: {e}")

def _setup_logging(self):
    """ Setup advanced logging system"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)s | ️ %(message)s',
        handlers=[
            logging.FileHandler(self.log_file, encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )
    self.logger = logging.getLogger('DigitalGuardian')

def _setup_database(self):
    """️ Setup security events database"""
    try:
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        # Create security events table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS security_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                event_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                description TEXT NOT NULL,
                file_path TEXT,
                threat_level INTEGER DEFAULT 0,
                action_taken TEXT,
                resolved BOOLEAN DEFAULT FALSE
            )
        ''')
        
        # Create file integrity table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS file_integrity (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT UNIQUE NOT NULL,
                file_hash TEXT NOT NULL,
                last_modified DATETIME DEFAULT CURRENT_TIMESTAMP,
                size INTEGER,
                permissions TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
        self.log_security_event("✅ Security database initialized", "INFO")
    except Exception as e:
        self.log_security_event(f"❌ Database setup failed: {e}", "ERROR")

def _load_or_generate_encryption_key(self):
    """ Load or generate encryption key"""
    try:
        if self.key_file.exists():
            with open(self.key_file, 'rb') as f:
                self.encryption_key = f.read()
        else:
            # Generate new key
            password = f"DigitalGenie_{secrets.token_hex(16)}".encode()
            salt = os.urandom(16)
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(password))
            
            # Save key securely
            with open(self.key_file, 'wb') as f:
                f.write(salt + key)
            os.chmod(self.key_file, 0o600)
            self.encryption_key = key
            
        self.cipher_suite = Fernet(self.encryption_key)
        self.log_security_event("✅ Encryption system ready", "INFO")
    except Exception as e:
        self.log_security_event(f"❌ Encryption setup failed: {e}", "ERROR")

def log_security_event(self, message: str, level: str = "INFO", 
                      file_path: str = None, threat_level: int = 0):
    """ Log security events with Indian-style formatting"""
    
    # Add beautiful emojis based on level
    emoji_map = {
        "INFO": "ℹ️",
        "WARNING": "⚠️",
        "ERROR": "❌",
        "CRITICAL": "",
        "SUCCESS": "✅"
    }
    
    formatted_message = f"{emoji_map.get(level, '')} {message}"
    
    if hasattr(self, 'logger'):
        getattr(self.logger, level.lower(), self.logger.info)(formatted_message)
    
    # Store in database
    try:
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO security_events 
            (event_type, severity, description, file_path, threat_level)
            VALUES (?, ?, ?, ?, ?)
        ''', ("SECURITY_LOG", level, message, file_path, threat_level))
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"❌ Failed to log to database: {e}")

def calculate_file_hash(self, file_path: Path) -> str:
    """ Calculate SHA-256 hash of file"""
    try:
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()
    except Exception as e:
        self.log_security_event(f"❌ Hash calculation failed for {file_path}: {e}", "ERROR")
        return ""

def scan_for_malicious_patterns(self, file_path: Path) -> list:
    """ Advanced malware pattern detection"""
    suspicious_patterns = [
        #  Common malware indicators
        rb'eval\s*\(',
        rb'exec\s*\(',
        rb'__import__\s*\(',
        rb'subprocess\.',
        rb'os\.system',
        rb'shell=True',
        
        #  Crypto mining indicators
        rb'stratum\+tcp://',
        rb'mining\.pool',
        rb'cryptonight',
        
        #  Network suspicious patterns
        rb'socket\.socket',
        rb'urllib\.request',
        rb'requests\.get.*http://\d+\.\d+\.\d+\.\d+',
        
        #  File system threats
        rb'shutil\.rmtree',
        rb'os\.remove',
        rb'\.encode\(\s*[\'"]base64[\'"]\s*\)',
    ]
    
    threats_found = []
    
    try:
        if file_path.suffix.lower() in ['.py', '.js', '.sh', '.bat', '.ps1']:
            with open(file_path, 'rb') as f:
                content = f.read()
                
            for pattern in suspicious_patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    threats_found.append({
                        'pattern': pattern.decode('utf-8', errors='ignore'),
                        'file': str(file_path),
                        'severity': 'HIGH'
                    })
    except Exception as e:
        self.log_security_event(f"❌ Pattern scan failed for {file_path}: {e}", "ERROR")
    
    return threats_found

def check_file_integrity(self, file_path: Path) -> bool:
    """ Check file integrity against database"""
    try:
        current_hash = self.calculate_file_hash(file_path)
        current_size = file_path.stat().st_size
        current_perms = oct(file_path.stat().st_mode)[-3:]
        
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT file_hash, size, permissions FROM file_integrity 
            WHERE file_path = ?
        ''', (str(file_path),))
        
        result = cursor.fetchone()
        
        if result:
            stored_hash, stored_size, stored_perms = result
            if current_hash != stored_hash:
                self.log_security_event(
                    f" File integrity violation detected: {file_path}", 
                    "CRITICAL", str(file_path), threat_level=9
                )
                self.threat_count += 1
                return False
        else:
            # New file - add to database
            cursor.execute('''
                INSERT OR REPLACE INTO file_integrity 
                (file_path, file_hash, size, permissions)
                VALUES (?, ?, ?, ?)
            ''', (str(file_path), current_hash, current_size, current_perms))
            
            self.log_security_event(f"✅ New file registered: {file_path.name}", "INFO")
        
        conn.commit()
        conn.close()
        return True
        
    except Exception as e:
        self.log_security_event(f"❌ Integrity check failed for {file_path}: {e}", "ERROR")
        return False

def perform_deep_security_scan(self) -> dict:
    """ Comprehensive security scan with Indian precision"""
    self.log_security_event(" Starting comprehensive security scan...", "INFO")
    scan_start_time = time.time()
    
    scan_results = {
        'files_scanned': 0,
        'threats_found': [],
        'integrity_violations': 0,
        'suspicious_files': [],
        'scan_duration': 0,
        'recommendations': []
    }
    
    #  Scan all project files
    for file_path in self.project_path.rglob('*'):
        if file_path.is_file() and not str(file_path).startswith(str(self.config_dir)):
            scan_results['files_scanned'] += 1
            
            #  Check file integrity
            if not self.check_file_integrity(file_path):
                scan_results['integrity_violations'] += 1
            
            #  Scan for malicious patterns
            threats = self.scan_for_malicious_patterns(file_path)
            if threats:
                scan_results['threats_found'].extend(threats)
                scan_results['suspicious_files'].append(str(file_path))
            
            #  Progress indicator (Indian style!)
            if scan_results['files_scanned'] % 50 == 0:
                print(f" स्कैन प्रगति: {scan_results['files_scanned']} files scanned...")
    
    scan_results['scan_duration'] = round(time.time() - scan_start_time, 2)
    self.scan_count += 1
    
    #  Generate recommendations
    if scan_results['threats_found']:
        scan_results['recommendations'].append(" Immediate action required: Malicious patterns detected!")
        scan_results['recommendations'].append(" Run quarantine procedure for suspicious files")
        scan_results['recommendations'].append("️ Enable real-time monitoring")
    
    if scan_results['integrity_violations'] > 0:
        scan_results['recommendations'].append(" Review file integrity violations")
        scan_results['recommendations'].append(" Update authorized file signatures")
    
    self.log_security_event(
        f"✅ Security scan completed: {scan_results['files_scanned']} files, "
        f"{len(scan_results['threats_found'])} threats found", 
        "SUCCESS"
    )
    
    return scan_results

def quarantine_suspicious_file(self, file_path: str):
    """ Quarantine suspicious files"""
    try:
        quarantine_dir = self.config_dir / "quarantine"
        quarantine_dir.mkdir(exist_ok=True, mode=0o700)
        
        source_file = Path(file_path)
        quarantine_file = quarantine_dir / f"{source_file.name}.quarantined"
        
        # Encrypt and move to quarantine
        with open(source_file, 'rb') as f:
            encrypted_data = self.cipher_suite.encrypt(f.read())
        
        with open(quarantine_file, 'wb') as f:
            f.write(encrypted_data)
        
        # Remove original
        source_file.unlink()
        
        self.log_security_event(
            f" File quarantined successfully: {file_path}", 
            "SUCCESS", file_path, threat_level=8
        )
        
    except Exception as e:
        self.log_security_event(f"❌ Quarantine failed for {file_path}: {e}", "ERROR")

def setup_real_time_monitoring(self):
    """️ Setup real-time file system monitoring"""
    
    class SecurityEventHandler(FileSystemEventHandler):
        def __init__(self, guardian):
            self.guardian = guardian
        
        def on_modified(self, event):
            if not event.is_directory and not str(event.src_path).startswith(str(self.guardian.config_dir)):
                self.guardian.log_security_event(
                    f" File modified: {event.src_path}", "INFO", event.src_path
                )
                
                # Quick security check
                file_path = Path(event.src_path)
                threats = self.guardian.scan_for_malicious_patterns(file_path)
                if threats:
                    self.guardian.log_security_event(
                        f" Threat detected in modified file: {event.src_path}", 
                        "CRITICAL", event.src_path, threat_level=9
                    )
        
        def on_created(self, event):
            if not event.is_directory:
                self.guardian.log_security_event(
                    f"➕ New file created: {event.src_path}", "INFO", event.src_path
                )
    
    try:
        event_handler = SecurityEventHandler(self)
        observer = Observer()
        observer.schedule(event_handler, str(self.project_path), recursive=True)
        observer.start()
        
        self.log_security_event("️ Real-time monitoring activated!", "SUCCESS")
        return observer
        
    except Exception as e:
        self.log_security_event(f"❌ Monitoring setup failed: {e}", "ERROR")
        return None

def generate_security_report(self) -> str:
    """ Generate comprehensive security report"""
    try:
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        # Get security statistics
        cursor.execute('SELECT COUNT(*) FROM security_events WHERE severity = "CRITICAL"')
        critical_events = cursor.fetchone()[0]
        
        cursor.execute('SELECT COUNT(*) FROM security_events WHERE severity = "WARNING"')
        warning_events = cursor.fetchone()[0]
        
        cursor.execute('SELECT COUNT(*) FROM file_integrity')
        monitored_files = cursor.fetchone()[0]
        
        cursor.execute('''
            SELECT event_type, severity, description, timestamp 
            FROM security_events 
            ORDER BY timestamp DESC LIMIT 10
        ''')
        recent_events = cursor.fetchall()
        
        conn.close()
        
        #  Generate beautiful report
        report = f"""

╔══════════════════════════════════════════════════════════════════╗
║ ️ SECURITY STATUS REPORT ️ ║
╠══════════════════════════════════════════════════════════════════╣
║ ║
║ Project: {self.project_path.name} ║
║ Report Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ║
║ Guardian Status: {self.protection_status} ║
║ ║
╠══════════════════════════════════════════════════════════════════╣
║ SECURITY METRICS ║
╠══════════════════════════════════════════════════════════════════╣
║ ║
║ Critical Events: {critical_events:>6} ║
║ ⚠️ Warning Events: {warning_events:>6} ║
║ Monitored Files: {monitored_files:>6} ║
║ Total Scans: {self.scan_count:>10} ║
║ ️ Threats Blocked: {self.threat_count:>8} ║
║ ║
╚══════════════════════════════════════════════════════════════════╝

RECENT SECURITY EVENTS:
"""

        for event in recent_events:
            event_type, severity, description, timestamp = event
            emoji = {"CRITICAL": "", "WARNING": "⚠️", "INFO": "ℹ️", "SUCCESS": "✅"}.get(severity, "")
            report += f"  {emoji} [{timestamp}] {description}\n"
        
        report += f"""

SECURITY RECOMMENDATIONS:
✅ Keep running regular security scans
Update security signatures weekly
️ Monitor real-time alerts actively
Review quarantined files periodically
Maintain security event logs

♂️ "आपका प्रोजेक्ट सुरक्षित है - Digital Genie Guardian"
"""

        return report
        
    except Exception as e:
        self.log_security_event(f"❌ Report generation failed: {e}", "ERROR")
        return "❌ Could not generate security report"

def backup_project_securely(self):
    """ Create encrypted backup of project"""
    try:
        backup_dir = self.config_dir / "backups"

More Posts

The Audit Trail of Things: Using Hashgraph as a Digital Caliper for Provenance

Ken W. Algerverified - Apr 28

The 2026 Guide to AI Video Watermark Persistence: Protecting Digital Provenance

Dr Santu Roy - May 23
chevron_left

Related Jobs

View all jobs →

Commenters (This Week)

2 comments
1 comment
1 comment

Contribute meaningful comments to climb the leaderboard and earn badges!