# Example: Logging with synced timestamps import datetime print(f"Log Time: {datetime.datetime.utcnow().isoformat()} - User Login") # Ensures consistent timestamps in logs for correlation.
# Simulate checking unnecessary service services = ["Telnet", "RDP"] if "Telnet" in services: print("Disable Telnet to harden OS") # Disabling insecure legacy services is key to OS security.
# Example: Alert if container lacks resource limits container_config = {"memory": None} if not container_config["memory"]: print("Set memory limits to prevent abuse") # Protects containers from resource exhaustion.
# Example: enforce zero trust policy user_authenticated = False if not user_authenticated: print("Access denied: Re-authentication required") # Zero trust denies by default.
# Example: enforcing MFA login_success = True mfa_passed = False if login_success and not mfa_passed: print("MFA required before granting access") # MFA reduces unauthorized access risk.
# Example: verify certificate expiration cert_expiry_days = 10 if cert_expiry_days < 30: print("Renew certificate to maintain secure communications") # Prevents SSL expiration issues.
# Example: flagging credit card numbers message = "My card is 4111-1111-1111-1111" if "4111" in message: print("DLP Alert: Potential CHD detected") # Detects and flags sensitive data.
# Simulate SIEM alert logic alert = {"source": "firewall", "severity": "high"} if alert["severity"] == "high": print("SOAR: Auto-isolate endpoint") # SOAR enables automated defense actions.
# Example: EDR behavior alert process_behavior = "keylogger" if process_behavior == "keylogger": print("EDR Alert: Keylogger behavior detected") # Detects malicious endpoint behavior.
# Simulate security dashboard summary alerts = ["EDR", "Firewall", "SIEM"] print(f"Dashboard summary: {len(alerts)} alerts from multiple sources") # Centralized alert overview.
# Example: detect beaconing intervals intervals = [60, 61, 60, 62] if all(i in range(58, 63) for i in intervals): print("Suspicious beaconing pattern detected") # Regular outbound patterns raise alerts.
# Example: detect non-standard port use port = 8081 if port not in [80, 443]: print("Unusual port usage detected") # Flagging rogue port activity.
# Example: detect lateral connections connections = ["192.168.1.5", "192.168.1.6"] if len(connections) > 1: print("Lateral movement suspected") # Multiple internal hops may imply compromise.
# Example: lookup domain reputation reputation_score = 2 # scale 0-10 if reputation_score < 3: print("Malicious domain flagged") # Threat intel helps correlate DNS activity.
# Example: flag abnormal data size mb_sent = 1500 if mb_sent > 1000: print("Potential exfiltration detected") # Data egress alert for review.
# Example: alert on high CPU usage cpu_usage = 92 if cpu_usage > 90: print("High CPU usage detected") # Review top processes consuming CPU.
# Example: detect blacklisted process running_processes = ["explorer.exe", "malicious_tool.exe"] if "malicious_tool.exe" in running_processes: print("Unauthorized process running") # Flag unexpected binaries.
# Example: detect file modification modified_files = ["C:/Windows/System32/drivers/etc/hosts"] if "hosts" in modified_files[0]: print("Hosts file modified") # Investigate for malicious redirection.
# Example: detect unknown task scheduled_tasks = ["\Microsoft\Windows\UpdateTask", "\MaliciousScript"] if "\MaliciousScript" in scheduled_tasks: print("Suspicious scheduled task found") # Audit task scheduler entries.
# Example: detect large archive creation archive_size_gb = 5 if archive_size_gb > 1: print("Potential data exfiltration") # Check archive destination and upload logs.
# Example: detect suspicious account creation new_accounts = ["admin2"] if "admin2" in new_accounts: print("Unexpected admin account created") # Investigate account origin and privileges.
# Example: detect abnormal app connections app_connections = ["192.168.1.100", "203.0.113.45"] if "203.0.113.45" in app_connections: print("Suspicious external connection detected") # Log and alert unfamiliar destinations.
# Example: detect service failure services_status = {"WebApp": "Stopped"} if services_status["WebApp"] == "Stopped": print("Web application service disrupted") # Restart and review logs.
# Example: detect missing logs log_entries = ["Error 1", "Error 2"] if len(log_entries) < 10: print("Possible log tampering detected") # Validate log integrity.
# Example: Behavior baseline check normal_hours = range(8, 18) login_hour = 3 # Detected login at 3 AM if login_hour not in normal_hours: print("⚠️ Alert: Login outside of baseline hours") # Unusual behavior is detected and flagged.
# Example: Check SPF status from header email_header = "Received-SPF: fail" if "fail" in email_header: print("❌ SPF failed - possible spoofed email") # Failing SPF can indicate email spoofing.
import hashlib # Example: Generate SHA256 hash of a file with open("file.exe", "rb") as f: file_hash = hashlib.sha256(f.read()).hexdigest() print("File hash:", file_hash) # Use this hash to compare with threat intel databases.
# Example: Python automation to block IP malicious_ip = "10.10.10.10" def block_ip(ip): print(f"Blocking IP: {ip}") # In production, insert firewall rule here block_ip(malicious_ip) # Automates action based on detection logic.
import re log_entry = "Failed login from IP 192.168.1.100" match = re.search(r"\d+\.\d+\.\d+\.\d+", log_entry) if match: print("🔍 IP extracted:", match.group()) # Extracts the IP address from the log using regex.
# Example: detect impersonation keyword in emails email_content = "Urgent request from IT department" if "urgent" in email_content.lower() and "IT" in email_content: print("Possible social engineering detected") # Check sender domain and urgency flags.
# Example: header mismatch detection from_header = "support@secure.com" reply_to = "attack@fake.com" if from_header.split('@')[1] != reply_to.split('@')[1]: print("Email header mismatch detected - phishing risk") # Alert and quarantine the email.
# Example: detect base64 patterns url = "http://example.com/?q=aHR0cDovL3ZpY3RpbS5jb20=" if "aHR0c" in url: print("Obfuscated base64 payload detected") # Decode and review payload.
# Example: detect lookalike domains domain = "g00gle.com" if "google" in domain and "0" in domain: print("Possible spoofed domain detected") # Flag for investigation.
# Example: detect unexpected scripting types script_type = "Powershell" if script_type not in ["Bash", "Python", "Shell"]: print("Uncommon scripting usage - possible emerging threat") # Review source and execution context.
# Example: shell snippet using tcpdump # tcpdump -i eth0 port 443 -w capture.pcap # This captures HTTPS traffic on eth0 # Useful for post-event packet analysis.
# Example SIEM rule concept (pseudocode) if failed_logins > 5 and source_ip != whitelisted: alert("Brute force attempt from unknown IP") # Reduces false positives from allowed devices.
# Example: monitor suspicious process tree parent = "explorer.exe" child = "powershell.exe" if parent == "explorer.exe" and child == "powershell.exe": print("Suspicious endpoint process chain") # Investigate PowerShell activity.
# Example: VirusTotal lookup (pseudo) hash = "abcd1234" if vt_lookup(hash) == "malicious": print("File flagged as malware") # Verify signature and uploader metadata.
# Example: sandbox alert (pseudocode) if sandbox_report.contains("suspicious network call"): print("Malicious behavior detected in sandbox") # Block hash and alert SOC.
# No automated processes; mostly manual monitoring print("Security ops at ad hoc stage; focus on awareness and documentation")
# Example: Standard checklist for incident response def incident_checklist(): steps = ["Identify", "Contain", "Eradicate", "Recover"] for step in steps: print(f"Perform: {step}") incident_checklist() # Follows documented process steps for incidents
# Example: Role assignment in SOC soc_roles = {"Analyst": "Monitor alerts", "Responder": "Handle incidents"} print("SOC Roles:", soc_roles) # Defines clear security roles
# Calculate average MTTR from sample data mttr_values = [30, 45, 25, 60] average_mttr = sum(mttr_values) / len(mttr_values) print(f"Average MTTR: {average_mttr} minutes") # Uses metrics to improve performance
# Simulate automated incident response trigger incident_detected = True if incident_detected: print("Triggering automated containment playbook") # Automation reduces manual intervention
# Sample threat intel feed processing threat_feed = ["malicious_ip_1", "malicious_ip_2"] detected_ip = "malicious_ip_1" if detected_ip in threat_feed: print("Alert: IP flagged by threat intelligence") # Enhances detection accuracy
# Simulate IR playbook step def run_playbook(): steps = ["Detect", "Analyze", "Contain", "Eradicate", "Recover"] for step in steps: print(f"Executing: {step}") run_playbook() # Structured approach to incidents
# Example: Notify stakeholders def notify_team(message): print(f"Notify SOC team: {message}") notify_team("New phishing campaign detected") # Enables rapid collaboration
# Simple continuous monitor simulation import time def monitor(): print("Monitoring network traffic...") time.sleep(2) print("No anomalies detected") monitor() # Constant vigilance on network activity
# Sample dashboard data metrics = {"MTTD": 15, "MTTR": 45, "Alerts": 200} for key, value in metrics.items(): print(f"{key}: {value}") # Visualize SOC effectiveness
# Calculate MTTD example detection_times = [10, 20, 15, 12] # in minutes mttd = sum(detection_times) / len(detection_times) print(f"Mean Time to Detect: {mttd} minutes")
# Calculate MTTR example response_times = [30, 40, 25, 35] mttr = sum(response_times) / len(response_times) print(f"Mean Time to Respond: {mttr} minutes")
# Track alert count alerts = 500 print(f"Alerts generated today: {alerts}")
# Calculate false positive rate false_positives = 50 total_alerts = 500 false_positive_rate = (false_positives / total_alerts) * 100 print(f"False Positive Rate: {false_positive_rate}%")
# Calculate true positive rate true_positives = 450 tpr = (true_positives / total_alerts) * 100 print(f"True Positive Rate: {tpr}%")
# Incident count example incidents = 25 print(f"Confirmed incidents this month: {incidents}")
# Analyst efficiency calculation alerts_handled = 200 hours_worked = 8 efficiency = alerts_handled / hours_worked print(f"Alerts handled per hour: {efficiency}")
# Time to contain example containment_times = [20, 30, 15] avg_contain_time = sum(containment_times) / len(containment_times) print(f"Average Time to Contain: {avg_contain_time} minutes")
# Time to recovery example recovery_times = [40, 50, 35] avg_recovery_time = sum(recovery_times) / len(recovery_times) print(f"Average Time to Recovery: {avg_recovery_time} minutes")
# Alert triage time example triage_times = [5, 7, 6] avg_triage_time = sum(triage_times) / len(triage_times) print(f"Average Alert Triage Time: {avg_triage_time} minutes")
# Example KPI definition dictionary kpis = { "Incident Response Time": "Average time to resolve incidents", "Alert Accuracy": "Ratio of true alerts to total alerts" } print("Defined KPIs:", kpis)
# Example SLO for alert response slo_response_time = 15 # minutes print(f"SLO: Respond to alerts within {slo_response_time} minutes")
# Simulate KPI data collection alerts_handled = 200 incidents_resolved = 30 print(f"Alerts handled: {alerts_handled}, Incidents resolved: {incidents_resolved}")
# Simple print dashboard mockup dashboard = {"MTTR": 25, "MTTD": 12, "Alerts": 150} for metric, value in dashboard.items(): print(f"{metric}: {value}")
# SLA example sla = {"Response Time": "Less than 30 minutes"} print("SLA terms:", sla)
# Prioritize alerts example alerts = [{"id":1, "severity":"high"}, {"id":2, "severity":"low"}] high_priority = [a for a in alerts if a["severity"] == "high"] print("High priority alerts:", high_priority)
# SLA breach check response_time = 40 if response_time > 30: print("SLA breached: Investigate and improve process")
# Validate KPI data example def validate(value): if value < 0: return False return True print("Data valid?", validate(10))
# Simple trend calculation data = [20, 22, 25, 30] trend = data[-1] - data[0] print(f"Trend increase over period: {trend}")
# KPI review example def review_kpi(current, target): if current > target: print("KPI on track") else: print("KPI needs improvement") review_kpi(25, 20)
# Example role assignment soc_team = {"Manager": "Oversees SOC activities and strategy"} print("SOC Manager Role:", soc_team["Manager"])
# Level 1 analyst duties def level1_monitor(alert): if alert == "high": print("Escalate to Level 2") else: print("Monitor alert") level1_monitor("medium")
# Level 2 analyst investigation def investigate(incident): print(f"Investigating incident: {incident}") investigate("Suspicious login")
# Threat hunting example def threat_hunt(data): suspicious = [d for d in data if d == "anomaly"] print("Threat hunting results:", suspicious) threat_hunt(["normal", "anomaly", "normal"])
# Incident response simulation def respond(incident): print(f"Containing threat: {incident}") respond("Malware detected")
# Forensics example def analyze_evidence(evidence): print(f"Analyzing evidence: {evidence}") analyze_evidence("Disk image")
# Threat intel processing def process_intel(intel): print(f"Processing intel: {intel}") process_intel("New phishing campaign")
# SOC tool config def configure_tool(tool): print(f"Configuring tool: {tool}") configure_tool("SIEM")
# Compliance check def check_compliance(): print("Compliance with GDPR and PCI-DSS verified") check_compliance()
# Training session def train(topic): print(f"Training on: {topic}") train("Incident response procedures")
# Simulate false positive filtering alerts = ["true", "false", "false", "true"] true_alerts = [a for a in alerts if a == "true"] print(f"Filtered true alerts: {len(true_alerts)}")
# Basic context check simulation def evaluate_context(event): if "suspicious" in event: print("Needs human review") else: print("Auto-processed") evaluate_context("suspicious login attempt")
# Example complex decision def complex_incident(incident): print(f"Escalate complex incident: {incident}") complex_incident("Advanced persistent threat")
# Data validation def validate_data(data): if not data: print("Invalid data - manual review needed") else: print("Data accepted for automation") validate_data([])
# Maintenance alert print("Schedule weekly update and tuning of automation rules")
# Integration check example tools = ["SIEM", "SOAR", "EDR"] print(f"Integrating tools: {', '.join(tools)}")
# Warning message print("Balance automation with human oversight")
# Rule update reminder print("Review and update detection rules monthly")
# Privacy compliance check print("Ensure automation complies with GDPR and HIPAA")
# Cost-benefit analysis mockup cost = 10000 benefit = 15000 print("Automation ROI:", benefit - cost)
# PDCA cycle simulation steps = ["Plan", "Do", "Check", "Act"] for step in steps: print(f"Executing: {step}")
# Simple RCA example def root_cause(issue): causes = {"Malware": "Phishing email", "Breach": "Unpatched system"} return causes.get(issue, "Unknown cause") print(root_cause("Malware"))
# Feedback collection feedback = ["Improve alert clarity", "Faster incident response"] for f in feedback: print("Feedback:", f)
# Metrics review mockup kpis = {"MTTR": 30, "MTTD": 15} print("Reviewing KPIs:") for k, v in kpis.items(): print(f"{k}: {v} minutes")
# Training reminder print("Schedule monthly SOC skill development sessions")
# Automation tuning log print("Automation rules updated based on recent incident trends")
# Post-mortem template def post_mortem(incident): print(f"Analyzing incident: {incident}") print("Lessons learned and next steps") post_mortem("Ransomware attack")
# Doc update note print("Incident response playbook updated after latest attack")
# Tech assessment tools = ["SIEM", "SOAR", "EDR"] print("Evaluating tools:", ", ".join(tools))
# Threat feed simulation threats = ["New malware variant", "Phishing wave"] for threat in threats: print("Monitor threat:", threat)
# Example: simulate log aggregation from two sources logs_source1 = ['login_success', 'file_access'] logs_source2 = ['login_fail', 'network_connect'] all_logs = logs_source1 + logs_source2 print("Aggregated logs:", all_logs)
# Example: normalize log entries raw_logs = [{'src': '192.168.1.5', 'msg': 'Login success'}, {'source_ip': '192.168.1.6', 'message': 'Login fail'}] normalized_logs = [] for log in raw_logs: normalized = { 'ip': log.get('src') or log.get('source_ip'), 'event': log.get('msg') or log.get('message') } normalized_logs.append(normalized) print(normalized_logs)
# Example: alert on failed login attempts events = ['login_fail', 'login_fail', 'login_success'] fail_count = events.count('login_fail') if fail_count > 1: print("Alert: Multiple failed login attempts detected!")
# Example: correlate failed login followed by successful login events = [('login_fail', 'user1'), ('login_fail', 'user1'), ('login_success', 'user1')] fail_attempts = [e for e in events if e[0] == 'login_fail' and e[1] == 'user1'] success = any(e for e in events if e == ('login_success', 'user1')) if len(fail_attempts) > 1 and success: print("Potential brute force detected for user1")
# Example: simple log retention filter import datetime logs = [{'date': datetime.date(2024, 7, 1), 'event': 'login'}, {'date': datetime.date(2023, 1, 1), 'event': 'file_access'}] cutoff_date = datetime.date(2024, 1, 1) recent_logs = [log for log in logs if log['date'] >= cutoff_date] print("Logs retained:", recent_logs)
# Example: generate simple compliance report events = ['login', 'file_access', 'unauthorized_access'] compliance_events = [e for e in events if e != 'unauthorized_access'] print(f"Compliance events: {len(compliance_events)} out of {len(events)}")
# Example: find timeline of login events timeline = [('2024-07-20 10:00', 'login_fail'), ('2024-07-20 10:05', 'login_success')] for time, event in timeline: print(f"At {time}, event: {event}")
# Example: check IP against threat feed threat_ips = ['203.0.113.10'] event_ip = '203.0.113.10' if event_ip in threat_ips: print("Alert: Event from known malicious IP")
# Example: simulate simple log queue processing log_queue = ['event1', 'event2', 'event3'] while log_queue: event = log_queue.pop(0) print(f"Processing {event}")
# Example: basic anomaly score calculation events = [1, 1, 1, 5, 1] avg = sum(events)/len(events) for e in events: if e > avg * 2: print(f"Anomaly detected: event value {e} is unusually high")
# Example: simulate simple automation trigger event = "phishing_email_detected" if event == "phishing_email_detected": print("Trigger: Quarantine email and alert SOC team")
# Example: simple playbook steps def quarantine_endpoint(): print("Endpoint quarantined") def alert_team(): print("SOC team alerted") # Execute playbook quarantine_endpoint() alert_team()
# Example: prioritize incident by severity incidents = {'incident1': 3, 'incident2': 7} # severity scale 1-10 priority = max(incidents, key=incidents.get) print(f"Highest priority incident: {priority}")
# Example: pseudo API call to block IP def block_ip(ip): print(f"Blocking IP: {ip}") block_ip("192.168.1.100")
# Example: enrich alert with user info alert = {"ip": "203.0.113.10"} user_info = {"owner": "John Doe", "role": "Admin"} alert.update(user_info) print(alert)
# Example: automated password reset def reset_password(user): print(f"Password reset for {user}") reset_password("john.doe")
# Example: update case status case = {"id": 101, "status": "open"} case["status"] = "investigating" print(case)
# Example: send alert notification def notify_team(message): print(f"Notification sent: {message}") notify_team("Incident #101 requires immediate attention")
# Example: calculate average response time response_times = [5, 7, 4] avg_time = sum(response_times) / len(response_times) print(f"Average response time: {avg_time} minutes")
# Example: validate playbook steps before execution playbook_steps = ['validate_alert', 'quarantine_host', 'notify_team'] if 'quarantine_host' in playbook_steps: print("Playbook valid, ready to execute")
# Example: Simple alert detection script alert = "unauthorized_access" if alert == "unauthorized_access": print("Incident detected: Unauthorized access") # Alert triggers incident response
# Example: Classify incident severity based on type incident_type = "malware" severity_map = {"malware": 5, "phishing": 3, "dos": 4} severity = severity_map.get(incident_type, 1) print(f"Incident severity: {severity}") # Assign severity for prioritization
# Example: Lookup response action for incident type response_plans = {"malware": "isolate system", "phishing": "block sender"} incident = "malware" print(f"Response action: {response_plans[incident]}") # Plan guides remediation steps
# Example: Escalate if severity is high severity = 7 if severity > 5: print("Escalate incident to SOC Manager") # Timely escalation improves handling
# Example: Isolate a compromised host def isolate_host(host_id): print(f"Host {host_id} isolated from network") isolate_host("host123") # Immediate containment action
# Example: Remove malware from a system def clean_malware(system_id): print(f"Malware removed from {system_id}") clean_malware("host123") # System cleaned and ready for recovery
# Example: Document root cause of incident root_cause = "phishing email" print(f"Root cause identified: {root_cause}") # Essential for continuous improvement
# Example: Create incident report summary report = {"id": 101, "status": "closed", "details": "Malware outbreak contained"} print(report) # Documentation aids future preparedness
# Example: Schedule a security training session def schedule_training(topic): print(f"Training scheduled on: {topic}") schedule_training("Phishing Awareness") # Education empowers users
# Example: Update incident response playbook playbook = ["detect", "contain", "eradicate"] playbook.append("review") print(f"Updated playbook steps: {playbook}") # Continuous improvement cycle
# Example: Simple log monitor logs = ["login success", "failed login", "file accessed"] for log in logs: if "failed login" in log: print("Alert: Failed login detected") # Detect suspicious login attempts
# Example: Simulate log collection def collect_logs(agent_installed): if agent_installed: print("Logs collected via agent") else: print("Logs collected agentlessly") collect_logs(True) # Different collection methods
# Example: Log rotation simulation def rotate_logs(log_count): if log_count > 1000: print("Rotating logs to archive") else: print("Log size within limits") rotate_logs(1500) # Ensure logs don’t grow uncontrollably
# Example: Centralized log receipt central_logs = [] def receive_log(log): central_logs.append(log) print(f"Received log: {log}") receive_log("User login") # Aggregate logs centrally
# Example: Correlate multiple logs logs = ["failed login", "failed login", "account locked"] if logs.count("failed login") > 1 and "account locked" in logs: print("Multiple failed logins followed by lockout") # Detect brute-force attacks
# Example: Trigger alert on threshold failed_attempts = 5 if failed_attempts > 3: print("Alert: Multiple failed login attempts") # Prompt security team to investigate
# Example: Send event to SIEM event = {"type": "login", "status": "fail"} def send_to_siem(event): print(f"Event sent to SIEM: {event}") send_to_siem(event) # Centralized security event management
# Example: Monitor suspicious port usage port = 23 if port == 23: print("Alert: Telnet traffic detected, consider blocking") # Telnet is insecure and often blocked
# Example: Check for unauthorized process running_processes = ["chrome.exe", "unknown.exe"] if "unknown.exe" in running_processes: print("Alert: Unknown process running") # Endpoint security check
# Example: Update monitoring rules monitoring_rules = ["rule1", "rule2"] monitoring_rules.append("rule3") print(f"Updated rules: {monitoring_rules}") # Adapt to evolving threat landscape
# Example: Basic disaster recovery checklist disaster_plan = ["backup data", "restore servers", "test failover"] print("Disaster recovery plan steps:") for step in disaster_plan: print(f"- {step}") # Preparing recovery procedures
# Example: Identify critical business functions critical_functions = ["customer support", "order processing", "IT operations"] print(f"Critical functions to maintain: {critical_functions}") # Prioritize business continuity efforts
# Example: Perform incremental backup def backup(data, last_backup): if data != last_backup: print("Performing incremental backup") else: print("No changes detected") backup("file_v2", "file_v1") # Efficient backup approach
# Example: Check if recovery meets objectives rto = 2 # hours actual_recovery_time = 1.5 if actual_recovery_time <= rto: print("Recovery within RTO") else: print("Recovery delayed") # Measure recovery effectiveness
# Example: Switch to backup server def failover(active, backup): if not active: print("Failing over to backup server") else: print("Primary server operational") failover(False, True) # Maintain service continuity
# Example: Notify stakeholders def notify(message): print(f"Notification sent: {message}") notify("Data center outage detected, recovery underway") # Effective communication reduces confusion
# Example: Conduct recovery drill def conduct_drill(): print("Starting disaster recovery drill") print("All systems restored successfully") conduct_drill() # Validate readiness
# Example: Verify backup integrity backup_hash = "abc123" current_hash = "abc123" if backup_hash == current_hash: print("Backup data integrity verified") else: print("Backup data corrupted") # Prevent recovery issues
# Example: Check compliance status compliant = True if compliant: print("DR plan meets compliance requirements") else: print("Compliance gaps detected") # Avoid regulatory penalties
# Example: Update DR plan plan_steps = ["backup", "restore", "test"] plan_steps.append("review") print(f"Updated DR plan steps: {plan_steps}") # Keep plans current and effective
# Example: Basic active ping scan simulation import os def ping_host(ip): response = os.system(f"ping -c 1 {ip} > /dev/null 2>&1") if response == 0: print(f"{ip} is alive") else: print(f"{ip} is unreachable") ips = ["192.168.1.1", "192.168.1.2", "192.168.1.3"] for ip in ips: ping_host(ip) # Simple script to detect live hosts on a subnet
# Example: Simulate internal vs external scan def scan_scope(scope): if scope == "internal": print("Scanning internal network assets...") elif scope == "external": print("Scanning public-facing IPs...") else: print("Unknown scan scope") scan_scope("internal") scan_scope("external") # Different focus depending on scan type
# Example: Represent agent vs agentless scanning class Scanner: def __init__(self, method): self.method = method def scan(self, target): if self.method == "agent": print(f"Scanning {target} with installed agent for detailed data") elif self.method == "agentless": print(f"Scanning {target} remotely without agent") else: print("Unknown scanning method") scanner1 = Scanner("agent") scanner2 = Scanner("agentless") scanner1.scan("Host A") scanner2.scan("Host B") # Illustrates different scanning approaches
# Example: Check scanning type and output detail level def perform_scan(credentialed): if credentialed: print("Performing credentialed scan: deep system checks") else: print("Performing non-credentialed scan: external view only") perform_scan(True) perform_scan(False) # Shows differences in scan depth based on credentials
# Example: Simulating passive vs active scan behavior def scan_network(scan_type): if scan_type == "passive": print("Listening to network traffic quietly (passive scan).") elif scan_type == "active": print("Sending probes to devices (active scan).") else: print("Unknown scan type.") scan_network("passive") scan_network("active") # Demonstrates difference between passive and active scanning
# Example: Pseudocode illustrating static vs dynamic analysis def analyze_code(code, analysis_type): if analysis_type == "static": print("Analyzing code without execution (static analysis).") # Example: check for hardcoded passwords if "password" in code: print("Warning: Hardcoded password found.") elif analysis_type == "dynamic": print("Running code and monitoring behavior (dynamic analysis).") # Example: simulate runtime error detection try: exec(code) except Exception as e: print(f"Runtime error detected: {e}") sample_code = "print('Hello')\npassword = '1234'" analyze_code(sample_code, "static") analyze_code("print(1/0)", "dynamic") # Shows differences between static and dynamic code evaluation
# Example: Simulate critical infrastructure scan with caution flag def scan_critical_system(system_name, cautious=True): if cautious: print(f"Performing low-impact scan on {system_name} to avoid disruption.") else: print(f"Performing standard scan on {system_name}. Use with care!") scan_critical_system("Power Grid SCADA") scan_critical_system("Water Treatment System", cautious=False) # Highlights importance of careful scanning in critical infrastructure
# Example: Identify safe scan mode for OT systems def ot_scan_mode(system_type): if system_type in ["OT", "ICS", "SCADA"]: print(f"Using passive scan mode for {system_type} system to prevent disruptions.") else: print("Standard scan mode applicable.") ot_scan_mode("SCADA") ot_scan_mode("Corporate Network") # Emphasizes the importance of scan mode based on system type
# Example: Simple network device mapping simulation industrial_devices = { "PLC1": "192.168.10.10", "RTU2": "192.168.10.20", "HMI": "192.168.10.30" } for device, ip in industrial_devices.items(): print(f"Found device {device} at IP {ip}") # Lists discovered industrial control devices and their IPs
# Example: Compare current config to baseline baseline_config = {"firewall_enabled": True, "patch_level": "2024-07"} current_config = {"firewall_enabled": True, "patch_level": "2024-06"} for setting in baseline_config: if baseline_config[setting] != current_config.get(setting): print(f"Configuration drift detected for {setting}") else: print(f"{setting} is compliant with baseline") # Detects configuration differences affecting compliance
# Example: Basic segmentation check simulation def check_access(segment, data_sensitivity): if segment == "public" and data_sensitivity == "high": print("Warning: Sensitive data exposed on public segment!") else: print("Segmentation appropriate for data sensitivity.") check_access("public", "high") check_access("restricted", "high") # Highlights importance of proper segmentation for sensitive data
# Example: Compliance check pseudocode pci_requirements = {"firewall": True, "encryption": True} system_config = {"firewall": True, "encryption": False} for control in pci_requirements: if pci_requirements[control] != system_config.get(control): print(f"Non-compliant: {control} requirement not met") else: print(f"{control} compliant") # Checks system settings against PCI DSS requirements
# Example: Simulate tool selection logic tools = ["Nessus", "OpenVAS", "Qualys"] budget = 1000 for tool in tools: if tool == "OpenVAS": print(f"{tool} is open-source and free.") elif tool == "Nessus" and budget > 500: print(f"{tool} selected based on budget and features.") elif tool == "Qualys" and budget > 1000: print(f"{tool} is premium and chosen for enterprise use.") # Illustrates selecting vulnerability assessment tools based on budget and features
# Example: Basic scan run simulation def run_scan(tool): print(f"Starting scan using {tool}...") # Simulate scanning duration import time time.sleep(1) print(f"Scan with {tool} complete. Vulnerabilities found: 3") run_scan("Nessus") run_scan("OpenVAS") run_scan("Qualys") # Demonstrates simple scan execution for different tools
# Example: Simulate simple web scan process web_tools = ["Burp Suite", "ZAP", "Nikto"] for tool in web_tools: print(f"Launching {tool} for web vulnerability scan...") # Simulated scanning action print(f"{tool} scan completed. Issues detected: 5\n") # Illustrates workflow of web vulnerability scanners
# Example: Basic usage simulation for Nmap and Metasploit def tool_action(tool): if tool == "Nmap": print("Scanning network for live hosts and open ports...") elif tool == "Metasploit": print("Launching exploit module against target host...") tool_action("Nmap") tool_action("Metasploit") # Demonstrates common usage scenarios for multipurpose tools
# Example: Cloud tool usage overview cloud_tools = ["ScoutSuite", "Prowler", "Pacu"] for tool in cloud_tools: print(f"Running {tool} to assess AWS security posture...") # Highlights roles of cloud security tools in auditing and penetration testing
# Example: Pseudocode for reverse engineering steps def reverse_engineer(binary): print(f"Loading binary {binary} into debugger...") print("Setting breakpoints...") print("Analyzing function calls and memory usage...") print("Documenting findings for vulnerability assessment.") reverse_engineer("malware_sample.exe") # Illustrates basic reverse engineering workflow with debugger
# Example: Filter scan results for critical severity scan_results = [ {"id":1, "severity":"medium"}, {"id":2, "severity":"critical"}, {"id":3, "severity":"low"}, ] critical_findings = [r for r in scan_results if r["severity"] == "critical"] print(f"Critical issues found: {len(critical_findings)}") # Focuses remediation efforts on highest severity vulnerabilities
# Example: Identify repeated vulnerabilities across hosts scan_data = [ {"host":"host1", "vuln":"CVE-2021-1234"}, {"host":"host2", "vuln":"CVE-2021-1234"}, {"host":"host3", "vuln":"CVE-2022-5678"}, ] vuln_counts = {} for entry in scan_data: vuln = entry["vuln"] vuln_counts[vuln] = vuln_counts.get(vuln, 0) + 1 for vuln, count in vuln_counts.items(): print(f"{vuln} found on {count} hosts") # Highlights frequently occurring vulnerabilities for priority fixing
# Example: Validate scan results for false positives scan_results = [{"id":1, "vulnerable":True}, {"id":2, "vulnerable":False}] def validate_results(results): for res in results: if res["vulnerable"]: print(f"Vulnerability confirmed on ID {res['id']}") else: print(f"Potential false positive on ID {res['id']} - needs manual review") validate_results(scan_results) # Helps differentiate true vulnerabilities from false alerts
# Example: Prioritize by asset type scan_results = [ {"asset":"database", "vuln_score":7}, {"asset":"workstation", "vuln_score":5}, ] priority_order = {"database": 1, "workstation": 2} sorted_results = sorted(scan_results, key=lambda x: (priority_order[x["asset"]], -x["vuln_score"])) for r in sorted_results: print(f"Asset: {r['asset']}, Score: {r['vuln_score']}") # Sorts vulnerabilities prioritizing critical assets first
# Example: Link vulnerabilities to threats vulnerabilities = [{"id":1, "cve":"CVE-2023-1234"}] threat_db = {"CVE-2023-1234": "Active exploitation by ransomware"} for vuln in vulnerabilities: threat = threat_db.get(vuln["cve"], "No known threat") print(f"Vulnerability {vuln['cve']} linked to threat: {threat}") # Helps security teams understand real-world impact of vulnerabilities
# Example: Simple consolidation logic reports = [ {"id": 1, "vuln": "X", "severity": "high"}, {"id": 2, "vuln": "X", "severity": "high"}, {"id": 3, "vuln": "Y", "severity": "medium"}, ] unique_vulns = {} for r in reports: unique_vulns[r["vuln"]] = r for vuln, details in unique_vulns.items(): print(f"Vulnerability: {vuln}, Severity: {details['severity']}") # Removes duplicates to create concise reports
# Example: Prioritize vulnerabilities by CVSS score vulns = [{"name":"vuln1", "cvss":9.0}, {"name":"vuln2", "cvss":5.4}] sorted_vulns = sorted(vulns, key=lambda x: x["cvss"], reverse=True) for v in sorted_vulns: print(f"{v['name']} with CVSS score {v['cvss']}") # Sorts vulnerabilities from highest to lowest risk
# Example: Simplified CVSS vector interpretation cvss_vector = {"AC": "Low", "PR": "None", "UI": "None", "C": "High", "I": "High", "A": "High"} def interpret_vector(vec): if vec["AC"] == "Low" and vec["PR"] == "None": print("Easy to exploit vulnerability") if vec["C"] == "High" and vec["I"] == "High" and vec["A"] == "High": print("High impact on system security") interpret_vector(cvss_vector) # Helps assess exploitability and impact dimensions of CVSS
# Example: Assess exploitability and weaponization exploit_info = {"exploit_available": True, "complexity": "Low"} if exploit_info["exploit_available"] and exploit_info["complexity"] == "Low": print("High weaponization potential - urgent fix needed") else: print("Lower urgency based on exploitability") # Helps determine threat from active exploits
# Example: Calculate risk score with asset value vulnerability_score = 7.0 asset_value = {"database": 10, "workstation": 5} asset = "database" risk_score = vulnerability_score * asset_value.get(asset, 1) print(f"Risk score for {asset}: {risk_score}") # Combines vulnerability and asset importance for prioritization
# Example: Zero-day mitigation logic zero_day_detected = True if zero_day_detected: print("Apply workarounds and increase monitoring") else: print("Regular patching process applies") # Emphasizes quick action for unpatched critical vulnerabilities
# Example: Prioritize remediation by combined risk factors vulnerabilities = [ {"id":1, "cvss":9.0, "asset_value":10, "exploit":True}, {"id":2, "cvss":6.0, "asset_value":3, "exploit":False}, ] def risk_score(vuln): score = vuln["cvss"] * vuln["asset_value"] if vuln["exploit"]: score *= 1.5 return score sorted_vulns = sorted(vulnerabilities, key=risk_score, reverse=True) for v in sorted_vulns: print(f"Vuln ID {v['id']} prioritized with risk score: {risk_score(v):.2f}") # Ensures high-risk vulnerabilities fixed first based on multiple factors
# Example: firewall + antivirus + encryption layers # No direct code; architecture concept
# Example: VLAN config snippet (conceptual) # VLAN 10 for finance, VLAN 20 for dev
# RBAC example concept: role "admin" vs "user"
# IDS alert pseudocode if packet.is_malicious(): alert_admin() block_packet()
# Command line example: apt update && apt upgrade sudo apt update sudo apt upgrade -y
# Antivirus scan command example clamscan -r /home/user
# Example: disable ssh root login in sshd_config PermitRootLogin no
# Log monitoring with tail command tail -f /var/log/auth.log
# No code, this is organizational control
# Backup example using rsync rsync -av --delete /important/data /backup/location
# Example: Python regex to allow only alphanumeric input import re def validate_input(input_str): pattern = "^[a-zA-Z0-9]+$" return re.match(pattern, input_str) is not None print(validate_input(""; return <div>{name}</div>
# Example: avoid eval usage // Instead of eval(code), use safer alternatives
# Example: DOMPurify in JS to sanitize input const clean = DOMPurify.sanitize(userInput);
# Use automated tools like OWASP ZAP zap-cli quick-scan http://example.com
# HTTP header example X-XSS-Protection: 1; mode=block
# Unsafe C example prone to buffer overflow char buffer[10]; strcpy(buffer, "This string is way too long!");
# Example: malloc misuse in C (conceptual) char *ptr = malloc(10); strcpy(ptr, "Overflowing this buffer causes heap corruption");
# Classic stack overflow vulnerability in C void func() { char buffer[10]; gets(buffer); // unsafe function }
# Safer alternative example strncpy(buffer, input, sizeof(buffer)-1); buffer[sizeof(buffer)-1] = '\0';
# Enabled via OS configuration; no code snippet
# Compiler flag -fstack-protector enables this protection gcc -fstack-protector source.c -o program
# Enabled in OS or compiler; example config execstack -c program
# Compiler-level support, no direct code
# Modern malloc implementations provide protections
# Tools like Valgrind or AddressSanitizer clang -fsanitize=address source.c -o program
# Unsafe LDAP filter construction example (Python) user_input = "*)(&(objectClass=*))" ldap_filter = "(cn=" + user_input + ")" # This could manipulate the filter to return all entries
# Unsafe XML parsing (Python lxml example) user_input = "<!ENTITY xxe SYSTEM 'file:///etc/passwd'>" xml_data = "<data>" + user_input + "</data>" # Parsing this can cause sensitive file disclosure
# Vulnerable code example (Python Flask) filename = request.args.get('file') with open('/var/www/uploads/' + filename) as f: data = f.read() # If filename = "../../etc/passwd" attacker can read system files
# Secure SQL example using Python sqlite3 cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
# Example: sanitize filename to prevent directory traversal import os filename = os.path.basename(user_input)
# Example with SQLAlchemy ORM user = session.query(User).filter(User.name == user_input).first()
# WAF rule example (conceptual) if request.matches_pattern(".*('|\").*;.*--.*"): block_request()
# Logging example in Python if suspicious_input_detected: logger.warning(f"Possible injection attempt: {user_input}")
# Conceptual example: lack of permission check if user.is_authenticated(): show_sensitive_data() # Missing role or ownership checks allow unauthorized access
# Vulnerable Python code example import requests url = request.args.get('url') response = requests.get(url) # No validation on URL
# Basic URL validation example from urllib.parse import urlparse allowed_hosts = ['example.com'] url = urlparse(user_input_url) if url.hostname not in allowed_hosts: raise Exception("Blocked URL")
# Unsafe eval example in Python eval(user_input) # Extremely dangerous
# Example: use JSON parsing instead of eval import json data = json.loads(user_input)
# Example: check ownership before access if resource.owner_id != current_user.id: raise PermissionError("Access denied")
# Example: Django permission decorators from django.contrib.auth.decorators import login_required, permission_required @login_required @permission_required('app.view_resource') def view_resource(request): ...
# Logging example logger.info(f"Unauthorized access attempt by user {user.id}")
# Role assignment example user.assign_role('read_only')
# Use tools like OWASP ZAP or Burp Suite for testing
# SDLC phases: Requirements, Design, Implementation, Testing, Deployment, Maintenance
# Example: STRIDE methodology # Spoofing, Tampering, Repudiation, Information Disclosure, Denial of Service, Elevation of Privilege
# Use tools like GitHub pull requests with security checklists
# Example tool: Bandit for Python bandit -r project/
# Example: OWASP ZAP scan against running web app zap-cli quick-scan http://localhost:8000
# Example: Python safety check safety check
# Examples: input validation, output encoding, error handling
# Safe error response example try: ... except Exception: return "An error occurred. Please try again later."
# Example: mask sensitive info in logs logger.info(f"User login: {user.id}, IP: {request.ip}")
# GitHub Actions example with SAST name: Security Scan on: [push] jobs: bandit_scan: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Run Bandit run: bandit -r .
# Example: run nmap script to detect vulns nmap --script vuln target.com
# Example scoring: CVSS (Common Vulnerability Scoring System) # CVSS score ranges from 0 to 10
# Schedule during maintenance windows to reduce downtime
# Use ITSM tools like Jira or ServiceNow
# Deploy patches first in staging environments
# Example: email or ticket updates
# Re-scan with vulnerability tools post-patch nmap --script vuln target.com
# Use centralized repositories or ticketing systems
# Continuous vulnerability scanning example with OpenVAS openvas-start
# Post-incident review meetings and reports
# Example: physical access logs when electronic badge system is down
# Formal risk acceptance forms or decisions
# Combining monitoring and manual processes
# E.g., manual reviews of access logs
# Physical escorting of visitors
# Enhanced audit trails
# Temporary SOP updates
# Awareness sessions
# Formal documentation and sign-offs
# Periodic audits and testing
# Lifecycle steps ensure thorough patch application
# Use staging environments for validation
# Night or weekend patch windows
# Example Puppet manifest to apply updates exec { 'update_system': command => '/usr/bin/apt-get update && /usr/bin/apt-get upgrade -y', }
# Keep backup images before patching
# Patch logs with timestamps and results
# Email alerts or ticket notifications
# Automated compliance reports
# Scan systems for patch status
# Lessons learned incorporated into next cycles
# Scheduled during low user activity to reduce disruption
# Email notifications or meetings before window
# Backup systems before changes
# Example: Sunday 2 AM - 4 AM maintenance
# Ticketing system updates for traceability
# Verify backup integrity before patching
# Status updates via email or dashboards
# Run test scripts to check service availability
# Maintenance logs and incident reports
# Post-mortem meetings for lessons learned
try: result = 10 / 0 except ZeroDivisionError: print("Handled division by zero error")
# Risk acceptance form example (pseudo) risk = {"description": "Legacy system vulnerability", "accepted_by": "CTO"}
import logging logging.error("Exception occurred", exc_info=True)
# Example: Python list of assets assets = ["webserver", "database", "API endpoint"]
import nmap scanner = nmap.PortScanner() scanner.scan('192.168.1.0/24')
# Example: uptime monitoring if uptime_percentage < 99.9: alert_team()
# Python log retention example import shutil, os log_dir = "/var/log/security" archive = "/backup/security_logs.zip" shutil.make_archive(archive.replace(".zip", ""), 'zip', log_dir) print("Logs archived for audit")
# STRIDE Threat Model Categories stride = ["Spoofing", "Tampering", "Repudiation", "Information Disclosure", "Denial of Service", "Elevation of Privilege"] for threat in stride: print("Assessing threat:", threat)
# Example alignment check framework = ["Access Control", "Asset Management", "Incident Response"] org_controls = ["Access Control", "Incident Response"] missing = set(framework) - set(org_controls) print("Missing controls:", missing)
# Auto-create remediation task vuln = "OpenSSH outdated" if vuln: print(f"Creating ticket for: {vuln}") # Can integrate with Jira, ServiceNow
# Python: Trigger Nessus scan via API import requests scan_url = "https://nessus.local/api/v1/scans" data = {"template_id": 1, "name": "Weekly Scan"} response = requests.post(scan_url, json=data, verify=False) print("Scan triggered:", response.status_code)
# Simulate integration with patch system vuln = "Critical OpenSSL bug" patch_system = "Patch Deployer" print(f"{vuln} sent to {patch_system} for deployment")
# Example using Trivy in CI pipeline os.system("trivy fs --severity HIGH /app/code")
# Send issue to GitHub from scan import requests issue = {"title": "Fix XSS", "body": "Found in index.js"} requests.post("https://api.github.com/repos/org/repo/issues", json=issue)
# Simple MTTR calculation import datetime opened = datetime.datetime(2024, 5, 1) closed = datetime.datetime(2024, 5, 4) print("MTTR:", (closed - opened).days, "days")
# Basic fuzz test example inputs = ["A" * x for x in range(1, 100)] for input_str in inputs: print("Testing input:", input_str)
# Example: Checkov CLI os.system("checkov -d /terraform/code")
# MobSF API scan trigger requests.post("http://mobsf/api/v1/scan", files={"file": open("apk_file.apk", "rb")})
# Check serial debug port status import serial try: serial.Serial("/dev/ttyUSB0", 9600) print("Debug port active") except: print("Debug port not found")
# Example: Real-time feed poller import time while True: print("Checking feed... (simulated)") time.sleep(10)
# Example of ML model prediction from sklearn.ensemble import RandomForestClassifier model = RandomForestClassifier() X_train = [[0,1],[1,0]]; y_train = [1,0] model.fit(X_train, y_train) print("Prediction:", model.predict([[1,1]]))
# Simulate an attack methodology phase in Python def simulate_attack_phase(phase): print(f"[!] Simulating phase: {phase}") simulate_attack_phase("Reconnaissance")
# List all Cyber Kill Chain phases kill_chain = [ "Reconnaissance", "Weaponization", "Delivery", "Exploitation", "Installation", "Command and Control", "Actions on Objectives" ] for phase in kill_chain: print(f"Kill Chain Phase: {phase}")
# Represent a simple Diamond Model relationship in Python diamond = { "Adversary": "APT28", "Capability": "Zero-day exploit", "Infrastructure": "Malicious domain", "Victim": "Energy sector" } for key, value in diamond.items(): print(f"{key}: {value}")
# Print sample ATT&CK tactics tactics = [ "Initial Access", "Execution", "Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access" ] for tactic in tactics: print(f"MITRE ATT&CK Tactic: {tactic}")
# Pseudocode: Print OSSTMM testing aspects osstmm_areas = ["Data Controls", "Process Control", "Operational Risk", "Metrics"] for area in osstmm_areas: print(f"OSSTMM Testing Focus: {area}")
# Python: Basic automated scan example for XSS vulnerability in URLs import requests def check_xss(url): payload = "<script>alert('XSS')</script>" test_url = f"{url}{payload}" response = requests.get(test_url) if payload in response.text: print(f"Potential XSS vulnerability detected at {test_url}") else: print(f"No XSS vulnerability detected at {url}") # Example usage check_xss("http://example.com/search?q=")
# Python: Simple log anomaly detection example def detect_incident(logs): suspicious_keywords = ["failed login", "unauthorized", "error", "malware"] incidents = [] for line in logs: if any(keyword in line.lower() for keyword in suspicious_keywords): incidents.append(line) return incidents # Example usage logs = [ "User admin failed login at 10:15", "File uploaded successfully", "Unauthorized access attempt detected", ] alerts = detect_incident(logs) print("Detected incidents:") for alert in alerts: print(alert)
# Python: Simple chain of custody tracker from datetime import datetime chain_of_custody = [] def acquire_evidence(evidence_id, handler): timestamp = datetime.now().isoformat() chain_of_custody.append({ "evidence_id": evidence_id, "handler": handler, "action": "acquired", "timestamp": timestamp }) print(f"Evidence {evidence_id} acquired by {handler} at {timestamp}") # Example usage acquire_evidence("EV001", "Investigator Alice") print(chain_of_custody)
# Python: Validate IoCs by checking format def validate_ioc(ioc): import re ip_pattern = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$" hash_pattern = r"^[a-fA-F0-9]{64}$" # SHA-256 hash if re.match(ip_pattern, ioc): return "Valid IP" elif re.match(hash_pattern, ioc): return "Valid SHA-256 Hash" else: return "Unknown IoC format" # Example usage print(validate_ioc("192.168.1.100")) print(validate_ioc("d2d2c3a3e4f1b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2")) print(validate_ioc("http://malicious.com"))
# Python: Correlate login failures from different logs def correlate_logs(log_sources): failed_logins = [] for source in log_sources: for entry in source: if "failed login" in entry.lower(): failed_logins.append(entry) return failed_logins # Example usage firewall_logs = ["User admin failed login", "Connection from 10.0.0.1"] ids_logs = ["Failed login detected for user admin"] all_failed = correlate_logs([firewall_logs, ids_logs]) print("Correlated Failed Logins:") for entry in all_failed: print(entry)
# Python: List suspicious files by extension in a directory import os def suspicious_files(path): suspicious_exts = ['.exe', '.dll', '.bat'] files = [f for f in os.listdir(path) if os.path.splitext(f)[1].lower() in suspicious_exts] print("Suspicious files found:") for f in files: print(f) # Example usage (change path as needed) suspicious_files("/tmp")
# Python: Simulate sandbox logging behavior (conceptual) def sandbox_simulation(malware_name): print(f"Executing {malware_name} in sandbox...") activities = [ "Created file temp.dll", "Attempted network connection to 10.10.10.10", "Modified registry key HKCU\\Software\\Malware" ] for activity in activities: print(f"Sandbox log: {activity}") # Example usage sandbox_simulation("evil_malware.exe")
# Python: Simple containment flag and recovery status class Incident: def __init__(self, id): self.id = id self.contained = False self.eradicated = False self.recovered = False def contain(self): self.contained = True print(f"Incident {self.id} contained.") def eradicate(self): if self.contained: self.eradicated = True print(f"Incident {self.id} eradicated.") else: print("Contain incident first.") def recover(self): if self.eradicated: self.recovered = True print(f"Incident {self.id} recovered.") else: print("Eradicate incident first.") # Example usage inc = Incident("INC1001") inc.contain() inc.eradicate() inc.recover()
# Python: Simulate network device isolation status class NetworkDevice: def __init__(self, name): self.name = name self.isolated = False def isolate(self): self.isolated = True print(f"Device {self.name} is now isolated from the network.") def reconnect(self): self.isolated = False print(f"Device {self.name} reconnected to the network.") # Example usage device = NetworkDevice("Server01") device.isolate() device.reconnect()
# Python: Apply compensating controls status def apply_compensating_control(control_name): print(f"Compensating control '{control_name}' applied temporarily.") # Example usage apply_compensating_control("Manual login review for admin accounts")
# Python: Simulate system re-imaging status def reimage_system(system_name): print(f"Re-imaging system {system_name}... Complete. System restored to clean image.") # Example usage reimage_system("Workstation-23")
# Python: Reset permissions simulation def restore_permissions(user, permissions): print(f"Restoring permissions for {user}: {permissions}") # Example usage restore_permissions("alice", ["read", "write", "execute"])
# Python: Simulated vulnerability scan result def validate_recovery(system): print(f"Running vulnerability scan on {system}...") print("No critical vulnerabilities found. System is secure.") # Example usage validate_recovery("Database Server 1")
# Python: Incident response plan skeleton incident_response_plan = { "roles": ["Incident Handler", "Communications", "Legal", "IT Support"], "steps": ["Detect", "Analyze", "Contain", "Eradicate", "Recover", "Review"], "communication": { "internal": ["Email", "Phone", "Chat"], "external": ["Law Enforcement", "Regulators"] } } print("Incident Response Plan Overview:") for role in incident_response_plan["roles"]: print(f"- {role}")
# Python: Simple playbook example for malware incident malware_playbook = { "identification": ["Isolate affected machines", "Collect malware samples"], "eradication": ["Run anti-malware tools", "Remove malicious files"], "recovery": ["Restore systems from backups", "Monitor for reinfection"], "communication": ["Notify security team", "Update stakeholders"] } def execute_playbook(playbook): for phase, actions in playbook.items(): print(f"{phase.capitalize()} phase:") for action in actions: print(f"- {action}") # Example usage execute_playbook(malware_playbook)
# Sample table setup incident = "Ransomware Outbreak" roles = ["SOC Analyst", "Incident Commander", "Legal"] for r in roles: print(f"{r} participating in {incident} exercise")
# Simulate tool use forensic_tool = "Volatility" print(f"Launching memory analysis with {forensic_tool}")
# Simulate failover test primary = False if not primary: print("Switching to DR site...")
# RACI example roles = {"SOC Analyst": "Responsible", "Manager": "Accountable"} for role, duty in roles.items(): print(f"{role} is {duty}")
# Python: Log post-incident recovery activities def post_incident_log(incident_id, actions, notes): """ Logs post-incident activities for accountability and future review. incident_id: str - unique incident identifier actions: list - recovery and validation steps completed notes: str - additional observations or recommendations """ print(f"Logging post-incident activities for {incident_id}") for action in actions: print(f"- Action completed: {action}") print(f"Additional notes: {notes}") # Example usage post_incident_log("INC2025-07", ["System integrity verified", "Backups restored"], "Recommend patching vulnerability XYZ.")
# Python: Summarize lessons learned from after-action report def after_action_report(incident_id, timeline, successes, improvements): print(f"After-Action Report: Incident {incident_id}\n") print("Incident Timeline:") for event in timeline: print(f" - {event}") print("\nWhat Went Well:") for success in successes: print(f" - {success}") print("\nAreas for Improvement:") for imp in improvements: print(f" - {imp}") # Example usage after_action_report( "INC2025-07", ["Malware detected at 10:00 AM", "Isolated infected systems at 10:30 AM", "Systems restored by 1:00 PM"], ["Fast isolation prevented spread", "Good team coordination"], ["Improve phishing awareness training", "Enhance endpoint monitoring"] )
# Python: 5 Whys root cause analysis example def five_whys(incident): print(f"Root Cause Analysis for: {incident}") reasons = [ "Why 1: Firewall rules misconfigured", "Why 2: Staff unaware of rule changes", "Why 3: No documentation for updates", "Why 4: Lack of training on procedures", "Why 5: Inadequate oversight" ] for reason in reasons: print(reason) # Example usage five_whys("Unauthorized access through firewall")
# Python: Format executive incident summary def executive_summary(incident_id, impact, response, risks, recommendations): print(f"Executive Summary for Incident {incident_id}") print("-"*50) print(f"Impact: {impact}") print(f"Response Summary: {response}") print(f"Residual Risks: {risks}") print("Recommendations:") for rec in recommendations: print(f" - {rec}") # Example usage executive_summary( "INC2025-07", "Customer data exposure affecting 10,000 users", "Incident contained within 2 hours", "Medium risk of data misuse", ["Increase endpoint monitoring", "Conduct staff security training"] )
# Python: Update security playbook dictionary def update_playbook(playbook, incident_type, steps): playbook[incident_type] = steps print(f"Playbook updated for {incident_type} incidents.") # Example playbook and update playbook = { "phishing": ["Identify email", "Quarantine user account", "Reset credentials"] } update_playbook(playbook, "ransomware", ["Isolate infected systems", "Notify response team", "Restore backups"]) print(playbook)
# Python: Add IoCs to a list and display them def add_iocs(ioc_list, new_ioc): ioc_list.append(new_ioc) print("Updated IoC List:") for ioc in ioc_list: print(f" - {ioc}") # Example usage iocs = ["hash:abcd1234", "ip:192.168.1.100"] add_iocs(iocs, "domain:malicious-site.com")
# Python: Simulate forensic artifact collection def collect_artifacts(artifacts): print("Collecting forensic artifacts:") for artifact in artifacts: print(f"- Collected {artifact}") # Example artifacts artifacts = ["memory dump", "network logs", "file metadata"] collect_artifacts(artifacts)
# Python: Simulate imaging and carving processes def imaging(device): print(f"Creating bit-for-bit image of {device}... Done.") def carving(image_file): print(f"Carving data fragments from {image_file}... Found 3 deleted files.") # Example usage imaging("Disk 1") carving("Disk1.img")
import hashlib def sha256_hash(file_path): sha256 = hashlib.sha256() with open(file_path, "rb") as f: while chunk := f.read(8192): sha256.update(chunk) return sha256.hexdigest() # Example usage (file must exist) # print(sha256_hash("evidence.img"))
# Python: Simple timeline reconstruction from event logs def reconstruct_timeline(events): sorted_events = sorted(events, key=lambda e: e["timestamp"]) print("Attack Timeline:") for event in sorted_events: print(f"{event['timestamp']}: {event['description']}") # Example usage events = [ {"timestamp": "2025-07-25 10:00", "description": "Phishing email sent"}, {"timestamp": "2025-07-25 10:15", "description": "User clicked link"}, {"timestamp": "2025-07-25 10:45", "description": "Malware installed"}, ] reconstruct_timeline(events)
# Python: Simple chain of custody log def chain_of_custody_log(evidence_id, handler, action): from datetime import datetime timestamp = datetime.now().isoformat() print(f"{timestamp} | Evidence {evidence_id} | {action} by {handler}") # Example usage chain_of_custody_log("EV123", "Analyst John", "Collected") chain_of_custody_log("EV123", "Lab Tech Alice", "Transferred")
# Python: Simulate artifact extraction and analysis def analyze_artifacts(artifacts): print("Analyzing forensic artifacts:") for artifact in artifacts: print(f"- Extracted data from {artifact}") # Example artifacts artifacts = ["Registry", "Windows Event Log", "Browser Cache"] analyze_artifacts(artifacts)
# Python: Simple escalation notification def notify_stakeholders(severity, message): stakeholders = { "low": ["IT Team"], "medium": ["IT Team", "Security Manager"], "high": ["IT Team", "Security Manager", "Executives", "Legal"] } recipients = stakeholders.get(severity, ["IT Team"]) print(f"Notifying {', '.join(recipients)}: {message}") # Example usage notify_stakeholders("high", "Data breach detected in payment system.")
# Python: List stakeholders based on incident type def identify_stakeholders(incident_type): base = ["IT", "Security Team"] if incident_type == "data breach": base += ["Legal", "Compliance", "Executives", "Regulators", "Affected Customers"] elif incident_type == "phishing": base += ["Security Awareness Team"] print(f"Stakeholders for {incident_type}: {', '.join(base)}") # Example usage identify_stakeholders("data breach")
# Python: Simulate regulatory reporting checklist def regulatory_report_checklist(incident): checklist = [ "Assess data types affected", "Determine notification deadlines", "Prepare disclosure statements", "Notify regulatory bodies", "Inform affected individuals" ] print(f"Regulatory reporting checklist for {incident}:") for item in checklist: print(f"- {item}") # Example usage regulatory_report_checklist("Customer data breach")
# Python: Draft simple public relations statement template def pr_statement(incident, message, contact): print(f"Public Relations Statement on {incident}") print("-"*50) print(message) print(f"\nFor more information, contact: {contact}") # Example usage pr_statement( "Security Incident July 2025", "Our teams have contained the incident quickly and are working diligently to ensure customer data safety.", "pr@company.com" )
# Python: Simple example using TLS socket to secure communication import socket import ssl def secure_client_send(message, server_hostname, server_port): context = ssl.create_default_context() with socket.create_connection((server_hostname, server_port)) as sock: with context.wrap_socket(sock, server_hostname=server_hostname) as ssock: ssock.sendall(message.encode()) response = ssock.recv(1024).decode() print("Received:", response) # Example usage (replace with real server info) # secure_client_send("Incident update: containment successful.", "secure.server.com", 443)
# Python: Escalation example based on incident severity def escalate_incident(severity): if severity >= 8: print("Escalate to CISO and executive team.") elif severity >= 5: print("Notify incident response team.") else: print("Handle at local IT team level.") # Example usage escalate_incident(7) # Output: Notify incident response team.
# Python: Simple KPI tracker example kpi = { "mean_time_to_detect": 12, # hours "mean_time_to_respond": 6, # hours "incidents_resolved": 15, "incidents_open": 3 } def print_kpi(kpi): for key, value in kpi.items(): print(f"{key.replace('_',' ').title()}: {value}") print_kpi(kpi)
# Python: Calculate average MTTD from incident timestamps from datetime import datetime incident_times = [ ("2023-07-20 10:00:00", "2023-07-20 12:30:00"), # (occurred, detected) ("2023-07-21 09:15:00", "2023-07-21 10:00:00"), ("2023-07-22 14:00:00", "2023-07-22 14:45:00"), ] def calculate_mttd(incident_times): total_seconds = 0 for occurred, detected in incident_times: fmt = "%Y-%m-%d %H:%M:%S" start = datetime.strptime(occurred, fmt) end = datetime.strptime(detected, fmt) total_seconds += (end - start).total_seconds() average_seconds = total_seconds / len(incident_times) return average_seconds / 3600 # convert to hours print(f"Average MTTD: {calculate_mttd(incident_times):.2f} hours")
# Python: Calculate average MTTR from incident timestamps incident_response_times = [ ("2023-07-20 12:30:00", "2023-07-20 15:00:00"), # (detected, resolved) ("2023-07-21 10:00:00", "2023-07-21 13:15:00"), ("2023-07-22 14:45:00", "2023-07-22 18:00:00"), ] def calculate_mttr(response_times): total_seconds = 0 for detected, resolved in response_times: fmt = "%Y-%m-%d %H:%M:%S" start = datetime.strptime(detected, fmt) end = datetime.strptime(resolved, fmt) total_seconds += (end - start).total_seconds() average_seconds = total_seconds / len(response_times) return average_seconds / 3600 # convert to hours print(f"Average MTTR: {calculate_mttr(incident_response_times):.2f} hours")
# Python: Count alerts by type from a list alerts = [ {"type": "malware", "details": "..."}, {"type": "phishing", "details": "..."}, {"type": "malware", "details": "..."}, {"type": "intrusion", "details": "..."}, {"type": "malware", "details": "..."}, ] from collections import Counter def count_alerts(alerts): types = [alert["type"] for alert in alerts] counts = Counter(types) return counts print("Alert volumes by type:") print(count_alerts(alerts))
# Python: Simple trend analysis example (monthly incidents) monthly_incidents = { "Jan": 5, "Feb": 7, "Mar": 9, "Apr": 6, "May": 8, "Jun": 10, } def analyze_trends(data): for month, count in data.items(): print(f"{month}: {count} incidents") analyze_trends(monthly_incidents)
# Python: Track improvement in MTTD over quarters mttd_quarters = [15, 12, 10, 8] # hours per quarter def print_improvements(metrics): for i, value in enumerate(metrics, 1): print(f"Quarter {i}: MTTD = {value} hours") print_improvements(mttd_quarters)
# Python: Example automated alert triage function def auto_triage(alert): if alert["severity"] > 7: return "High priority - escalate immediately" elif alert["severity"] > 4: return "Medium priority - investigate" else: return "Low priority - monitor" # Example alert alert = {"id": "A123", "severity": 8} print(auto_triage(alert))
# Python: Simplified SOAR playbook runner simulation def run_playbook(steps): for step in steps: print(f"Executing: {step}") playbook_steps = [ "Collect logs from endpoint", "Enrich alert with threat intel", "Isolate affected device", "Notify security team", ] run_playbook(playbook_steps)
# Python: Automated triage filtering example alerts = [ {"id": "1", "severity": 9, "source": "IDS"}, {"id": "2", "severity": 3, "source": "Firewall"}, {"id": "3", "severity": 6, "source": "Endpoint"}, ] def filter_alerts(alerts, threshold=5): return [a for a in alerts if a["severity"] >= threshold] high_priority = filter_alerts(alerts) print("High priority alerts:") for alert in high_priority: print(alert)
# Python: Simulated integration fetching threat intel def fetch_threat_intel(): return ["192.168.100.5", "malicious.com", "hash123456"] def check_alert_against_intel(alert, intel): return any(i in alert for i in intel) # Example usage intel_data = fetch_threat_intel() alert = "Connection attempt to 192.168.100.5" if check_alert_against_intel(alert, intel_data): print("Alert matches known threat intel!") else: print("No match with threat intel.")
# Python: Simulate automated evidence capture def capture_evidence(evidence_type): print(f"Capturing {evidence_type} evidence...") # Placeholder for real capture logic print(f"{evidence_type} evidence saved with timestamp.") # Example usage capture_evidence("memory dump") capture_evidence("system logs")
# Python: Basic Git command example for playbook version control # (Note: Run these commands in terminal, shown here as comments) # git init # git add incident_playbook.yml # git commit -m "Initial playbook version" # git push origin main
# Python: Simple simulation of playbook testing def test_playbook(playbook): print("Starting playbook test...") for step in playbook: print(f"Executing step: {step}") print("Playbook test completed successfully.") playbook_steps = ["Identify incident", "Contain threat", "Eradicate malware", "Recover systems"] test_playbook(playbook_steps)
# Python: Placeholder for scheduling training sessions def schedule_training(date, topic): print(f"Training on '{topic}' scheduled for {date}.") schedule_training("2025-08-15", "Ransomware response simulation")
# Python: Record lessons learned example lessons = [] def add_lesson(lesson): lessons.append(lesson) print("Lesson added.") add_lesson("Need faster malware detection tools.") add_lesson("Improve communication during escalation.") print("Lessons learned:") for l in lessons: print(f"- {l}")
# Python: Generate simple incident report incident_report = { "id": "INC-20250727-01", "description": "Phishing email detected", "actions": ["Blocked sender", "Notified users", "Monitored inbox"], "status": "Closed" } def print_report(report): for key, value in report.items(): print(f"{key.title()}: {value}") print_report(incident_report)
# Python: Check if incident requires breach notification def requires_notification(incident_type): notify_types = ["data breach", "personal data exposure"] return incident_type.lower() in notify_types print(requires_notification("Data Breach")) # True print(requires_notification("Malware Infection")) # False
# Python: Template for public statement def create_public_statement(incident_summary, actions_taken): statement = f"Security Incident Report:\n{incident_summary}\nActions Taken:\n" for action in actions_taken: statement += f"- {action}\n" return statement summary = "A phishing attack targeted our email system on July 20." actions = ["Blocked phishing emails", "Alerted affected users", "Enhanced email filtering"] print(create_public_statement(summary, actions))
# Python: Example vulnerability report generator vulnerability = { "id": "VULN-2025-001", "description": "SQL Injection vulnerability in login form", "risk_level": "High", "affected_system": "Web Application Server", "recommendation": "Implement parameterized queries to prevent injection." } def print_vulnerability_report(vuln): print(f"Vulnerability ID: {vuln['id']}") print(f"Description: {vuln['description']}") print(f"Risk Level: {vuln['risk_level']}") print(f"Affected System: {vuln['affected_system']}") print(f"Recommendation: {vuln['recommendation']}") print_vulnerability_report(vulnerability)
# Python: Simple CSV compliance report creation import csv data = [ {"control": "Access Control", "status": "Compliant"}, {"control": "Encryption", "status": "Non-Compliant"}, ] with open('compliance_report.csv', 'w', newline='') as csvfile: fieldnames = ['control', 'status'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for row in data: writer.writerow(row) print("Compliance report saved as compliance_report.csv")
# Python: Calculate average risk score from vulnerabilities vulnerabilities = [ {"id": "V1", "risk_score": 8.5}, {"id": "V2", "risk_score": 6.0}, {"id": "V3", "risk_score": 9.2}, ] def average_risk_score(vulns): total = sum(v["risk_score"] for v in vulns) return total / len(vulns) print(f"Average Risk Score: {average_risk_score(vulnerabilities):.2f}")
# Python: Simple printout of affected hosts mapping affected_hosts = { "Web Server": ["192.168.1.10", "192.168.1.11"], "Database Server": ["192.168.1.20"], "User Workstations": ["192.168.1.100", "192.168.1.101", "192.168.1.102"], } for host_type, ips in affected_hosts.items(): print(f"{host_type}:") for ip in ips: print(f" - {ip}")
# Python: Example mitigation plan tracker mitigation_plan = { "immediate_action": "Apply patch KB123456", "long_term": "Review and update firewall rules quarterly", "training": "Conduct phishing awareness every 6 months" } def print_mitigation_plan(plan): for key, action in plan.items(): print(f"{key.replace('_',' ').title()}: {action}") print_mitigation_plan(mitigation_plan)
# Python: Simple task list for action plan action_plan = [ {"task": "Patch vulnerable servers", "owner": "SysAdmin", "deadline": "2025-08-10", "status": "Pending"}, {"task": "Update antivirus signatures", "owner": "Security Team", "deadline": "2025-08-05", "status": "Completed"}, ] def print_action_plan(tasks): for t in tasks: print(f"Task: {t['task']}") print(f"Owner: {t['owner']}") print(f"Deadline: {t['deadline']}") print(f"Status: {t['status']}") print("---") print_action_plan(action_plan)
# Python: Define a communication plan schedule communication_plan = { "weekly_updates": "Email to security and IT teams", "management_reports": "Monthly summary meetings", "external_notifications": "As per regulatory requirements" } def print_communication_plan(plan): for key, method in plan.items(): print(f"{key.replace('_',' ').title()}: {method}") print_communication_plan(communication_plan)
# Python: List of stakeholder groups stakeholders = ["IT Team", "Security Team", "Legal Department", "Compliance", "Executives", "Vendors", "Customers"] print("Stakeholder Groups:") for s in stakeholders: print(f"- {s}")
# Python: Example timeline setup reporting_schedule = { "critical": "Daily", "high_risk": "Weekly", "routine": "Monthly" } for level, cadence in reporting_schedule.items(): print(f"{level.title()} vulnerabilities: Report every {cadence}")
# Python: Example SLA data structure slas = { "response_time": "4 hours", "resolution_time": "24 hours", "notification_time": "1 hour" } def print_sla_details(sla): for key, value in sla.items(): print(f"{key.replace('_',' ').title()}: {value}") print_sla_details(slas)
# Python: Governance checklist example governance_checklist = { "policy_review": True, "risk_approval": True, "compliance_audit": False, } def print_governance_status(checklist): for item, status in checklist.items(): print(f"{item.replace('_',' ').title()}: {'Completed' if status else 'Pending'}") print_governance_status(governance_checklist)
# Python: Simple escalation path list escalation_path = ["Security Analyst", "Security Manager", "CISO", "Legal Team"] print("Escalation Path:") for level in escalation_path: print(f"- {level}")
# Python: Calculate remediation KPI example vulnerabilities = [ {"id": "V1", "status": "Remediated", "days_to_fix": 5}, {"id": "V2", "status": "Open", "days_to_fix": None}, {"id": "V3", "status": "Remediated", "days_to_fix": 3}, ] remediated = [v for v in vulnerabilities if v["status"] == "Remediated"] average_days = sum(v["days_to_fix"] for v in remediated) / len(remediated) print(f"Average Days to Remediate: {average_days:.2f}")
# Python: Plotting vulnerability trend over months (requires matplotlib) import matplotlib.pyplot as plt months = ["Jan", "Feb", "Mar", "Apr", "May"] open_vulns = [12, 9, 7, 5, 3] plt.plot(months, open_vulns, marker='o') plt.title("Open Vulnerabilities Trend Over Time") plt.xlabel("Month") plt.ylabel("Number of Open Vulnerabilities") plt.show()
# Python: Simple zero-day tracker zero_days = [ {"id": "ZD-001", "discovered": "2025-07-01"}, {"id": "ZD-002", "discovered": "2025-07-15"}, ] print(f"Total Zero-Day Vulnerabilities: {len(zero_days)}") for zd in zero_days: print(f"- {zd['id']} discovered on {zd['discovered']}")
# Python: Calculate SLA compliance percentage sla_targets = { "response_time_hours": 4, "resolution_time_hours": 24, } incidents = [ {"id": "INC1", "response_hours": 3, "resolution_hours": 20}, {"id": "INC2", "response_hours": 5, "resolution_hours": 30}, ] def calculate_compliance(incidents, target_key): met = sum(1 for i in incidents if i[target_key] <= sla_targets[target_key]) return (met / len(incidents)) * 100 response_compliance = calculate_compliance(incidents, "response_hours") resolution_compliance = calculate_compliance(incidents, "resolution_hours") print(f"Response Time SLA Compliance: {response_compliance:.1f}%") print(f"Resolution Time SLA Compliance: {resolution_compliance:.1f}%")
# Python: Simple heatmap visualization (requires seaborn and matplotlib) import seaborn as sns import matplotlib.pyplot as plt import numpy as np # Simulated vulnerability severity across 5 systems data = np.array([ [3, 1, 4, 2, 5], [2, 5, 3, 1, 4], [4, 2, 1, 5, 3], [5, 3, 2, 4, 1], [1, 4, 5, 3, 2], ]) sns.heatmap(data, annot=True, cmap="YlOrRd") plt.title("Vulnerability Heatmap") plt.show()
# Python: Calculate remediation velocity example import datetime vulns = [ {"id": "V1", "discovered": datetime.date(2025, 7, 1), "remediated": datetime.date(2025, 7, 5)}, {"id": "V2", "discovered": datetime.date(2025, 7, 3), "remediated": datetime.date(2025, 7, 6)}, ] total_days = sum((v["remediated"] - v["discovered"]).days for v in vulns) average_velocity = total_days / len(vulns) print(f"Average Remediation Velocity: {average_velocity:.1f} days")
# Python: Track incident communications log communications_log = [] def log_communication(recipient, message): communications_log.append({"recipient": recipient, "message": message, "timestamp": datetime.datetime.now()}) print(f"Logged communication to {recipient}") import datetime log_communication("IT Team", "Incident detected and contained.") log_communication("Management", "Mitigation in progress.") print("Communication Log:") for entry in communications_log: print(f"{entry['timestamp']}: To {entry['recipient']} - {entry['message']}")
# Python: Generate a concise executive summary incident_details = { "impact": "Temporary service disruption", "actions": ["Incident contained", "Systems restored", "Security patch applied"], "risk_level": "Moderate", "next_steps": ["Conduct post-incident review", "Update security policies"] } def executive_summary(details): summary = f"Impact: {details['impact']}\n" summary += "Key Actions:\n" for action in details["actions"]: summary += f"- {action}\n" summary += f"Risk Level: {details['risk_level']}\n" summary += "Recommended Next Steps:\n" for step in details["next_steps"]: summary += f"- {step}\n" return summary print(executive_summary(incident_details))
# Python: Example of storing 5W timeline events timeline = [ {"who": "Attacker", "what": "Phishing Email Sent", "when": "2025-07-01 09:00", "where": "External Network", "why": "Credential Theft"}, {"who": "User", "what": "Clicked Link", "when": "2025-07-01 09:15", "where": "User PC", "why": "Unaware of Threat"}, {"who": "Security Team", "what": "Incident Detected", "when": "2025-07-01 10:00", "where": "SIEM Dashboard", "why": "Anomaly Alert"} ] for event in timeline: print(f"Who: {event['who']}, What: {event['what']}, When: {event['when']}, Where: {event['where']}, Why: {event['why']}")
# Python: Example of impact assessment with recommendations incident_impact = { "data_loss": True, "system_downtime_hours": 5, "financial_cost_estimate": 20000 } def assess_impact(impact): if impact["data_loss"]: print("Critical: Data loss occurred.") if impact["system_downtime_hours"] > 4: print("High impact: Extended downtime.") print(f"Estimated financial cost: ${impact['financial_cost_estimate']}") recommendations = [ "Restore data from backup.", "Improve endpoint detection.", "Conduct employee phishing training." ] assess_impact(incident_impact) print("\nRecommendations:") for rec in recommendations: print(f"- {rec}")
# Python: Simulate evidence collection in a report evidence = { "logs": ["auth.log", "firewall.log"], "screenshots": ["alert1.png", "dashboard.png"], "forensic_images": ["disk_image.dd"] } def list_evidence(evidence_dict): for ev_type, files in evidence_dict.items(): print(f"{ev_type.title()}:") for f in files: print(f" - {f}") print("Collected Evidence:") list_evidence(evidence)
# Python: Scope and impact example scope = ["Email Server", "HR Database", "Employee Workstations"] impact = { "disruption": "Email downtime for 3 hours", "data_exposure": "Personal employee data", "financial_loss": 15000 } print("Incident Scope:") for system in scope: print(f"- {system}") print("\nIncident Impact:") for key, val in impact.items(): print(f"{key.title().replace('_', ' ')}: {val}")
# Python: Example stakeholder message templates stakeholders = { "executives": "Summary of incident impact and remediation status.", "IT Team": "Technical details and immediate tasks.", "Legal": "Compliance and regulatory implications.", "Customers": "Reassurance and support information." } for group, message in stakeholders.items(): print(f"{group.title()} Message: {message}")
# Python: Example briefing data structure briefing = { "incident_summary": "Phishing attack caused temporary email service disruption.", "business_impact": "Minor, no financial loss yet.", "actions_taken": ["Patch applied", "User training initiated"], "resources_needed": ["Additional monitoring tools"] } for key, val in briefing.items(): print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example breach notification format breach_report = { "date_reported": "2025-07-15", "affected_data": "Personal Identifiable Information (PII)", "number_of_records": 1500, "mitigation_steps": ["Access revoked", "Encryption updated"], "reporting_authority": "Data Protection Agency" } for key, val in breach_report.items(): print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example public message template public_message = """ We recently detected a security incident affecting our email services. No sensitive data was compromised. We have taken immediate corrective actions and are enhancing our security measures to prevent future incidents. Please remain vigilant for phishing emails and report suspicious activity. """ print(public_message)
# Python: Example disclosure message differentiation messages = { "internal": "Incident detected. Follow the incident response playbook and report anomalies.", "external": "We are investigating a recent incident and will update you with findings." } for audience, message in messages.items(): print(f"{audience.title()} Message: {message}")
# Python: Closure notification example closure_notification = { "incident_id": "INC-2025-007", "status": "Resolved", "resolution_summary": "Phishing campaign blocked; affected accounts secured.", "lessons_learned": "Need stronger email filtering and user training.", "date_closed": "2025-07-20" } for key, val in closure_notification.items(): print(f"{key.replace('_',' ').title()}: {val}")
# Python: Root cause and lessons learned summary report = { "root_cause": "Weak password policy allowed credential theft.", "lessons_learned": [ "Implement MFA for all accounts.", "Enhance password complexity requirements.", "Increase phishing simulation training frequency." ] } print(f"Root Cause: {report['root_cause']}") print("Lessons Learned:") for lesson in report["lessons_learned"]: print(f"- {lesson}")
# Python: Simple 5 Whys example def five_whys(problem): answers = [ "Why did the phishing succeed? - Weak user awareness.", "Why was awareness weak? - Insufficient training.", "Why was training insufficient? - Budget constraints.", "Why budget constrained? - Management prioritization.", "Why was management priority low? - Lack of risk understanding." ] print(f"Problem: {problem}\n") for ans in answers: print(ans) five_whys("Phishing attack led to credential compromise.")
# Python: Example gap analysis report required_controls = {"MFA", "Encryption", "Logging", "Patch Management"} current_controls = {"Encryption", "Logging"} gaps = required_controls - current_controls print("Security Control Gaps:") for gap in gaps: print(f"- {gap}")
# Python: Example remediation output tasks remediation_plan = [ {"task": "Enable MFA", "owner": "Security Team", "due_date": "2025-08-15"}, {"task": "Apply critical patches", "owner": "SysAdmin", "due_date": "2025-08-05"} ] for item in remediation_plan: print(f"Task: {item['task']}, Owner: {item['owner']}, Due: {item['due_date']}")
# Python: Example policy update tracker policy_updates = [ {"policy": "Password Policy", "update": "Require 12+ characters with special symbols"}, {"policy": "Incident Response", "update": "Add phishing detection training"} ] for update in policy_updates: print(f"Policy: {update['policy']}, Update: {update['update']}")
# Python: Example forensics summary update forensics_summary = { "initial_findings": "Malware execution detected at 09:00", "additional_findings": "C2 communication confirmed at 10:30", "final_status": "Malware eradicated, no data exfiltration detected" } for key, val in forensics_summary.items(): print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example communication tools list tools_formats = { "Email": "Detailed reports and notifications", "Instant Messaging": "Quick alerts and coordination", "Dashboard": "Live incident status updates", "Presentation": "Executive briefings" } for tool, purpose in tools_formats.items(): print(f"{tool}: {purpose}")
# Python: Simulate dashboard data update dashboard_data = { "active_incidents": 3, "resolved_incidents": 15, "average_response_time_minutes": 45 } def display_dashboard(data): for k, v in data.items(): print(f"{k.replace('_',' ').title()}: {v}") display_dashboard(dashboard_data)
# Python: Generate a simple bar chart with matplotlib import matplotlib.pyplot as plt vulnerabilities = ['Low', 'Medium', 'High'] counts = [10, 5, 2] plt.bar(vulnerabilities, counts, color=['green', 'orange', 'red']) plt.title('Vulnerability Severity Counts') plt.xlabel('Severity') plt.ylabel('Count') plt.show()
# Python: Simple executive briefing template output briefing_template = { "Title": "Security Incident Executive Briefing", "Date": "2025-07-21", "Summary": "Phishing incident resulted in limited email disruption; no data loss detected.", "Business Impact": "Minimal operational impact, no financial loss.", "Next Steps": "Enhance email filtering and conduct user training.", "Resource Requests": "Approval for additional monitoring tools." } for key, val in briefing_template.items(): print(f"{key}: {val}\n")
// Example Python snippet to parse forensic logs and extract key events with open('forensic_log.txt', 'r') as f: logs = f.readlines() for line in logs: if "ERROR" in line or "SUSPICIOUS" in line: print("Important event:", line.strip())
// Sample JSON format for sharing forensic summary across teams { "incident_id": "1234", "summary": "Unauthorized access detected", "technical_details": "...", "compliance_implications": "Potential GDPR breach", "recommended_actions": "Reset passwords, enhance monitoring" }
// Python automation snippet to generate basic forensic report import datetime def generate_report(events): report = f"Forensic Report - {datetime.date.today()}\n" report += "Events:\n" for e in events: report += f"- {e}\n" with open('forensic_report.txt', 'w') as f: f.write(report) print("Report generated.") events = ["Login failure", "Malware detected", "File deletion"] generate_report(events)
// Example: Triggering Nessus scan report download (pseudo-code) import requests def download_report(scan_id): url = f"https://nessus.local/api/scans/{scan_id}/report" response = requests.get(url, verify=False) with open("scan_report.pdf", "wb") as file: file.write(response.content) print("Scan report downloaded.") download_report(101)
// Sample code to refresh security dashboard via REST API import requests def refresh_dashboard(): api_url = "https://securitydashboard.local/api/data" response = requests.get(api_url) if response.status_code == 200: data = response.json() print("Dashboard data updated:", data) else: print("Failed to refresh dashboard.") refresh_dashboard()
// Example pseudo-code for SOAR summary generation incident = { "alerts": 5, "actions_taken": ["Blocked IP", "Reset password"], "status": "Contained" } summary = f"Incident Summary:\nAlerts: {incident['alerts']}\nActions: {', '.join(incident['actions_taken'])}\nStatus: {incident['status']}" print(summary)
// Example Python snippet to fetch and integrate threat feed import requests feed_url = "https://threatfeed.example.com/api/latest" response = requests.get(feed_url) if response.ok: threats = response.json() print("Latest threat indicators:", threats) else: print("Failed to retrieve threat feed")
// Example Jinja2 template snippet for report generation report_template = """ Incident Report: - ID: {{ id }} - Date: {{ date }} - Summary: {{ summary }} - Actions Taken: {{ actions }} """ from jinja2 import Template data = {"id": 123, "date": "2025-07-27", "summary": "Data breach", "actions": "Password reset"} print(Template(report_template).render(data))
// Sample Python code to generate incident trend data incidents = [5, 3, 8, 2, 6] # incidents per month months = ["Jan", "Feb", "Mar", "Apr", "May"] for month, count in zip(months, incidents): print(f"{month}: {count} incidents")
// Python example splitting incidents quarterly incidents = [10, 15, 12, 20, 18, 25, 22, 30, 28, 35, 33, 40] # Monthly incidents quarters = [sum(incidents[i:i+3]) for i in range(0, 12, 3)] for i, q in enumerate(quarters, 1): print(f"Q{i} Incidents: {q}")
// Example pseudo code for creating presentation slides slides = [ "Intro: Security Posture", "Incident Trends", "Risk Mitigation Status", "Recommendations" ] for slide in slides: print("Slide:", slide)
// Python example to gather evidence files for audit import os, shutil evidence_dir = "/forensics/evidence" audit_dir = "/audit_ready" for file in os.listdir(evidence_dir): shutil.copy(os.path.join(evidence_dir, file), audit_dir) print("Evidence packaged for audit")
// Example compliance gap check pseudo-code required_controls = {"Encryption", "Access Control", "Audit Logs"} implemented_controls = {"Encryption", "Audit Logs"} gaps = required_controls - implemented_controls print("Compliance gaps:", gaps)
// Example risk score tracking (simplified) risk_scores = [80, 75, 70, 60, 55] for month, score in enumerate(risk_scores, 1): print(f"Month {month}: Risk Score = {score}")
// Sample policy snippet for secure communications policy = """ Only authorized personnel may disclose incident details. Compliance with GDPR and other privacy laws is mandatory. Communication should be logged and monitored. """ print(policy)
// Example pseudocode for masking PII in logs def mask_pii(data): return data.replace("user@example.com", "*****@*****") log = "User login from user@example.com" print(mask_pii(log))
// Example notification timeline (pseudo-code) breach_detected = True days_since_breach = 20 if breach_detected and days_since_breach <= 30: print("Notify regulators and affected parties within legal timeframe")
// Example using Python's ssl module for secure socket import socket, ssl context = ssl.create_default_context() with socket.create_connection(("secure.example.com", 443)) as sock: with context.wrap_socket(sock, server_hostname="secure.example.com") as ssock: print(ssock.version())
// Example: Encrypt sensitive notes before storage from cryptography.fernet import Fernet key = Fernet.generate_key() cipher_suite = Fernet(key) note = "Sensitive analyst commentary" encrypted_note = cipher_suite.encrypt(note.encode()) print(encrypted_note)
// Example workflow step in legal coordination def notify_legal(incident_id): print(f"Legal team notified about incident {incident_id}") notify_legal("INC-12345")
# Python: Simple training session tracker training_sessions = [ {"topic": "Phishing Awareness", "date": "2025-07-01", "trainer": "Alice", "attendees": 25}, {"topic": "Incident Response Basics", "date": "2025-07-10", "trainer": "Bob", "attendees": 30}, ] def list_training_sessions(sessions): for s in sessions: print(f"Topic: {s['topic']}") print(f"Date: {s['date']}") print(f"Trainer: {s['trainer']}") print(f"Attendees: {s['attendees']}") print("---") list_training_sessions(training_sessions)
# Python: Summarize training feedback scores training_feedback = [ {"topic": "Phishing Awareness", "avg_score": 4.5, "max_score": 5}, {"topic": "Incident Response", "avg_score": 4.7, "max_score": 5}, ] for feedback in training_feedback: print(f"Topic: {feedback['topic']}") print(f"Average Feedback Score: {feedback['avg_score']} / {feedback['max_score']}") print("---")
# Python: Log lessons learned points lessons_learned = [ "Improve incident communication protocols.", "Update firewall rules to cover new threats.", "Increase training frequency for new hires." ] print("Lessons Learned:") for lesson in lessons_learned: print(f"- {lesson}")
# Python: Simulate document hand-off checklist documents = ["Incident Response Plan", "Network Diagrams", "Security Policies", "Training Manuals"] def handoff_documents(docs): print("Documents handed off:") for doc in docs: print(f"- {doc}") handoff_documents(documents)
# Python: Simple report template function def generate_report_template(): report = { "Incident_ID": "", "Date": "", "Description": "", "Actions_Taken": "", "Recommendations": "", "Analyst_Name": "" } return report report = generate_report_template() print("Empty Report Template:") for k, v in report.items(): print(f"{k}: {v}")
# Python: Schedule and send education reminders import datetime education_schedule = [ {"topic": "New Phishing Techniques", "date": "2025-08-01"}, {"topic": "Cloud Security Updates", "date": "2025-09-01"}, ] def send_reminder(topic, date): print(f"Reminder: Training on '{topic}' scheduled for {date}") for edu in education_schedule: send_reminder(edu["topic"], edu["date"])
# Python: Textual lifecycle steps printout lifecycle_steps = ["Preparation", "Detection & Analysis", "Containment", "Eradication", "Recovery", "Post-Incident Activity"] print("Incident Lifecycle:") for step in lifecycle_steps: print(f"- {step}")
# Python: Simple timeline text representation events = [ {"time": "10:00", "event": "Initial Alert"}, {"time": "10:15", "event": "Incident Confirmed"}, {"time": "11:00", "event": "Containment Started"}, {"time": "14:00", "event": "Incident Resolved"}, ] print("Incident Timeline:") for e in events: print(f"{e['time']}: {e['event']}")
# Python: Simple kill chain stages list kill_chain_stages = [ "Reconnaissance", "Weaponization", "Delivery", "Exploitation", "Installation", "Command & Control", "Actions on Objectives", ] print("Kill Chain Stages:") for stage in kill_chain_stages: print(f"- {stage}")
# Python: Simple impact score bar visualization (text) impacts = {"Low": 1, "Medium": 3, "High": 5} for impact, score in impacts.items(): print(f"{impact}: {'█' * score}")
# Python: Simple graph relationships using dict threat_graph = { "Attacker": ["Malware", "Phishing"], "Malware": ["Server A", "Server B"], "Phishing": ["User Workstations"] } print("Threat Graph Relationships:") for node, connections in threat_graph.items(): print(f"{node} -> {', '.join(connections)}")
# Python: Simple text dashboard simulation dashboard = { "Open Incidents": 5, "Critical": 2, "High": 1, "Medium": 1, "Low": 1, } print("Incident Dashboard:") for key, value in dashboard.items(): print(f"{key}: {value}")
# Python: Continuous improvement task list improvement_tasks = [ "Update incident response policies", "Conduct quarterly security drills", "Enhance threat intelligence feeds", ] print("Continuous Improvement Tasks:") for task in improvement_tasks: print(f"- {task}")
# Python: Log post-incident review notes post_incident_reviews = [ "Response time was satisfactory.", "Communication gaps identified between teams.", "Recommend improved documentation." ] print("Post-Incident Review Notes:") for note in post_incident_reviews: print(f"- {note}")
# Python: Track remediation success rates remediation_results = [ {"vuln_id": "V1", "fixed": True}, {"vuln_id": "V2", "fixed": False}, {"vuln_id": "V3", "fixed": True}, ] success_count = sum(1 for r in remediation_results if r["fixed"]) total = len(remediation_results) print(f"Remediation Success Rate: {success_count}/{total} ({(success_count/total)*100:.1f}%)")
# Python: Documenting policy updates policy_updates = [ "Add multi-factor authentication requirements.", "Enforce stronger password policies.", "Increase monitoring of privileged accounts.", ] print("Policy Updates Needed:") for update in policy_updates: print(f"- {update}")
# Python: Track IR plan update items ir_plan_updates = [ "Include cloud service provider incident roles.", "Update contact list for incident response team.", "Add ransomware-specific response procedures.", ] print("IR Plan Update Items:") for item in ir_plan_updates: print(f"- {item}")
# Example: Simple maturity score tracker in Python # Define maturity levels for three domains maturity = { 'incident_response': 3, # Scale 1-5 'threat_detection': 4, 'compliance': 2 } def maturity_summary(maturity): for domain, level in maturity.items(): print(f"Domain: {domain.replace('_', ' ').title()} - Maturity Level: {level}/5") maturity_summary(maturity)
# Simulating a case study scenario case_study = { "incident": "Phishing Email Compromise", "steps": ["Detect email", "Isolate account", "Analyze logs", "Reset credentials"] } for step in case_study['steps']: print(f"Case Study Step: {step}")
# Example: Generating a ransomware report summary report = { "incident_type": "Ransomware", "detected_by": "EDR", "ransom_note_found": True, "impact_scope": "HR Department File Server", "response_status": "Containment Initiated" } for key, value in report.items(): print(f"{key.replace('_', ' ').title()}: {value}")
# Communication structure example message = { "recipient": "Customers", "tone": "Reassuring", "content": "We are investigating a data breach and taking all necessary steps to secure your information." } print(f"To: {message['recipient']}\nTone: {message['tone']}\nContent: {message['content']}")
# Example: Basic vulnerability case dictionary vuln_case = { "asset": "web01.example.com", "vulnerability": "CVE-2022-1234", "action": "Patch applied" } print("Vulnerability Action Case:") for k, v in vuln_case.items(): print(f"{k.title()}: {v}")
# Departments participating in incident departments = ["Security", "IT", "Legal", "HR"] for dept in departments: print(f"Notifying {dept} department...")
# Simulating audit report content report = { "incident": "Data leak", "regulation": "GDPR", "status": "Reported to DPA within 72 hours" } print("Audit Scenario Report:") for key, val in report.items(): print(f"{key}: {val}")
# Track certification status certs = {"CySA+": "2026-01-01", "CISSP": "2025-12-15"} for cert, expiry in certs.items(): print(f"{cert} expires on {expiry}")
# Example CEU log ceu_log = [ {"event": "SANS Workshop", "points": 6}, {"event": "CompTIA Webinar", "points": 2} ] total = sum(item["points"] for item in ceu_log) print(f"Total CEUs: {total}")
# Create education roadmap plan = ["Cloud Security Course", "Python for Security", "Blue Team Training"] for item in plan: print(f"Planned Training: {item}")
# Sample exam tips print("- Read exam objectives") print("- Use flashcards") print("- Practice hands-on scenarios")
# Simulate CLI task print("Configure a firewall rule:") print("allow tcp from 10.0.0.0/8 to any port 443")
# Example role map roles = ["Tier 1 SOC Analyst", "Incident Responder", "Security Engineer"] for role in roles: print(f"Career Path: {role}")
# Python Example: Simple SIEM log normalization function def normalize_log(log): normalized = { "timestamp": log.get("time"), "source_ip": log.get("src_ip"), "destination_ip": log.get("dest_ip"), "event_type": log.get("type"), "message": log.get("msg") } return normalized sample_log = { "time": "2025-07-27T14:22:00Z", "src_ip": "192.168.1.10", "dest_ip": "10.0.0.5", "type": "login_attempt", "msg": "Failed login from user admin" } print(normalize_log(sample_log))
# Python Example: Simple behavior-based detection stub def detect_suspicious_process(process_list): suspicious_keywords = ["keylogger", "proxy", "backdoor"] alerts = [] for proc in process_list: if any(word in proc.lower() for word in suspicious_keywords): alerts.append(f"Suspicious process detected: {proc}") return alerts running_processes = ["chrome.exe", "KeyLogger.exe", "explorer.exe"] alerts = detect_suspicious_process(running_processes) for alert in alerts: print(alert)
# Python Example: Simple IOC ingestion simulation def ingest_iocs(ioc_feed): iocs = [] for ioc in ioc_feed: iocs.append({ "type": ioc["type"], "value": ioc["value"], "confidence": ioc.get("confidence", "medium") }) return iocs sample_ioc_feed = [ {"type": "ip", "value": "203.0.113.45", "confidence": "high"}, {"type": "domain", "value": "malicious-site.com"} ] ingested_iocs = ingest_iocs(sample_ioc_feed) for ioc in ingested_iocs: print(ioc)
# Python Example: Parse AWS CloudTrail logs for failed login attempts import json def parse_cloudtrail_logs(logs): failed_logins = [] for event in logs["Records"]: if event["eventName"] == "ConsoleLogin" and event["responseElements"] is None: failed_logins.append({ "user": event["userIdentity"]["userName"], "time": event["eventTime"], "source_ip": event["sourceIPAddress"] }) return failed_logins sample_logs = { "Records": [ { "eventName": "ConsoleLogin", "responseElements": None, "userIdentity": {"userName": "admin"}, "eventTime": "2025-07-27T13:45:00Z", "sourceIPAddress": "198.51.100.10" }, { "eventName": "ConsoleLogin", "responseElements": {"Login": "Success"}, "userIdentity": {"userName": "dev_user"}, "eventTime": "2025-07-27T14:00:00Z", "sourceIPAddress": "198.51.100.20" } ] } failed = parse_cloudtrail_logs(sample_logs) for f in failed: print(f"Failed login by {f['user']} from IP {f['source_ip']} at {f['time']}")