# Example: Logging with synced timestamps
import datetime
print(f"Log Time: {datetime.datetime.utcnow().isoformat()} - User Login")
# Ensures consistent timestamps in logs for correlation.
# Simulate checking unnecessary service
services = ["Telnet", "RDP"]
if "Telnet" in services:
print("Disable Telnet to harden OS")
# Disabling insecure legacy services is key to OS security.
# Example: Alert if container lacks resource limits
container_config = {"memory": None}
if not container_config["memory"]:
print("Set memory limits to prevent abuse")
# Protects containers from resource exhaustion.
# Example: enforce zero trust policy
user_authenticated = False
if not user_authenticated:
print("Access denied: Re-authentication required")
# Zero trust denies by default.
# Example: enforcing MFA
login_success = True
mfa_passed = False
if login_success and not mfa_passed:
print("MFA required before granting access")
# MFA reduces unauthorized access risk.
# Example: verify certificate expiration
cert_expiry_days = 10
if cert_expiry_days < 30:
print("Renew certificate to maintain secure communications")
# Prevents SSL expiration issues.
# Example: flagging credit card numbers
message = "My card is 4111-1111-1111-1111"
if "4111" in message:
print("DLP Alert: Potential CHD detected")
# Detects and flags sensitive data.
# Simulate SIEM alert logic
alert = {"source": "firewall", "severity": "high"}
if alert["severity"] == "high":
print("SOAR: Auto-isolate endpoint")
# SOAR enables automated defense actions.
# Example: EDR behavior alert
process_behavior = "keylogger"
if process_behavior == "keylogger":
print("EDR Alert: Keylogger behavior detected")
# Detects malicious endpoint behavior.
# Simulate security dashboard summary
alerts = ["EDR", "Firewall", "SIEM"]
print(f"Dashboard summary: {len(alerts)} alerts from multiple sources")
# Centralized alert overview.
# Example: detect beaconing intervals
intervals = [60, 61, 60, 62]
if all(i in range(58, 63) for i in intervals):
print("Suspicious beaconing pattern detected")
# Regular outbound patterns raise alerts.
# Example: detect non-standard port use
port = 8081
if port not in [80, 443]:
print("Unusual port usage detected")
# Flagging rogue port activity.
# Example: detect lateral connections
connections = ["192.168.1.5", "192.168.1.6"]
if len(connections) > 1:
print("Lateral movement suspected")
# Multiple internal hops may imply compromise.
# Example: lookup domain reputation
reputation_score = 2 # scale 0-10
if reputation_score < 3:
print("Malicious domain flagged")
# Threat intel helps correlate DNS activity.
# Example: flag abnormal data size
mb_sent = 1500
if mb_sent > 1000:
print("Potential exfiltration detected")
# Data egress alert for review.
# Example: alert on high CPU usage
cpu_usage = 92
if cpu_usage > 90:
print("High CPU usage detected")
# Review top processes consuming CPU.
# Example: detect blacklisted process
running_processes = ["explorer.exe", "malicious_tool.exe"]
if "malicious_tool.exe" in running_processes:
print("Unauthorized process running")
# Flag unexpected binaries.
# Example: detect file modification
modified_files = ["C:/Windows/System32/drivers/etc/hosts"]
if "hosts" in modified_files[0]:
print("Hosts file modified")
# Investigate for malicious redirection.
# Example: detect unknown task
scheduled_tasks = ["\Microsoft\Windows\UpdateTask", "\MaliciousScript"]
if "\MaliciousScript" in scheduled_tasks:
print("Suspicious scheduled task found")
# Audit task scheduler entries.
# Example: detect large archive creation
archive_size_gb = 5
if archive_size_gb > 1:
print("Potential data exfiltration")
# Check archive destination and upload logs.
# Example: detect suspicious account creation
new_accounts = ["admin2"]
if "admin2" in new_accounts:
print("Unexpected admin account created")
# Investigate account origin and privileges.
# Example: detect abnormal app connections
app_connections = ["192.168.1.100", "203.0.113.45"]
if "203.0.113.45" in app_connections:
print("Suspicious external connection detected")
# Log and alert unfamiliar destinations.
# Example: detect service failure
services_status = {"WebApp": "Stopped"}
if services_status["WebApp"] == "Stopped":
print("Web application service disrupted")
# Restart and review logs.
# Example: detect missing logs
log_entries = ["Error 1", "Error 2"]
if len(log_entries) < 10:
print("Possible log tampering detected")
# Validate log integrity.
# Example: Behavior baseline check
normal_hours = range(8, 18)
login_hour = 3 # Detected login at 3 AM
if login_hour not in normal_hours:
print("⚠️ Alert: Login outside of baseline hours")
# Unusual behavior is detected and flagged.
# Example: Check SPF status from header
email_header = "Received-SPF: fail"
if "fail" in email_header:
print("❌ SPF failed - possible spoofed email")
# Failing SPF can indicate email spoofing.
import hashlib
# Example: Generate SHA256 hash of a file
with open("file.exe", "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
print("File hash:", file_hash)
# Use this hash to compare with threat intel databases.
# Example: Python automation to block IP
malicious_ip = "10.10.10.10"
def block_ip(ip):
print(f"Blocking IP: {ip}")
# In production, insert firewall rule here
block_ip(malicious_ip)
# Automates action based on detection logic.
import re
log_entry = "Failed login from IP 192.168.1.100"
match = re.search(r"\d+\.\d+\.\d+\.\d+", log_entry)
if match:
print("🔍 IP extracted:", match.group())
# Extracts the IP address from the log using regex.
# Example: detect impersonation keyword in emails
email_content = "Urgent request from IT department"
if "urgent" in email_content.lower() and "IT" in email_content:
print("Possible social engineering detected")
# Check sender domain and urgency flags.
# Example: header mismatch detection
from_header = "support@secure.com"
reply_to = "attack@fake.com"
if from_header.split('@')[1] != reply_to.split('@')[1]:
print("Email header mismatch detected - phishing risk")
# Alert and quarantine the email.
# Example: detect base64 patterns
url = "http://example.com/?q=aHR0cDovL3ZpY3RpbS5jb20="
if "aHR0c" in url:
print("Obfuscated base64 payload detected")
# Decode and review payload.
# Example: detect lookalike domains
domain = "g00gle.com"
if "google" in domain and "0" in domain:
print("Possible spoofed domain detected")
# Flag for investigation.
# Example: detect unexpected scripting types
script_type = "Powershell"
if script_type not in ["Bash", "Python", "Shell"]:
print("Uncommon scripting usage - possible emerging threat")
# Review source and execution context.
# Example: shell snippet using tcpdump
# tcpdump -i eth0 port 443 -w capture.pcap
# This captures HTTPS traffic on eth0
# Useful for post-event packet analysis.
# Example SIEM rule concept (pseudocode)
if failed_logins > 5 and source_ip != whitelisted:
alert("Brute force attempt from unknown IP")
# Reduces false positives from allowed devices.
# Example: monitor suspicious process tree
parent = "explorer.exe"
child = "powershell.exe"
if parent == "explorer.exe" and child == "powershell.exe":
print("Suspicious endpoint process chain")
# Investigate PowerShell activity.
# Example: VirusTotal lookup (pseudo)
hash = "abcd1234"
if vt_lookup(hash) == "malicious":
print("File flagged as malware")
# Verify signature and uploader metadata.
# Example: sandbox alert (pseudocode)
if sandbox_report.contains("suspicious network call"):
print("Malicious behavior detected in sandbox")
# Block hash and alert SOC.
# No automated processes; mostly manual monitoring
print("Security ops at ad hoc stage; focus on awareness and documentation")
# Example: Standard checklist for incident response
def incident_checklist():
steps = ["Identify", "Contain", "Eradicate", "Recover"]
for step in steps:
print(f"Perform: {step}")
incident_checklist()
# Follows documented process steps for incidents
# Example: Role assignment in SOC
soc_roles = {"Analyst": "Monitor alerts", "Responder": "Handle incidents"}
print("SOC Roles:", soc_roles)
# Defines clear security roles
# Calculate average MTTR from sample data
mttr_values = [30, 45, 25, 60]
average_mttr = sum(mttr_values) / len(mttr_values)
print(f"Average MTTR: {average_mttr} minutes")
# Uses metrics to improve performance
# Simulate automated incident response trigger
incident_detected = True
if incident_detected:
print("Triggering automated containment playbook")
# Automation reduces manual intervention
# Sample threat intel feed processing
threat_feed = ["malicious_ip_1", "malicious_ip_2"]
detected_ip = "malicious_ip_1"
if detected_ip in threat_feed:
print("Alert: IP flagged by threat intelligence")
# Enhances detection accuracy
# Simulate IR playbook step
def run_playbook():
steps = ["Detect", "Analyze", "Contain", "Eradicate", "Recover"]
for step in steps:
print(f"Executing: {step}")
run_playbook()
# Structured approach to incidents
# Example: Notify stakeholders
def notify_team(message):
print(f"Notify SOC team: {message}")
notify_team("New phishing campaign detected")
# Enables rapid collaboration
# Simple continuous monitor simulation
import time
def monitor():
print("Monitoring network traffic...")
time.sleep(2)
print("No anomalies detected")
monitor()
# Constant vigilance on network activity
# Sample dashboard data
metrics = {"MTTD": 15, "MTTR": 45, "Alerts": 200}
for key, value in metrics.items():
print(f"{key}: {value}")
# Visualize SOC effectiveness
# Calculate MTTD example
detection_times = [10, 20, 15, 12] # in minutes
mttd = sum(detection_times) / len(detection_times)
print(f"Mean Time to Detect: {mttd} minutes")
# Calculate MTTR example
response_times = [30, 40, 25, 35]
mttr = sum(response_times) / len(response_times)
print(f"Mean Time to Respond: {mttr} minutes")
# Track alert count
alerts = 500
print(f"Alerts generated today: {alerts}")
# Calculate false positive rate
false_positives = 50
total_alerts = 500
false_positive_rate = (false_positives / total_alerts) * 100
print(f"False Positive Rate: {false_positive_rate}%")
# Calculate true positive rate
true_positives = 450
tpr = (true_positives / total_alerts) * 100
print(f"True Positive Rate: {tpr}%")
# Incident count example
incidents = 25
print(f"Confirmed incidents this month: {incidents}")
# Analyst efficiency calculation
alerts_handled = 200
hours_worked = 8
efficiency = alerts_handled / hours_worked
print(f"Alerts handled per hour: {efficiency}")
# Time to contain example
containment_times = [20, 30, 15]
avg_contain_time = sum(containment_times) / len(containment_times)
print(f"Average Time to Contain: {avg_contain_time} minutes")
# Time to recovery example
recovery_times = [40, 50, 35]
avg_recovery_time = sum(recovery_times) / len(recovery_times)
print(f"Average Time to Recovery: {avg_recovery_time} minutes")
# Alert triage time example
triage_times = [5, 7, 6]
avg_triage_time = sum(triage_times) / len(triage_times)
print(f"Average Alert Triage Time: {avg_triage_time} minutes")
# Example KPI definition dictionary
kpis = {
"Incident Response Time": "Average time to resolve incidents",
"Alert Accuracy": "Ratio of true alerts to total alerts"
}
print("Defined KPIs:", kpis)
# Example SLO for alert response
slo_response_time = 15 # minutes
print(f"SLO: Respond to alerts within {slo_response_time} minutes")
# Simulate KPI data collection
alerts_handled = 200
incidents_resolved = 30
print(f"Alerts handled: {alerts_handled}, Incidents resolved: {incidents_resolved}")
# Simple print dashboard mockup
dashboard = {"MTTR": 25, "MTTD": 12, "Alerts": 150}
for metric, value in dashboard.items():
print(f"{metric}: {value}")
# SLA example
sla = {"Response Time": "Less than 30 minutes"}
print("SLA terms:", sla)
# Prioritize alerts example
alerts = [{"id":1, "severity":"high"}, {"id":2, "severity":"low"}]
high_priority = [a for a in alerts if a["severity"] == "high"]
print("High priority alerts:", high_priority)
# SLA breach check
response_time = 40
if response_time > 30:
print("SLA breached: Investigate and improve process")
# Validate KPI data example
def validate(value):
if value < 0:
return False
return True
print("Data valid?", validate(10))
# Simple trend calculation
data = [20, 22, 25, 30]
trend = data[-1] - data[0]
print(f"Trend increase over period: {trend}")
# KPI review example
def review_kpi(current, target):
if current > target:
print("KPI on track")
else:
print("KPI needs improvement")
review_kpi(25, 20)
# Example role assignment
soc_team = {"Manager": "Oversees SOC activities and strategy"}
print("SOC Manager Role:", soc_team["Manager"])
# Level 1 analyst duties
def level1_monitor(alert):
if alert == "high":
print("Escalate to Level 2")
else:
print("Monitor alert")
level1_monitor("medium")
# Level 2 analyst investigation
def investigate(incident):
print(f"Investigating incident: {incident}")
investigate("Suspicious login")
# Threat hunting example
def threat_hunt(data):
suspicious = [d for d in data if d == "anomaly"]
print("Threat hunting results:", suspicious)
threat_hunt(["normal", "anomaly", "normal"])
# Incident response simulation
def respond(incident):
print(f"Containing threat: {incident}")
respond("Malware detected")
# Forensics example
def analyze_evidence(evidence):
print(f"Analyzing evidence: {evidence}")
analyze_evidence("Disk image")
# Threat intel processing
def process_intel(intel):
print(f"Processing intel: {intel}")
process_intel("New phishing campaign")
# SOC tool config
def configure_tool(tool):
print(f"Configuring tool: {tool}")
configure_tool("SIEM")
# Compliance check
def check_compliance():
print("Compliance with GDPR and PCI-DSS verified")
check_compliance()
# Training session
def train(topic):
print(f"Training on: {topic}")
train("Incident response procedures")
# Simulate false positive filtering
alerts = ["true", "false", "false", "true"]
true_alerts = [a for a in alerts if a == "true"]
print(f"Filtered true alerts: {len(true_alerts)}")
# Basic context check simulation
def evaluate_context(event):
if "suspicious" in event:
print("Needs human review")
else:
print("Auto-processed")
evaluate_context("suspicious login attempt")
# Example complex decision
def complex_incident(incident):
print(f"Escalate complex incident: {incident}")
complex_incident("Advanced persistent threat")
# Data validation
def validate_data(data):
if not data:
print("Invalid data - manual review needed")
else:
print("Data accepted for automation")
validate_data([])
# Maintenance alert
print("Schedule weekly update and tuning of automation rules")
# Integration check example
tools = ["SIEM", "SOAR", "EDR"]
print(f"Integrating tools: {', '.join(tools)}")
# Warning message
print("Balance automation with human oversight")
# Rule update reminder
print("Review and update detection rules monthly")
# Privacy compliance check
print("Ensure automation complies with GDPR and HIPAA")
# Cost-benefit analysis mockup
cost = 10000
benefit = 15000
print("Automation ROI:", benefit - cost)
# PDCA cycle simulation
steps = ["Plan", "Do", "Check", "Act"]
for step in steps:
print(f"Executing: {step}")
# Simple RCA example
def root_cause(issue):
causes = {"Malware": "Phishing email", "Breach": "Unpatched system"}
return causes.get(issue, "Unknown cause")
print(root_cause("Malware"))
# Feedback collection
feedback = ["Improve alert clarity", "Faster incident response"]
for f in feedback:
print("Feedback:", f)
# Metrics review mockup
kpis = {"MTTR": 30, "MTTD": 15}
print("Reviewing KPIs:")
for k, v in kpis.items():
print(f"{k}: {v} minutes")
# Training reminder
print("Schedule monthly SOC skill development sessions")
# Automation tuning log
print("Automation rules updated based on recent incident trends")
# Post-mortem template
def post_mortem(incident):
print(f"Analyzing incident: {incident}")
print("Lessons learned and next steps")
post_mortem("Ransomware attack")
# Doc update note
print("Incident response playbook updated after latest attack")
# Tech assessment
tools = ["SIEM", "SOAR", "EDR"]
print("Evaluating tools:", ", ".join(tools))
# Threat feed simulation
threats = ["New malware variant", "Phishing wave"]
for threat in threats:
print("Monitor threat:", threat)
# Example: simulate log aggregation from two sources
logs_source1 = ['login_success', 'file_access']
logs_source2 = ['login_fail', 'network_connect']
all_logs = logs_source1 + logs_source2
print("Aggregated logs:", all_logs)
# Example: normalize log entries
raw_logs = [{'src': '192.168.1.5', 'msg': 'Login success'}, {'source_ip': '192.168.1.6', 'message': 'Login fail'}]
normalized_logs = []
for log in raw_logs:
normalized = {
'ip': log.get('src') or log.get('source_ip'),
'event': log.get('msg') or log.get('message')
}
normalized_logs.append(normalized)
print(normalized_logs)
# Example: alert on failed login attempts
events = ['login_fail', 'login_fail', 'login_success']
fail_count = events.count('login_fail')
if fail_count > 1:
print("Alert: Multiple failed login attempts detected!")
# Example: correlate failed login followed by successful login
events = [('login_fail', 'user1'), ('login_fail', 'user1'), ('login_success', 'user1')]
fail_attempts = [e for e in events if e[0] == 'login_fail' and e[1] == 'user1']
success = any(e for e in events if e == ('login_success', 'user1'))
if len(fail_attempts) > 1 and success:
print("Potential brute force detected for user1")
# Example: simple log retention filter
import datetime
logs = [{'date': datetime.date(2024, 7, 1), 'event': 'login'}, {'date': datetime.date(2023, 1, 1), 'event': 'file_access'}]
cutoff_date = datetime.date(2024, 1, 1)
recent_logs = [log for log in logs if log['date'] >= cutoff_date]
print("Logs retained:", recent_logs)
# Example: generate simple compliance report
events = ['login', 'file_access', 'unauthorized_access']
compliance_events = [e for e in events if e != 'unauthorized_access']
print(f"Compliance events: {len(compliance_events)} out of {len(events)}")
# Example: find timeline of login events
timeline = [('2024-07-20 10:00', 'login_fail'), ('2024-07-20 10:05', 'login_success')]
for time, event in timeline:
print(f"At {time}, event: {event}")
# Example: check IP against threat feed
threat_ips = ['203.0.113.10']
event_ip = '203.0.113.10'
if event_ip in threat_ips:
print("Alert: Event from known malicious IP")
# Example: simulate simple log queue processing
log_queue = ['event1', 'event2', 'event3']
while log_queue:
event = log_queue.pop(0)
print(f"Processing {event}")
# Example: basic anomaly score calculation
events = [1, 1, 1, 5, 1]
avg = sum(events)/len(events)
for e in events:
if e > avg * 2:
print(f"Anomaly detected: event value {e} is unusually high")
# Example: simulate simple automation trigger
event = "phishing_email_detected"
if event == "phishing_email_detected":
print("Trigger: Quarantine email and alert SOC team")
# Example: simple playbook steps
def quarantine_endpoint():
print("Endpoint quarantined")
def alert_team():
print("SOC team alerted")
# Execute playbook
quarantine_endpoint()
alert_team()
# Example: prioritize incident by severity
incidents = {'incident1': 3, 'incident2': 7} # severity scale 1-10
priority = max(incidents, key=incidents.get)
print(f"Highest priority incident: {priority}")
# Example: pseudo API call to block IP
def block_ip(ip):
print(f"Blocking IP: {ip}")
block_ip("192.168.1.100")
# Example: enrich alert with user info
alert = {"ip": "203.0.113.10"}
user_info = {"owner": "John Doe", "role": "Admin"}
alert.update(user_info)
print(alert)
# Example: automated password reset
def reset_password(user):
print(f"Password reset for {user}")
reset_password("john.doe")
# Example: update case status
case = {"id": 101, "status": "open"}
case["status"] = "investigating"
print(case)
# Example: send alert notification
def notify_team(message):
print(f"Notification sent: {message}")
notify_team("Incident #101 requires immediate attention")
# Example: calculate average response time
response_times = [5, 7, 4]
avg_time = sum(response_times) / len(response_times)
print(f"Average response time: {avg_time} minutes")
# Example: validate playbook steps before execution
playbook_steps = ['validate_alert', 'quarantine_host', 'notify_team']
if 'quarantine_host' in playbook_steps:
print("Playbook valid, ready to execute")
# Example: Simple alert detection script
alert = "unauthorized_access"
if alert == "unauthorized_access":
print("Incident detected: Unauthorized access")
# Alert triggers incident response
# Example: Classify incident severity based on type
incident_type = "malware"
severity_map = {"malware": 5, "phishing": 3, "dos": 4}
severity = severity_map.get(incident_type, 1)
print(f"Incident severity: {severity}")
# Assign severity for prioritization
# Example: Lookup response action for incident type
response_plans = {"malware": "isolate system", "phishing": "block sender"}
incident = "malware"
print(f"Response action: {response_plans[incident]}")
# Plan guides remediation steps
# Example: Escalate if severity is high
severity = 7
if severity > 5:
print("Escalate incident to SOC Manager")
# Timely escalation improves handling
# Example: Isolate a compromised host
def isolate_host(host_id):
print(f"Host {host_id} isolated from network")
isolate_host("host123")
# Immediate containment action
# Example: Remove malware from a system
def clean_malware(system_id):
print(f"Malware removed from {system_id}")
clean_malware("host123")
# System cleaned and ready for recovery
# Example: Document root cause of incident
root_cause = "phishing email"
print(f"Root cause identified: {root_cause}")
# Essential for continuous improvement
# Example: Create incident report summary
report = {"id": 101, "status": "closed", "details": "Malware outbreak contained"}
print(report)
# Documentation aids future preparedness
# Example: Schedule a security training session
def schedule_training(topic):
print(f"Training scheduled on: {topic}")
schedule_training("Phishing Awareness")
# Education empowers users
# Example: Update incident response playbook
playbook = ["detect", "contain", "eradicate"]
playbook.append("review")
print(f"Updated playbook steps: {playbook}")
# Continuous improvement cycle
# Example: Simple log monitor
logs = ["login success", "failed login", "file accessed"]
for log in logs:
if "failed login" in log:
print("Alert: Failed login detected")
# Detect suspicious login attempts
# Example: Simulate log collection
def collect_logs(agent_installed):
if agent_installed:
print("Logs collected via agent")
else:
print("Logs collected agentlessly")
collect_logs(True)
# Different collection methods
# Example: Log rotation simulation
def rotate_logs(log_count):
if log_count > 1000:
print("Rotating logs to archive")
else:
print("Log size within limits")
rotate_logs(1500)
# Ensure logs don’t grow uncontrollably
# Example: Centralized log receipt
central_logs = []
def receive_log(log):
central_logs.append(log)
print(f"Received log: {log}")
receive_log("User login")
# Aggregate logs centrally
# Example: Correlate multiple logs
logs = ["failed login", "failed login", "account locked"]
if logs.count("failed login") > 1 and "account locked" in logs:
print("Multiple failed logins followed by lockout")
# Detect brute-force attacks
# Example: Trigger alert on threshold
failed_attempts = 5
if failed_attempts > 3:
print("Alert: Multiple failed login attempts")
# Prompt security team to investigate
# Example: Send event to SIEM
event = {"type": "login", "status": "fail"}
def send_to_siem(event):
print(f"Event sent to SIEM: {event}")
send_to_siem(event)
# Centralized security event management
# Example: Monitor suspicious port usage
port = 23
if port == 23:
print("Alert: Telnet traffic detected, consider blocking")
# Telnet is insecure and often blocked
# Example: Check for unauthorized process
running_processes = ["chrome.exe", "unknown.exe"]
if "unknown.exe" in running_processes:
print("Alert: Unknown process running")
# Endpoint security check
# Example: Update monitoring rules
monitoring_rules = ["rule1", "rule2"]
monitoring_rules.append("rule3")
print(f"Updated rules: {monitoring_rules}")
# Adapt to evolving threat landscape
# Example: Basic disaster recovery checklist
disaster_plan = ["backup data", "restore servers", "test failover"]
print("Disaster recovery plan steps:")
for step in disaster_plan:
print(f"- {step}")
# Preparing recovery procedures
# Example: Identify critical business functions
critical_functions = ["customer support", "order processing", "IT operations"]
print(f"Critical functions to maintain: {critical_functions}")
# Prioritize business continuity efforts
# Example: Perform incremental backup
def backup(data, last_backup):
if data != last_backup:
print("Performing incremental backup")
else:
print("No changes detected")
backup("file_v2", "file_v1")
# Efficient backup approach
# Example: Check if recovery meets objectives
rto = 2 # hours
actual_recovery_time = 1.5
if actual_recovery_time <= rto:
print("Recovery within RTO")
else:
print("Recovery delayed")
# Measure recovery effectiveness
# Example: Switch to backup server
def failover(active, backup):
if not active:
print("Failing over to backup server")
else:
print("Primary server operational")
failover(False, True)
# Maintain service continuity
# Example: Notify stakeholders
def notify(message):
print(f"Notification sent: {message}")
notify("Data center outage detected, recovery underway")
# Effective communication reduces confusion
# Example: Conduct recovery drill
def conduct_drill():
print("Starting disaster recovery drill")
print("All systems restored successfully")
conduct_drill()
# Validate readiness
# Example: Verify backup integrity
backup_hash = "abc123"
current_hash = "abc123"
if backup_hash == current_hash:
print("Backup data integrity verified")
else:
print("Backup data corrupted")
# Prevent recovery issues
# Example: Check compliance status
compliant = True
if compliant:
print("DR plan meets compliance requirements")
else:
print("Compliance gaps detected")
# Avoid regulatory penalties
# Example: Update DR plan
plan_steps = ["backup", "restore", "test"]
plan_steps.append("review")
print(f"Updated DR plan steps: {plan_steps}")
# Keep plans current and effective
# Example: Basic active ping scan simulation
import os
def ping_host(ip):
response = os.system(f"ping -c 1 {ip} > /dev/null 2>&1")
if response == 0:
print(f"{ip} is alive")
else:
print(f"{ip} is unreachable")
ips = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
for ip in ips:
ping_host(ip)
# Simple script to detect live hosts on a subnet
# Example: Simulate internal vs external scan
def scan_scope(scope):
if scope == "internal":
print("Scanning internal network assets...")
elif scope == "external":
print("Scanning public-facing IPs...")
else:
print("Unknown scan scope")
scan_scope("internal")
scan_scope("external")
# Different focus depending on scan type
# Example: Represent agent vs agentless scanning
class Scanner:
def __init__(self, method):
self.method = method
def scan(self, target):
if self.method == "agent":
print(f"Scanning {target} with installed agent for detailed data")
elif self.method == "agentless":
print(f"Scanning {target} remotely without agent")
else:
print("Unknown scanning method")
scanner1 = Scanner("agent")
scanner2 = Scanner("agentless")
scanner1.scan("Host A")
scanner2.scan("Host B")
# Illustrates different scanning approaches
# Example: Check scanning type and output detail level
def perform_scan(credentialed):
if credentialed:
print("Performing credentialed scan: deep system checks")
else:
print("Performing non-credentialed scan: external view only")
perform_scan(True)
perform_scan(False)
# Shows differences in scan depth based on credentials
# Example: Simulating passive vs active scan behavior
def scan_network(scan_type):
if scan_type == "passive":
print("Listening to network traffic quietly (passive scan).")
elif scan_type == "active":
print("Sending probes to devices (active scan).")
else:
print("Unknown scan type.")
scan_network("passive")
scan_network("active")
# Demonstrates difference between passive and active scanning
# Example: Pseudocode illustrating static vs dynamic analysis
def analyze_code(code, analysis_type):
if analysis_type == "static":
print("Analyzing code without execution (static analysis).")
# Example: check for hardcoded passwords
if "password" in code:
print("Warning: Hardcoded password found.")
elif analysis_type == "dynamic":
print("Running code and monitoring behavior (dynamic analysis).")
# Example: simulate runtime error detection
try:
exec(code)
except Exception as e:
print(f"Runtime error detected: {e}")
sample_code = "print('Hello')\npassword = '1234'"
analyze_code(sample_code, "static")
analyze_code("print(1/0)", "dynamic")
# Shows differences between static and dynamic code evaluation
# Example: Simulate critical infrastructure scan with caution flag
def scan_critical_system(system_name, cautious=True):
if cautious:
print(f"Performing low-impact scan on {system_name} to avoid disruption.")
else:
print(f"Performing standard scan on {system_name}. Use with care!")
scan_critical_system("Power Grid SCADA")
scan_critical_system("Water Treatment System", cautious=False)
# Highlights importance of careful scanning in critical infrastructure
# Example: Identify safe scan mode for OT systems
def ot_scan_mode(system_type):
if system_type in ["OT", "ICS", "SCADA"]:
print(f"Using passive scan mode for {system_type} system to prevent disruptions.")
else:
print("Standard scan mode applicable.")
ot_scan_mode("SCADA")
ot_scan_mode("Corporate Network")
# Emphasizes the importance of scan mode based on system type
# Example: Simple network device mapping simulation
industrial_devices = {
"PLC1": "192.168.10.10",
"RTU2": "192.168.10.20",
"HMI": "192.168.10.30"
}
for device, ip in industrial_devices.items():
print(f"Found device {device} at IP {ip}")
# Lists discovered industrial control devices and their IPs
# Example: Compare current config to baseline
baseline_config = {"firewall_enabled": True, "patch_level": "2024-07"}
current_config = {"firewall_enabled": True, "patch_level": "2024-06"}
for setting in baseline_config:
if baseline_config[setting] != current_config.get(setting):
print(f"Configuration drift detected for {setting}")
else:
print(f"{setting} is compliant with baseline")
# Detects configuration differences affecting compliance
# Example: Basic segmentation check simulation
def check_access(segment, data_sensitivity):
if segment == "public" and data_sensitivity == "high":
print("Warning: Sensitive data exposed on public segment!")
else:
print("Segmentation appropriate for data sensitivity.")
check_access("public", "high")
check_access("restricted", "high")
# Highlights importance of proper segmentation for sensitive data
# Example: Compliance check pseudocode
pci_requirements = {"firewall": True, "encryption": True}
system_config = {"firewall": True, "encryption": False}
for control in pci_requirements:
if pci_requirements[control] != system_config.get(control):
print(f"Non-compliant: {control} requirement not met")
else:
print(f"{control} compliant")
# Checks system settings against PCI DSS requirements
# Example: Simulate tool selection logic
tools = ["Nessus", "OpenVAS", "Qualys"]
budget = 1000
for tool in tools:
if tool == "OpenVAS":
print(f"{tool} is open-source and free.")
elif tool == "Nessus" and budget > 500:
print(f"{tool} selected based on budget and features.")
elif tool == "Qualys" and budget > 1000:
print(f"{tool} is premium and chosen for enterprise use.")
# Illustrates selecting vulnerability assessment tools based on budget and features
# Example: Basic scan run simulation
def run_scan(tool):
print(f"Starting scan using {tool}...")
# Simulate scanning duration
import time
time.sleep(1)
print(f"Scan with {tool} complete. Vulnerabilities found: 3")
run_scan("Nessus")
run_scan("OpenVAS")
run_scan("Qualys")
# Demonstrates simple scan execution for different tools
# Example: Simulate simple web scan process
web_tools = ["Burp Suite", "ZAP", "Nikto"]
for tool in web_tools:
print(f"Launching {tool} for web vulnerability scan...")
# Simulated scanning action
print(f"{tool} scan completed. Issues detected: 5\n")
# Illustrates workflow of web vulnerability scanners
# Example: Basic usage simulation for Nmap and Metasploit
def tool_action(tool):
if tool == "Nmap":
print("Scanning network for live hosts and open ports...")
elif tool == "Metasploit":
print("Launching exploit module against target host...")
tool_action("Nmap")
tool_action("Metasploit")
# Demonstrates common usage scenarios for multipurpose tools
# Example: Cloud tool usage overview
cloud_tools = ["ScoutSuite", "Prowler", "Pacu"]
for tool in cloud_tools:
print(f"Running {tool} to assess AWS security posture...")
# Highlights roles of cloud security tools in auditing and penetration testing
# Example: Pseudocode for reverse engineering steps
def reverse_engineer(binary):
print(f"Loading binary {binary} into debugger...")
print("Setting breakpoints...")
print("Analyzing function calls and memory usage...")
print("Documenting findings for vulnerability assessment.")
reverse_engineer("malware_sample.exe")
# Illustrates basic reverse engineering workflow with debugger
# Example: Filter scan results for critical severity
scan_results = [
{"id":1, "severity":"medium"},
{"id":2, "severity":"critical"},
{"id":3, "severity":"low"},
]
critical_findings = [r for r in scan_results if r["severity"] == "critical"]
print(f"Critical issues found: {len(critical_findings)}")
# Focuses remediation efforts on highest severity vulnerabilities
# Example: Identify repeated vulnerabilities across hosts
scan_data = [
{"host":"host1", "vuln":"CVE-2021-1234"},
{"host":"host2", "vuln":"CVE-2021-1234"},
{"host":"host3", "vuln":"CVE-2022-5678"},
]
vuln_counts = {}
for entry in scan_data:
vuln = entry["vuln"]
vuln_counts[vuln] = vuln_counts.get(vuln, 0) + 1
for vuln, count in vuln_counts.items():
print(f"{vuln} found on {count} hosts")
# Highlights frequently occurring vulnerabilities for priority fixing
# Example: Validate scan results for false positives
scan_results = [{"id":1, "vulnerable":True}, {"id":2, "vulnerable":False}]
def validate_results(results):
for res in results:
if res["vulnerable"]:
print(f"Vulnerability confirmed on ID {res['id']}")
else:
print(f"Potential false positive on ID {res['id']} - needs manual review")
validate_results(scan_results)
# Helps differentiate true vulnerabilities from false alerts
# Example: Prioritize by asset type
scan_results = [
{"asset":"database", "vuln_score":7},
{"asset":"workstation", "vuln_score":5},
]
priority_order = {"database": 1, "workstation": 2}
sorted_results = sorted(scan_results, key=lambda x: (priority_order[x["asset"]], -x["vuln_score"]))
for r in sorted_results:
print(f"Asset: {r['asset']}, Score: {r['vuln_score']}")
# Sorts vulnerabilities prioritizing critical assets first
# Example: Link vulnerabilities to threats
vulnerabilities = [{"id":1, "cve":"CVE-2023-1234"}]
threat_db = {"CVE-2023-1234": "Active exploitation by ransomware"}
for vuln in vulnerabilities:
threat = threat_db.get(vuln["cve"], "No known threat")
print(f"Vulnerability {vuln['cve']} linked to threat: {threat}")
# Helps security teams understand real-world impact of vulnerabilities
# Example: Simple consolidation logic
reports = [
{"id": 1, "vuln": "X", "severity": "high"},
{"id": 2, "vuln": "X", "severity": "high"},
{"id": 3, "vuln": "Y", "severity": "medium"},
]
unique_vulns = {}
for r in reports:
unique_vulns[r["vuln"]] = r
for vuln, details in unique_vulns.items():
print(f"Vulnerability: {vuln}, Severity: {details['severity']}")
# Removes duplicates to create concise reports
# Example: Prioritize vulnerabilities by CVSS score
vulns = [{"name":"vuln1", "cvss":9.0}, {"name":"vuln2", "cvss":5.4}]
sorted_vulns = sorted(vulns, key=lambda x: x["cvss"], reverse=True)
for v in sorted_vulns:
print(f"{v['name']} with CVSS score {v['cvss']}")
# Sorts vulnerabilities from highest to lowest risk
# Example: Simplified CVSS vector interpretation
cvss_vector = {"AC": "Low", "PR": "None", "UI": "None", "C": "High", "I": "High", "A": "High"}
def interpret_vector(vec):
if vec["AC"] == "Low" and vec["PR"] == "None":
print("Easy to exploit vulnerability")
if vec["C"] == "High" and vec["I"] == "High" and vec["A"] == "High":
print("High impact on system security")
interpret_vector(cvss_vector)
# Helps assess exploitability and impact dimensions of CVSS
# Example: Assess exploitability and weaponization
exploit_info = {"exploit_available": True, "complexity": "Low"}
if exploit_info["exploit_available"] and exploit_info["complexity"] == "Low":
print("High weaponization potential - urgent fix needed")
else:
print("Lower urgency based on exploitability")
# Helps determine threat from active exploits
# Example: Calculate risk score with asset value
vulnerability_score = 7.0
asset_value = {"database": 10, "workstation": 5}
asset = "database"
risk_score = vulnerability_score * asset_value.get(asset, 1)
print(f"Risk score for {asset}: {risk_score}")
# Combines vulnerability and asset importance for prioritization
# Example: Zero-day mitigation logic
zero_day_detected = True
if zero_day_detected:
print("Apply workarounds and increase monitoring")
else:
print("Regular patching process applies")
# Emphasizes quick action for unpatched critical vulnerabilities
# Example: Prioritize remediation by combined risk factors
vulnerabilities = [
{"id":1, "cvss":9.0, "asset_value":10, "exploit":True},
{"id":2, "cvss":6.0, "asset_value":3, "exploit":False},
]
def risk_score(vuln):
score = vuln["cvss"] * vuln["asset_value"]
if vuln["exploit"]:
score *= 1.5
return score
sorted_vulns = sorted(vulnerabilities, key=risk_score, reverse=True)
for v in sorted_vulns:
print(f"Vuln ID {v['id']} prioritized with risk score: {risk_score(v):.2f}")
# Ensures high-risk vulnerabilities fixed first based on multiple factors
# Example: firewall + antivirus + encryption layers
# No direct code; architecture concept
# Example: VLAN config snippet (conceptual)
# VLAN 10 for finance, VLAN 20 for dev
# RBAC example concept: role "admin" vs "user"
# IDS alert pseudocode
if packet.is_malicious():
alert_admin()
block_packet()
# Command line example: apt update && apt upgrade
sudo apt update
sudo apt upgrade -y
# Antivirus scan command example
clamscan -r /home/user
# Example: disable ssh root login in sshd_config
PermitRootLogin no
# Log monitoring with tail command
tail -f /var/log/auth.log
# No code, this is organizational control
# Backup example using rsync
rsync -av --delete /important/data /backup/location
# Example: Python regex to allow only alphanumeric input
import re
def validate_input(input_str):
pattern = "^[a-zA-Z0-9]+$"
return re.match(pattern, input_str) is not None
print(validate_input("";
return <div>{name}</div>
# Example: avoid eval usage
// Instead of eval(code), use safer alternatives
# Example: DOMPurify in JS to sanitize input
const clean = DOMPurify.sanitize(userInput);
# Use automated tools like OWASP ZAP
zap-cli quick-scan http://example.com
# HTTP header example
X-XSS-Protection: 1; mode=block
# Unsafe C example prone to buffer overflow
char buffer[10];
strcpy(buffer, "This string is way too long!");
# Example: malloc misuse in C (conceptual)
char *ptr = malloc(10);
strcpy(ptr, "Overflowing this buffer causes heap corruption");
# Classic stack overflow vulnerability in C
void func() {
char buffer[10];
gets(buffer); // unsafe function
}
# Safer alternative example
strncpy(buffer, input, sizeof(buffer)-1);
buffer[sizeof(buffer)-1] = '\0';
# Enabled via OS configuration; no code snippet
# Compiler flag -fstack-protector enables this protection
gcc -fstack-protector source.c -o program
# Enabled in OS or compiler; example config
execstack -c program
# Compiler-level support, no direct code
# Modern malloc implementations provide protections
# Tools like Valgrind or AddressSanitizer
clang -fsanitize=address source.c -o program
# Unsafe LDAP filter construction example (Python)
user_input = "*)(&(objectClass=*))"
ldap_filter = "(cn=" + user_input + ")"
# This could manipulate the filter to return all entries
# Unsafe XML parsing (Python lxml example)
user_input = "<!ENTITY xxe SYSTEM 'file:///etc/passwd'>"
xml_data = "<data>" + user_input + "</data>"
# Parsing this can cause sensitive file disclosure
# Vulnerable code example (Python Flask)
filename = request.args.get('file')
with open('/var/www/uploads/' + filename) as f:
data = f.read()
# If filename = "../../etc/passwd" attacker can read system files
# Secure SQL example using Python sqlite3
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
# Example: sanitize filename to prevent directory traversal
import os
filename = os.path.basename(user_input)
# Example with SQLAlchemy ORM
user = session.query(User).filter(User.name == user_input).first()
# WAF rule example (conceptual)
if request.matches_pattern(".*('|\").*;.*--.*"):
block_request()
# Logging example in Python
if suspicious_input_detected:
logger.warning(f"Possible injection attempt: {user_input}")
# Conceptual example: lack of permission check
if user.is_authenticated():
show_sensitive_data()
# Missing role or ownership checks allow unauthorized access
# Vulnerable Python code example
import requests
url = request.args.get('url')
response = requests.get(url) # No validation on URL
# Basic URL validation example
from urllib.parse import urlparse
allowed_hosts = ['example.com']
url = urlparse(user_input_url)
if url.hostname not in allowed_hosts:
raise Exception("Blocked URL")
# Unsafe eval example in Python
eval(user_input) # Extremely dangerous
# Example: use JSON parsing instead of eval
import json
data = json.loads(user_input)
# Example: check ownership before access
if resource.owner_id != current_user.id:
raise PermissionError("Access denied")
# Example: Django permission decorators
from django.contrib.auth.decorators import login_required, permission_required
@login_required
@permission_required('app.view_resource')
def view_resource(request):
...
# Logging example
logger.info(f"Unauthorized access attempt by user {user.id}")
# Role assignment example
user.assign_role('read_only')
# Use tools like OWASP ZAP or Burp Suite for testing
# SDLC phases: Requirements, Design, Implementation, Testing, Deployment, Maintenance
# Example: STRIDE methodology
# Spoofing, Tampering, Repudiation, Information Disclosure, Denial of Service, Elevation of Privilege
# Use tools like GitHub pull requests with security checklists
# Example tool: Bandit for Python
bandit -r project/
# Example: OWASP ZAP scan against running web app
zap-cli quick-scan http://localhost:8000
# Example: Python safety check
safety check
# Examples: input validation, output encoding, error handling
# Safe error response example
try:
...
except Exception:
return "An error occurred. Please try again later."
# Example: mask sensitive info in logs
logger.info(f"User login: {user.id}, IP: {request.ip}")
# GitHub Actions example with SAST
name: Security Scan
on: [push]
jobs:
bandit_scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run Bandit
run: bandit -r .
# Example: run nmap script to detect vulns
nmap --script vuln target.com
# Example scoring: CVSS (Common Vulnerability Scoring System)
# CVSS score ranges from 0 to 10
# Schedule during maintenance windows to reduce downtime
# Use ITSM tools like Jira or ServiceNow
# Deploy patches first in staging environments
# Example: email or ticket updates
# Re-scan with vulnerability tools post-patch
nmap --script vuln target.com
# Use centralized repositories or ticketing systems
# Continuous vulnerability scanning example with OpenVAS
openvas-start
# Post-incident review meetings and reports
# Example: physical access logs when electronic badge system is down
# Formal risk acceptance forms or decisions
# Combining monitoring and manual processes
# E.g., manual reviews of access logs
# Physical escorting of visitors
# Enhanced audit trails
# Temporary SOP updates
# Awareness sessions
# Formal documentation and sign-offs
# Periodic audits and testing
# Lifecycle steps ensure thorough patch application
# Use staging environments for validation
# Night or weekend patch windows
# Example Puppet manifest to apply updates
exec { 'update_system':
command => '/usr/bin/apt-get update && /usr/bin/apt-get upgrade -y',
}
# Keep backup images before patching
# Patch logs with timestamps and results
# Email alerts or ticket notifications
# Automated compliance reports
# Scan systems for patch status
# Lessons learned incorporated into next cycles
# Scheduled during low user activity to reduce disruption
# Email notifications or meetings before window
# Backup systems before changes
# Example: Sunday 2 AM - 4 AM maintenance
# Ticketing system updates for traceability
# Verify backup integrity before patching
# Status updates via email or dashboards
# Run test scripts to check service availability
# Maintenance logs and incident reports
# Post-mortem meetings for lessons learned
try:
result = 10 / 0
except ZeroDivisionError:
print("Handled division by zero error")
# Risk acceptance form example (pseudo)
risk = {"description": "Legacy system vulnerability", "accepted_by": "CTO"}
import logging
logging.error("Exception occurred", exc_info=True)
# Example: Python list of assets assets = ["webserver", "database", "API endpoint"]
import nmap
scanner = nmap.PortScanner()
scanner.scan('192.168.1.0/24')
# Example: uptime monitoring
if uptime_percentage < 99.9:
alert_team()
# Python log retention example
import shutil, os
log_dir = "/var/log/security"
archive = "/backup/security_logs.zip"
shutil.make_archive(archive.replace(".zip", ""), 'zip', log_dir)
print("Logs archived for audit")
# STRIDE Threat Model Categories
stride = ["Spoofing", "Tampering", "Repudiation", "Information Disclosure", "Denial of Service", "Elevation of Privilege"]
for threat in stride:
print("Assessing threat:", threat)
# Example alignment check
framework = ["Access Control", "Asset Management", "Incident Response"]
org_controls = ["Access Control", "Incident Response"]
missing = set(framework) - set(org_controls)
print("Missing controls:", missing)
# Auto-create remediation task
vuln = "OpenSSH outdated"
if vuln:
print(f"Creating ticket for: {vuln}")
# Can integrate with Jira, ServiceNow
# Python: Trigger Nessus scan via API
import requests
scan_url = "https://nessus.local/api/v1/scans"
data = {"template_id": 1, "name": "Weekly Scan"}
response = requests.post(scan_url, json=data, verify=False)
print("Scan triggered:", response.status_code)
# Simulate integration with patch system
vuln = "Critical OpenSSL bug"
patch_system = "Patch Deployer"
print(f"{vuln} sent to {patch_system} for deployment")
# Example using Trivy in CI pipeline
os.system("trivy fs --severity HIGH /app/code")
# Send issue to GitHub from scan
import requests
issue = {"title": "Fix XSS", "body": "Found in index.js"}
requests.post("https://api.github.com/repos/org/repo/issues", json=issue)
# Simple MTTR calculation
import datetime
opened = datetime.datetime(2024, 5, 1)
closed = datetime.datetime(2024, 5, 4)
print("MTTR:", (closed - opened).days, "days")
# Basic fuzz test example
inputs = ["A" * x for x in range(1, 100)]
for input_str in inputs:
print("Testing input:", input_str)
# Example: Checkov CLI
os.system("checkov -d /terraform/code")
# MobSF API scan trigger
requests.post("http://mobsf/api/v1/scan", files={"file": open("apk_file.apk", "rb")})
# Check serial debug port status
import serial
try:
serial.Serial("/dev/ttyUSB0", 9600)
print("Debug port active")
except:
print("Debug port not found")
# Example: Real-time feed poller
import time
while True:
print("Checking feed... (simulated)")
time.sleep(10)
# Example of ML model prediction
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier()
X_train = [[0,1],[1,0]]; y_train = [1,0]
model.fit(X_train, y_train)
print("Prediction:", model.predict([[1,1]]))
# Simulate an attack methodology phase in Python
def simulate_attack_phase(phase):
print(f"[!] Simulating phase: {phase}")
simulate_attack_phase("Reconnaissance")
# List all Cyber Kill Chain phases
kill_chain = [
"Reconnaissance",
"Weaponization",
"Delivery",
"Exploitation",
"Installation",
"Command and Control",
"Actions on Objectives"
]
for phase in kill_chain:
print(f"Kill Chain Phase: {phase}")
# Represent a simple Diamond Model relationship in Python
diamond = {
"Adversary": "APT28",
"Capability": "Zero-day exploit",
"Infrastructure": "Malicious domain",
"Victim": "Energy sector"
}
for key, value in diamond.items():
print(f"{key}: {value}")
# Print sample ATT&CK tactics
tactics = [
"Initial Access",
"Execution",
"Persistence",
"Privilege Escalation",
"Defense Evasion",
"Credential Access"
]
for tactic in tactics:
print(f"MITRE ATT&CK Tactic: {tactic}")
# Pseudocode: Print OSSTMM testing aspects
osstmm_areas = ["Data Controls", "Process Control", "Operational Risk", "Metrics"]
for area in osstmm_areas:
print(f"OSSTMM Testing Focus: {area}")
# Python: Basic automated scan example for XSS vulnerability in URLs
import requests
def check_xss(url):
payload = "<script>alert('XSS')</script>"
test_url = f"{url}{payload}"
response = requests.get(test_url)
if payload in response.text:
print(f"Potential XSS vulnerability detected at {test_url}")
else:
print(f"No XSS vulnerability detected at {url}")
# Example usage
check_xss("http://example.com/search?q=")
# Python: Simple log anomaly detection example
def detect_incident(logs):
suspicious_keywords = ["failed login", "unauthorized", "error", "malware"]
incidents = []
for line in logs:
if any(keyword in line.lower() for keyword in suspicious_keywords):
incidents.append(line)
return incidents
# Example usage
logs = [
"User admin failed login at 10:15",
"File uploaded successfully",
"Unauthorized access attempt detected",
]
alerts = detect_incident(logs)
print("Detected incidents:")
for alert in alerts:
print(alert)
# Python: Simple chain of custody tracker
from datetime import datetime
chain_of_custody = []
def acquire_evidence(evidence_id, handler):
timestamp = datetime.now().isoformat()
chain_of_custody.append({
"evidence_id": evidence_id,
"handler": handler,
"action": "acquired",
"timestamp": timestamp
})
print(f"Evidence {evidence_id} acquired by {handler} at {timestamp}")
# Example usage
acquire_evidence("EV001", "Investigator Alice")
print(chain_of_custody)
# Python: Validate IoCs by checking format
def validate_ioc(ioc):
import re
ip_pattern = r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$"
hash_pattern = r"^[a-fA-F0-9]{64}$" # SHA-256 hash
if re.match(ip_pattern, ioc):
return "Valid IP"
elif re.match(hash_pattern, ioc):
return "Valid SHA-256 Hash"
else:
return "Unknown IoC format"
# Example usage
print(validate_ioc("192.168.1.100"))
print(validate_ioc("d2d2c3a3e4f1b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2"))
print(validate_ioc("http://malicious.com"))
# Python: Correlate login failures from different logs
def correlate_logs(log_sources):
failed_logins = []
for source in log_sources:
for entry in source:
if "failed login" in entry.lower():
failed_logins.append(entry)
return failed_logins
# Example usage
firewall_logs = ["User admin failed login", "Connection from 10.0.0.1"]
ids_logs = ["Failed login detected for user admin"]
all_failed = correlate_logs([firewall_logs, ids_logs])
print("Correlated Failed Logins:")
for entry in all_failed:
print(entry)
# Python: List suspicious files by extension in a directory
import os
def suspicious_files(path):
suspicious_exts = ['.exe', '.dll', '.bat']
files = [f for f in os.listdir(path) if os.path.splitext(f)[1].lower() in suspicious_exts]
print("Suspicious files found:")
for f in files:
print(f)
# Example usage (change path as needed)
suspicious_files("/tmp")
# Python: Simulate sandbox logging behavior (conceptual)
def sandbox_simulation(malware_name):
print(f"Executing {malware_name} in sandbox...")
activities = [
"Created file temp.dll",
"Attempted network connection to 10.10.10.10",
"Modified registry key HKCU\\Software\\Malware"
]
for activity in activities:
print(f"Sandbox log: {activity}")
# Example usage
sandbox_simulation("evil_malware.exe")
# Python: Simple containment flag and recovery status
class Incident:
def __init__(self, id):
self.id = id
self.contained = False
self.eradicated = False
self.recovered = False
def contain(self):
self.contained = True
print(f"Incident {self.id} contained.")
def eradicate(self):
if self.contained:
self.eradicated = True
print(f"Incident {self.id} eradicated.")
else:
print("Contain incident first.")
def recover(self):
if self.eradicated:
self.recovered = True
print(f"Incident {self.id} recovered.")
else:
print("Eradicate incident first.")
# Example usage
inc = Incident("INC1001")
inc.contain()
inc.eradicate()
inc.recover()
# Python: Simulate network device isolation status
class NetworkDevice:
def __init__(self, name):
self.name = name
self.isolated = False
def isolate(self):
self.isolated = True
print(f"Device {self.name} is now isolated from the network.")
def reconnect(self):
self.isolated = False
print(f"Device {self.name} reconnected to the network.")
# Example usage
device = NetworkDevice("Server01")
device.isolate()
device.reconnect()
# Python: Apply compensating controls status
def apply_compensating_control(control_name):
print(f"Compensating control '{control_name}' applied temporarily.")
# Example usage
apply_compensating_control("Manual login review for admin accounts")
# Python: Simulate system re-imaging status
def reimage_system(system_name):
print(f"Re-imaging system {system_name}... Complete. System restored to clean image.")
# Example usage
reimage_system("Workstation-23")
# Python: Reset permissions simulation
def restore_permissions(user, permissions):
print(f"Restoring permissions for {user}: {permissions}")
# Example usage
restore_permissions("alice", ["read", "write", "execute"])
# Python: Simulated vulnerability scan result
def validate_recovery(system):
print(f"Running vulnerability scan on {system}...")
print("No critical vulnerabilities found. System is secure.")
# Example usage
validate_recovery("Database Server 1")
# Python: Incident response plan skeleton
incident_response_plan = {
"roles": ["Incident Handler", "Communications", "Legal", "IT Support"],
"steps": ["Detect", "Analyze", "Contain", "Eradicate", "Recover", "Review"],
"communication": {
"internal": ["Email", "Phone", "Chat"],
"external": ["Law Enforcement", "Regulators"]
}
}
print("Incident Response Plan Overview:")
for role in incident_response_plan["roles"]:
print(f"- {role}")
# Python: Simple playbook example for malware incident
malware_playbook = {
"identification": ["Isolate affected machines", "Collect malware samples"],
"eradication": ["Run anti-malware tools", "Remove malicious files"],
"recovery": ["Restore systems from backups", "Monitor for reinfection"],
"communication": ["Notify security team", "Update stakeholders"]
}
def execute_playbook(playbook):
for phase, actions in playbook.items():
print(f"{phase.capitalize()} phase:")
for action in actions:
print(f"- {action}")
# Example usage
execute_playbook(malware_playbook)
# Sample table setup
incident = "Ransomware Outbreak"
roles = ["SOC Analyst", "Incident Commander", "Legal"]
for r in roles:
print(f"{r} participating in {incident} exercise")
# Simulate tool use
forensic_tool = "Volatility"
print(f"Launching memory analysis with {forensic_tool}")
# Simulate failover test
primary = False
if not primary:
print("Switching to DR site...")
# RACI example
roles = {"SOC Analyst": "Responsible", "Manager": "Accountable"}
for role, duty in roles.items():
print(f"{role} is {duty}")
# Python: Log post-incident recovery activities
def post_incident_log(incident_id, actions, notes):
"""
Logs post-incident activities for accountability and future review.
incident_id: str - unique incident identifier
actions: list - recovery and validation steps completed
notes: str - additional observations or recommendations
"""
print(f"Logging post-incident activities for {incident_id}")
for action in actions:
print(f"- Action completed: {action}")
print(f"Additional notes: {notes}")
# Example usage
post_incident_log("INC2025-07", ["System integrity verified", "Backups restored"], "Recommend patching vulnerability XYZ.")
# Python: Summarize lessons learned from after-action report
def after_action_report(incident_id, timeline, successes, improvements):
print(f"After-Action Report: Incident {incident_id}\n")
print("Incident Timeline:")
for event in timeline:
print(f" - {event}")
print("\nWhat Went Well:")
for success in successes:
print(f" - {success}")
print("\nAreas for Improvement:")
for imp in improvements:
print(f" - {imp}")
# Example usage
after_action_report(
"INC2025-07",
["Malware detected at 10:00 AM", "Isolated infected systems at 10:30 AM", "Systems restored by 1:00 PM"],
["Fast isolation prevented spread", "Good team coordination"],
["Improve phishing awareness training", "Enhance endpoint monitoring"]
)
# Python: 5 Whys root cause analysis example
def five_whys(incident):
print(f"Root Cause Analysis for: {incident}")
reasons = [
"Why 1: Firewall rules misconfigured",
"Why 2: Staff unaware of rule changes",
"Why 3: No documentation for updates",
"Why 4: Lack of training on procedures",
"Why 5: Inadequate oversight"
]
for reason in reasons:
print(reason)
# Example usage
five_whys("Unauthorized access through firewall")
# Python: Format executive incident summary
def executive_summary(incident_id, impact, response, risks, recommendations):
print(f"Executive Summary for Incident {incident_id}")
print("-"*50)
print(f"Impact: {impact}")
print(f"Response Summary: {response}")
print(f"Residual Risks: {risks}")
print("Recommendations:")
for rec in recommendations:
print(f" - {rec}")
# Example usage
executive_summary(
"INC2025-07",
"Customer data exposure affecting 10,000 users",
"Incident contained within 2 hours",
"Medium risk of data misuse",
["Increase endpoint monitoring", "Conduct staff security training"]
)
# Python: Update security playbook dictionary
def update_playbook(playbook, incident_type, steps):
playbook[incident_type] = steps
print(f"Playbook updated for {incident_type} incidents.")
# Example playbook and update
playbook = {
"phishing": ["Identify email", "Quarantine user account", "Reset credentials"]
}
update_playbook(playbook, "ransomware", ["Isolate infected systems", "Notify response team", "Restore backups"])
print(playbook)
# Python: Add IoCs to a list and display them
def add_iocs(ioc_list, new_ioc):
ioc_list.append(new_ioc)
print("Updated IoC List:")
for ioc in ioc_list:
print(f" - {ioc}")
# Example usage
iocs = ["hash:abcd1234", "ip:192.168.1.100"]
add_iocs(iocs, "domain:malicious-site.com")
# Python: Simulate forensic artifact collection
def collect_artifacts(artifacts):
print("Collecting forensic artifacts:")
for artifact in artifacts:
print(f"- Collected {artifact}")
# Example artifacts
artifacts = ["memory dump", "network logs", "file metadata"]
collect_artifacts(artifacts)
# Python: Simulate imaging and carving processes
def imaging(device):
print(f"Creating bit-for-bit image of {device}... Done.")
def carving(image_file):
print(f"Carving data fragments from {image_file}... Found 3 deleted files.")
# Example usage
imaging("Disk 1")
carving("Disk1.img")
import hashlib
def sha256_hash(file_path):
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while chunk := f.read(8192):
sha256.update(chunk)
return sha256.hexdigest()
# Example usage (file must exist)
# print(sha256_hash("evidence.img"))
# Python: Simple timeline reconstruction from event logs
def reconstruct_timeline(events):
sorted_events = sorted(events, key=lambda e: e["timestamp"])
print("Attack Timeline:")
for event in sorted_events:
print(f"{event['timestamp']}: {event['description']}")
# Example usage
events = [
{"timestamp": "2025-07-25 10:00", "description": "Phishing email sent"},
{"timestamp": "2025-07-25 10:15", "description": "User clicked link"},
{"timestamp": "2025-07-25 10:45", "description": "Malware installed"},
]
reconstruct_timeline(events)
# Python: Simple chain of custody log
def chain_of_custody_log(evidence_id, handler, action):
from datetime import datetime
timestamp = datetime.now().isoformat()
print(f"{timestamp} | Evidence {evidence_id} | {action} by {handler}")
# Example usage
chain_of_custody_log("EV123", "Analyst John", "Collected")
chain_of_custody_log("EV123", "Lab Tech Alice", "Transferred")
# Python: Simulate artifact extraction and analysis
def analyze_artifacts(artifacts):
print("Analyzing forensic artifacts:")
for artifact in artifacts:
print(f"- Extracted data from {artifact}")
# Example artifacts
artifacts = ["Registry", "Windows Event Log", "Browser Cache"]
analyze_artifacts(artifacts)
# Python: Simple escalation notification
def notify_stakeholders(severity, message):
stakeholders = {
"low": ["IT Team"],
"medium": ["IT Team", "Security Manager"],
"high": ["IT Team", "Security Manager", "Executives", "Legal"]
}
recipients = stakeholders.get(severity, ["IT Team"])
print(f"Notifying {', '.join(recipients)}: {message}")
# Example usage
notify_stakeholders("high", "Data breach detected in payment system.")
# Python: List stakeholders based on incident type
def identify_stakeholders(incident_type):
base = ["IT", "Security Team"]
if incident_type == "data breach":
base += ["Legal", "Compliance", "Executives", "Regulators", "Affected Customers"]
elif incident_type == "phishing":
base += ["Security Awareness Team"]
print(f"Stakeholders for {incident_type}: {', '.join(base)}")
# Example usage
identify_stakeholders("data breach")
# Python: Simulate regulatory reporting checklist
def regulatory_report_checklist(incident):
checklist = [
"Assess data types affected",
"Determine notification deadlines",
"Prepare disclosure statements",
"Notify regulatory bodies",
"Inform affected individuals"
]
print(f"Regulatory reporting checklist for {incident}:")
for item in checklist:
print(f"- {item}")
# Example usage
regulatory_report_checklist("Customer data breach")
# Python: Draft simple public relations statement template
def pr_statement(incident, message, contact):
print(f"Public Relations Statement on {incident}")
print("-"*50)
print(message)
print(f"\nFor more information, contact: {contact}")
# Example usage
pr_statement(
"Security Incident July 2025",
"Our teams have contained the incident quickly and are working diligently to ensure customer data safety.",
"pr@company.com"
)
# Python: Simple example using TLS socket to secure communication
import socket
import ssl
def secure_client_send(message, server_hostname, server_port):
context = ssl.create_default_context()
with socket.create_connection((server_hostname, server_port)) as sock:
with context.wrap_socket(sock, server_hostname=server_hostname) as ssock:
ssock.sendall(message.encode())
response = ssock.recv(1024).decode()
print("Received:", response)
# Example usage (replace with real server info)
# secure_client_send("Incident update: containment successful.", "secure.server.com", 443)
# Python: Escalation example based on incident severity
def escalate_incident(severity):
if severity >= 8:
print("Escalate to CISO and executive team.")
elif severity >= 5:
print("Notify incident response team.")
else:
print("Handle at local IT team level.")
# Example usage
escalate_incident(7) # Output: Notify incident response team.
# Python: Simple KPI tracker example
kpi = {
"mean_time_to_detect": 12, # hours
"mean_time_to_respond": 6, # hours
"incidents_resolved": 15,
"incidents_open": 3
}
def print_kpi(kpi):
for key, value in kpi.items():
print(f"{key.replace('_',' ').title()}: {value}")
print_kpi(kpi)
# Python: Calculate average MTTD from incident timestamps
from datetime import datetime
incident_times = [
("2023-07-20 10:00:00", "2023-07-20 12:30:00"), # (occurred, detected)
("2023-07-21 09:15:00", "2023-07-21 10:00:00"),
("2023-07-22 14:00:00", "2023-07-22 14:45:00"),
]
def calculate_mttd(incident_times):
total_seconds = 0
for occurred, detected in incident_times:
fmt = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime(occurred, fmt)
end = datetime.strptime(detected, fmt)
total_seconds += (end - start).total_seconds()
average_seconds = total_seconds / len(incident_times)
return average_seconds / 3600 # convert to hours
print(f"Average MTTD: {calculate_mttd(incident_times):.2f} hours")
# Python: Calculate average MTTR from incident timestamps
incident_response_times = [
("2023-07-20 12:30:00", "2023-07-20 15:00:00"), # (detected, resolved)
("2023-07-21 10:00:00", "2023-07-21 13:15:00"),
("2023-07-22 14:45:00", "2023-07-22 18:00:00"),
]
def calculate_mttr(response_times):
total_seconds = 0
for detected, resolved in response_times:
fmt = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime(detected, fmt)
end = datetime.strptime(resolved, fmt)
total_seconds += (end - start).total_seconds()
average_seconds = total_seconds / len(response_times)
return average_seconds / 3600 # convert to hours
print(f"Average MTTR: {calculate_mttr(incident_response_times):.2f} hours")
# Python: Count alerts by type from a list
alerts = [
{"type": "malware", "details": "..."},
{"type": "phishing", "details": "..."},
{"type": "malware", "details": "..."},
{"type": "intrusion", "details": "..."},
{"type": "malware", "details": "..."},
]
from collections import Counter
def count_alerts(alerts):
types = [alert["type"] for alert in alerts]
counts = Counter(types)
return counts
print("Alert volumes by type:")
print(count_alerts(alerts))
# Python: Simple trend analysis example (monthly incidents)
monthly_incidents = {
"Jan": 5,
"Feb": 7,
"Mar": 9,
"Apr": 6,
"May": 8,
"Jun": 10,
}
def analyze_trends(data):
for month, count in data.items():
print(f"{month}: {count} incidents")
analyze_trends(monthly_incidents)
# Python: Track improvement in MTTD over quarters
mttd_quarters = [15, 12, 10, 8] # hours per quarter
def print_improvements(metrics):
for i, value in enumerate(metrics, 1):
print(f"Quarter {i}: MTTD = {value} hours")
print_improvements(mttd_quarters)
# Python: Example automated alert triage function
def auto_triage(alert):
if alert["severity"] > 7:
return "High priority - escalate immediately"
elif alert["severity"] > 4:
return "Medium priority - investigate"
else:
return "Low priority - monitor"
# Example alert
alert = {"id": "A123", "severity": 8}
print(auto_triage(alert))
# Python: Simplified SOAR playbook runner simulation
def run_playbook(steps):
for step in steps:
print(f"Executing: {step}")
playbook_steps = [
"Collect logs from endpoint",
"Enrich alert with threat intel",
"Isolate affected device",
"Notify security team",
]
run_playbook(playbook_steps)
# Python: Automated triage filtering example
alerts = [
{"id": "1", "severity": 9, "source": "IDS"},
{"id": "2", "severity": 3, "source": "Firewall"},
{"id": "3", "severity": 6, "source": "Endpoint"},
]
def filter_alerts(alerts, threshold=5):
return [a for a in alerts if a["severity"] >= threshold]
high_priority = filter_alerts(alerts)
print("High priority alerts:")
for alert in high_priority:
print(alert)
# Python: Simulated integration fetching threat intel
def fetch_threat_intel():
return ["192.168.100.5", "malicious.com", "hash123456"]
def check_alert_against_intel(alert, intel):
return any(i in alert for i in intel)
# Example usage
intel_data = fetch_threat_intel()
alert = "Connection attempt to 192.168.100.5"
if check_alert_against_intel(alert, intel_data):
print("Alert matches known threat intel!")
else:
print("No match with threat intel.")
# Python: Simulate automated evidence capture
def capture_evidence(evidence_type):
print(f"Capturing {evidence_type} evidence...")
# Placeholder for real capture logic
print(f"{evidence_type} evidence saved with timestamp.")
# Example usage
capture_evidence("memory dump")
capture_evidence("system logs")
# Python: Basic Git command example for playbook version control
# (Note: Run these commands in terminal, shown here as comments)
# git init
# git add incident_playbook.yml
# git commit -m "Initial playbook version"
# git push origin main
# Python: Simple simulation of playbook testing
def test_playbook(playbook):
print("Starting playbook test...")
for step in playbook:
print(f"Executing step: {step}")
print("Playbook test completed successfully.")
playbook_steps = ["Identify incident", "Contain threat", "Eradicate malware", "Recover systems"]
test_playbook(playbook_steps)
# Python: Placeholder for scheduling training sessions
def schedule_training(date, topic):
print(f"Training on '{topic}' scheduled for {date}.")
schedule_training("2025-08-15", "Ransomware response simulation")
# Python: Record lessons learned example
lessons = []
def add_lesson(lesson):
lessons.append(lesson)
print("Lesson added.")
add_lesson("Need faster malware detection tools.")
add_lesson("Improve communication during escalation.")
print("Lessons learned:")
for l in lessons:
print(f"- {l}")
# Python: Generate simple incident report
incident_report = {
"id": "INC-20250727-01",
"description": "Phishing email detected",
"actions": ["Blocked sender", "Notified users", "Monitored inbox"],
"status": "Closed"
}
def print_report(report):
for key, value in report.items():
print(f"{key.title()}: {value}")
print_report(incident_report)
# Python: Check if incident requires breach notification
def requires_notification(incident_type):
notify_types = ["data breach", "personal data exposure"]
return incident_type.lower() in notify_types
print(requires_notification("Data Breach")) # True
print(requires_notification("Malware Infection")) # False
# Python: Template for public statement
def create_public_statement(incident_summary, actions_taken):
statement = f"Security Incident Report:\n{incident_summary}\nActions Taken:\n"
for action in actions_taken:
statement += f"- {action}\n"
return statement
summary = "A phishing attack targeted our email system on July 20."
actions = ["Blocked phishing emails", "Alerted affected users", "Enhanced email filtering"]
print(create_public_statement(summary, actions))
# Python: Example vulnerability report generator
vulnerability = {
"id": "VULN-2025-001",
"description": "SQL Injection vulnerability in login form",
"risk_level": "High",
"affected_system": "Web Application Server",
"recommendation": "Implement parameterized queries to prevent injection."
}
def print_vulnerability_report(vuln):
print(f"Vulnerability ID: {vuln['id']}")
print(f"Description: {vuln['description']}")
print(f"Risk Level: {vuln['risk_level']}")
print(f"Affected System: {vuln['affected_system']}")
print(f"Recommendation: {vuln['recommendation']}")
print_vulnerability_report(vulnerability)
# Python: Simple CSV compliance report creation
import csv
data = [
{"control": "Access Control", "status": "Compliant"},
{"control": "Encryption", "status": "Non-Compliant"},
]
with open('compliance_report.csv', 'w', newline='') as csvfile:
fieldnames = ['control', 'status']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in data:
writer.writerow(row)
print("Compliance report saved as compliance_report.csv")
# Python: Calculate average risk score from vulnerabilities
vulnerabilities = [
{"id": "V1", "risk_score": 8.5},
{"id": "V2", "risk_score": 6.0},
{"id": "V3", "risk_score": 9.2},
]
def average_risk_score(vulns):
total = sum(v["risk_score"] for v in vulns)
return total / len(vulns)
print(f"Average Risk Score: {average_risk_score(vulnerabilities):.2f}")
# Python: Simple printout of affected hosts mapping
affected_hosts = {
"Web Server": ["192.168.1.10", "192.168.1.11"],
"Database Server": ["192.168.1.20"],
"User Workstations": ["192.168.1.100", "192.168.1.101", "192.168.1.102"],
}
for host_type, ips in affected_hosts.items():
print(f"{host_type}:")
for ip in ips:
print(f" - {ip}")
# Python: Example mitigation plan tracker
mitigation_plan = {
"immediate_action": "Apply patch KB123456",
"long_term": "Review and update firewall rules quarterly",
"training": "Conduct phishing awareness every 6 months"
}
def print_mitigation_plan(plan):
for key, action in plan.items():
print(f"{key.replace('_',' ').title()}: {action}")
print_mitigation_plan(mitigation_plan)
# Python: Simple task list for action plan
action_plan = [
{"task": "Patch vulnerable servers", "owner": "SysAdmin", "deadline": "2025-08-10", "status": "Pending"},
{"task": "Update antivirus signatures", "owner": "Security Team", "deadline": "2025-08-05", "status": "Completed"},
]
def print_action_plan(tasks):
for t in tasks:
print(f"Task: {t['task']}")
print(f"Owner: {t['owner']}")
print(f"Deadline: {t['deadline']}")
print(f"Status: {t['status']}")
print("---")
print_action_plan(action_plan)
# Python: Define a communication plan schedule
communication_plan = {
"weekly_updates": "Email to security and IT teams",
"management_reports": "Monthly summary meetings",
"external_notifications": "As per regulatory requirements"
}
def print_communication_plan(plan):
for key, method in plan.items():
print(f"{key.replace('_',' ').title()}: {method}")
print_communication_plan(communication_plan)
# Python: List of stakeholder groups
stakeholders = ["IT Team", "Security Team", "Legal Department", "Compliance", "Executives", "Vendors", "Customers"]
print("Stakeholder Groups:")
for s in stakeholders:
print(f"- {s}")
# Python: Example timeline setup
reporting_schedule = {
"critical": "Daily",
"high_risk": "Weekly",
"routine": "Monthly"
}
for level, cadence in reporting_schedule.items():
print(f"{level.title()} vulnerabilities: Report every {cadence}")
# Python: Example SLA data structure
slas = {
"response_time": "4 hours",
"resolution_time": "24 hours",
"notification_time": "1 hour"
}
def print_sla_details(sla):
for key, value in sla.items():
print(f"{key.replace('_',' ').title()}: {value}")
print_sla_details(slas)
# Python: Governance checklist example
governance_checklist = {
"policy_review": True,
"risk_approval": True,
"compliance_audit": False,
}
def print_governance_status(checklist):
for item, status in checklist.items():
print(f"{item.replace('_',' ').title()}: {'Completed' if status else 'Pending'}")
print_governance_status(governance_checklist)
# Python: Simple escalation path list
escalation_path = ["Security Analyst", "Security Manager", "CISO", "Legal Team"]
print("Escalation Path:")
for level in escalation_path:
print(f"- {level}")
# Python: Calculate remediation KPI example
vulnerabilities = [
{"id": "V1", "status": "Remediated", "days_to_fix": 5},
{"id": "V2", "status": "Open", "days_to_fix": None},
{"id": "V3", "status": "Remediated", "days_to_fix": 3},
]
remediated = [v for v in vulnerabilities if v["status"] == "Remediated"]
average_days = sum(v["days_to_fix"] for v in remediated) / len(remediated)
print(f"Average Days to Remediate: {average_days:.2f}")
# Python: Plotting vulnerability trend over months (requires matplotlib)
import matplotlib.pyplot as plt
months = ["Jan", "Feb", "Mar", "Apr", "May"]
open_vulns = [12, 9, 7, 5, 3]
plt.plot(months, open_vulns, marker='o')
plt.title("Open Vulnerabilities Trend Over Time")
plt.xlabel("Month")
plt.ylabel("Number of Open Vulnerabilities")
plt.show()
# Python: Simple zero-day tracker
zero_days = [
{"id": "ZD-001", "discovered": "2025-07-01"},
{"id": "ZD-002", "discovered": "2025-07-15"},
]
print(f"Total Zero-Day Vulnerabilities: {len(zero_days)}")
for zd in zero_days:
print(f"- {zd['id']} discovered on {zd['discovered']}")
# Python: Calculate SLA compliance percentage
sla_targets = {
"response_time_hours": 4,
"resolution_time_hours": 24,
}
incidents = [
{"id": "INC1", "response_hours": 3, "resolution_hours": 20},
{"id": "INC2", "response_hours": 5, "resolution_hours": 30},
]
def calculate_compliance(incidents, target_key):
met = sum(1 for i in incidents if i[target_key] <= sla_targets[target_key])
return (met / len(incidents)) * 100
response_compliance = calculate_compliance(incidents, "response_hours")
resolution_compliance = calculate_compliance(incidents, "resolution_hours")
print(f"Response Time SLA Compliance: {response_compliance:.1f}%")
print(f"Resolution Time SLA Compliance: {resolution_compliance:.1f}%")
# Python: Simple heatmap visualization (requires seaborn and matplotlib)
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
# Simulated vulnerability severity across 5 systems
data = np.array([
[3, 1, 4, 2, 5],
[2, 5, 3, 1, 4],
[4, 2, 1, 5, 3],
[5, 3, 2, 4, 1],
[1, 4, 5, 3, 2],
])
sns.heatmap(data, annot=True, cmap="YlOrRd")
plt.title("Vulnerability Heatmap")
plt.show()
# Python: Calculate remediation velocity example
import datetime
vulns = [
{"id": "V1", "discovered": datetime.date(2025, 7, 1), "remediated": datetime.date(2025, 7, 5)},
{"id": "V2", "discovered": datetime.date(2025, 7, 3), "remediated": datetime.date(2025, 7, 6)},
]
total_days = sum((v["remediated"] - v["discovered"]).days for v in vulns)
average_velocity = total_days / len(vulns)
print(f"Average Remediation Velocity: {average_velocity:.1f} days")
# Python: Track incident communications log
communications_log = []
def log_communication(recipient, message):
communications_log.append({"recipient": recipient, "message": message, "timestamp": datetime.datetime.now()})
print(f"Logged communication to {recipient}")
import datetime
log_communication("IT Team", "Incident detected and contained.")
log_communication("Management", "Mitigation in progress.")
print("Communication Log:")
for entry in communications_log:
print(f"{entry['timestamp']}: To {entry['recipient']} - {entry['message']}")
# Python: Generate a concise executive summary
incident_details = {
"impact": "Temporary service disruption",
"actions": ["Incident contained", "Systems restored", "Security patch applied"],
"risk_level": "Moderate",
"next_steps": ["Conduct post-incident review", "Update security policies"]
}
def executive_summary(details):
summary = f"Impact: {details['impact']}\n"
summary += "Key Actions:\n"
for action in details["actions"]:
summary += f"- {action}\n"
summary += f"Risk Level: {details['risk_level']}\n"
summary += "Recommended Next Steps:\n"
for step in details["next_steps"]:
summary += f"- {step}\n"
return summary
print(executive_summary(incident_details))
# Python: Example of storing 5W timeline events
timeline = [
{"who": "Attacker", "what": "Phishing Email Sent", "when": "2025-07-01 09:00", "where": "External Network", "why": "Credential Theft"},
{"who": "User", "what": "Clicked Link", "when": "2025-07-01 09:15", "where": "User PC", "why": "Unaware of Threat"},
{"who": "Security Team", "what": "Incident Detected", "when": "2025-07-01 10:00", "where": "SIEM Dashboard", "why": "Anomaly Alert"}
]
for event in timeline:
print(f"Who: {event['who']}, What: {event['what']}, When: {event['when']}, Where: {event['where']}, Why: {event['why']}")
# Python: Example of impact assessment with recommendations
incident_impact = {
"data_loss": True,
"system_downtime_hours": 5,
"financial_cost_estimate": 20000
}
def assess_impact(impact):
if impact["data_loss"]:
print("Critical: Data loss occurred.")
if impact["system_downtime_hours"] > 4:
print("High impact: Extended downtime.")
print(f"Estimated financial cost: ${impact['financial_cost_estimate']}")
recommendations = [
"Restore data from backup.",
"Improve endpoint detection.",
"Conduct employee phishing training."
]
assess_impact(incident_impact)
print("\nRecommendations:")
for rec in recommendations:
print(f"- {rec}")
# Python: Simulate evidence collection in a report
evidence = {
"logs": ["auth.log", "firewall.log"],
"screenshots": ["alert1.png", "dashboard.png"],
"forensic_images": ["disk_image.dd"]
}
def list_evidence(evidence_dict):
for ev_type, files in evidence_dict.items():
print(f"{ev_type.title()}:")
for f in files:
print(f" - {f}")
print("Collected Evidence:")
list_evidence(evidence)
# Python: Scope and impact example
scope = ["Email Server", "HR Database", "Employee Workstations"]
impact = {
"disruption": "Email downtime for 3 hours",
"data_exposure": "Personal employee data",
"financial_loss": 15000
}
print("Incident Scope:")
for system in scope:
print(f"- {system}")
print("\nIncident Impact:")
for key, val in impact.items():
print(f"{key.title().replace('_', ' ')}: {val}")
# Python: Example stakeholder message templates
stakeholders = {
"executives": "Summary of incident impact and remediation status.",
"IT Team": "Technical details and immediate tasks.",
"Legal": "Compliance and regulatory implications.",
"Customers": "Reassurance and support information."
}
for group, message in stakeholders.items():
print(f"{group.title()} Message: {message}")
# Python: Example briefing data structure
briefing = {
"incident_summary": "Phishing attack caused temporary email service disruption.",
"business_impact": "Minor, no financial loss yet.",
"actions_taken": ["Patch applied", "User training initiated"],
"resources_needed": ["Additional monitoring tools"]
}
for key, val in briefing.items():
print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example breach notification format
breach_report = {
"date_reported": "2025-07-15",
"affected_data": "Personal Identifiable Information (PII)",
"number_of_records": 1500,
"mitigation_steps": ["Access revoked", "Encryption updated"],
"reporting_authority": "Data Protection Agency"
}
for key, val in breach_report.items():
print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example public message template
public_message = """
We recently detected a security incident affecting our email services. No sensitive data was compromised.
We have taken immediate corrective actions and are enhancing our security measures to prevent future incidents.
Please remain vigilant for phishing emails and report suspicious activity.
"""
print(public_message)
# Python: Example disclosure message differentiation
messages = {
"internal": "Incident detected. Follow the incident response playbook and report anomalies.",
"external": "We are investigating a recent incident and will update you with findings."
}
for audience, message in messages.items():
print(f"{audience.title()} Message: {message}")
# Python: Closure notification example
closure_notification = {
"incident_id": "INC-2025-007",
"status": "Resolved",
"resolution_summary": "Phishing campaign blocked; affected accounts secured.",
"lessons_learned": "Need stronger email filtering and user training.",
"date_closed": "2025-07-20"
}
for key, val in closure_notification.items():
print(f"{key.replace('_',' ').title()}: {val}")
# Python: Root cause and lessons learned summary
report = {
"root_cause": "Weak password policy allowed credential theft.",
"lessons_learned": [
"Implement MFA for all accounts.",
"Enhance password complexity requirements.",
"Increase phishing simulation training frequency."
]
}
print(f"Root Cause: {report['root_cause']}")
print("Lessons Learned:")
for lesson in report["lessons_learned"]:
print(f"- {lesson}")
# Python: Simple 5 Whys example
def five_whys(problem):
answers = [
"Why did the phishing succeed? - Weak user awareness.",
"Why was awareness weak? - Insufficient training.",
"Why was training insufficient? - Budget constraints.",
"Why budget constrained? - Management prioritization.",
"Why was management priority low? - Lack of risk understanding."
]
print(f"Problem: {problem}\n")
for ans in answers:
print(ans)
five_whys("Phishing attack led to credential compromise.")
# Python: Example gap analysis report
required_controls = {"MFA", "Encryption", "Logging", "Patch Management"}
current_controls = {"Encryption", "Logging"}
gaps = required_controls - current_controls
print("Security Control Gaps:")
for gap in gaps:
print(f"- {gap}")
# Python: Example remediation output tasks
remediation_plan = [
{"task": "Enable MFA", "owner": "Security Team", "due_date": "2025-08-15"},
{"task": "Apply critical patches", "owner": "SysAdmin", "due_date": "2025-08-05"}
]
for item in remediation_plan:
print(f"Task: {item['task']}, Owner: {item['owner']}, Due: {item['due_date']}")
# Python: Example policy update tracker
policy_updates = [
{"policy": "Password Policy", "update": "Require 12+ characters with special symbols"},
{"policy": "Incident Response", "update": "Add phishing detection training"}
]
for update in policy_updates:
print(f"Policy: {update['policy']}, Update: {update['update']}")
# Python: Example forensics summary update
forensics_summary = {
"initial_findings": "Malware execution detected at 09:00",
"additional_findings": "C2 communication confirmed at 10:30",
"final_status": "Malware eradicated, no data exfiltration detected"
}
for key, val in forensics_summary.items():
print(f"{key.replace('_',' ').title()}: {val}")
# Python: Example communication tools list
tools_formats = {
"Email": "Detailed reports and notifications",
"Instant Messaging": "Quick alerts and coordination",
"Dashboard": "Live incident status updates",
"Presentation": "Executive briefings"
}
for tool, purpose in tools_formats.items():
print(f"{tool}: {purpose}")
# Python: Simulate dashboard data update
dashboard_data = {
"active_incidents": 3,
"resolved_incidents": 15,
"average_response_time_minutes": 45
}
def display_dashboard(data):
for k, v in data.items():
print(f"{k.replace('_',' ').title()}: {v}")
display_dashboard(dashboard_data)
# Python: Generate a simple bar chart with matplotlib
import matplotlib.pyplot as plt
vulnerabilities = ['Low', 'Medium', 'High']
counts = [10, 5, 2]
plt.bar(vulnerabilities, counts, color=['green', 'orange', 'red'])
plt.title('Vulnerability Severity Counts')
plt.xlabel('Severity')
plt.ylabel('Count')
plt.show()
# Python: Simple executive briefing template output
briefing_template = {
"Title": "Security Incident Executive Briefing",
"Date": "2025-07-21",
"Summary": "Phishing incident resulted in limited email disruption; no data loss detected.",
"Business Impact": "Minimal operational impact, no financial loss.",
"Next Steps": "Enhance email filtering and conduct user training.",
"Resource Requests": "Approval for additional monitoring tools."
}
for key, val in briefing_template.items():
print(f"{key}: {val}\n")
// Example Python snippet to parse forensic logs and extract key events
with open('forensic_log.txt', 'r') as f:
logs = f.readlines()
for line in logs:
if "ERROR" in line or "SUSPICIOUS" in line:
print("Important event:", line.strip())
// Sample JSON format for sharing forensic summary across teams
{
"incident_id": "1234",
"summary": "Unauthorized access detected",
"technical_details": "...",
"compliance_implications": "Potential GDPR breach",
"recommended_actions": "Reset passwords, enhance monitoring"
}
// Python automation snippet to generate basic forensic report
import datetime
def generate_report(events):
report = f"Forensic Report - {datetime.date.today()}\n"
report += "Events:\n"
for e in events:
report += f"- {e}\n"
with open('forensic_report.txt', 'w') as f:
f.write(report)
print("Report generated.")
events = ["Login failure", "Malware detected", "File deletion"]
generate_report(events)
// Example: Triggering Nessus scan report download (pseudo-code)
import requests
def download_report(scan_id):
url = f"https://nessus.local/api/scans/{scan_id}/report"
response = requests.get(url, verify=False)
with open("scan_report.pdf", "wb") as file:
file.write(response.content)
print("Scan report downloaded.")
download_report(101)
// Sample code to refresh security dashboard via REST API
import requests
def refresh_dashboard():
api_url = "https://securitydashboard.local/api/data"
response = requests.get(api_url)
if response.status_code == 200:
data = response.json()
print("Dashboard data updated:", data)
else:
print("Failed to refresh dashboard.")
refresh_dashboard()
// Example pseudo-code for SOAR summary generation
incident = {
"alerts": 5,
"actions_taken": ["Blocked IP", "Reset password"],
"status": "Contained"
}
summary = f"Incident Summary:\nAlerts: {incident['alerts']}\nActions: {', '.join(incident['actions_taken'])}\nStatus: {incident['status']}"
print(summary)
// Example Python snippet to fetch and integrate threat feed
import requests
feed_url = "https://threatfeed.example.com/api/latest"
response = requests.get(feed_url)
if response.ok:
threats = response.json()
print("Latest threat indicators:", threats)
else:
print("Failed to retrieve threat feed")
// Example Jinja2 template snippet for report generation
report_template = """
Incident Report:
- ID: {{ id }}
- Date: {{ date }}
- Summary: {{ summary }}
- Actions Taken: {{ actions }}
"""
from jinja2 import Template
data = {"id": 123, "date": "2025-07-27", "summary": "Data breach", "actions": "Password reset"}
print(Template(report_template).render(data))
// Sample Python code to generate incident trend data
incidents = [5, 3, 8, 2, 6] # incidents per month
months = ["Jan", "Feb", "Mar", "Apr", "May"]
for month, count in zip(months, incidents):
print(f"{month}: {count} incidents")
// Python example splitting incidents quarterly
incidents = [10, 15, 12, 20, 18, 25, 22, 30, 28, 35, 33, 40] # Monthly incidents
quarters = [sum(incidents[i:i+3]) for i in range(0, 12, 3)]
for i, q in enumerate(quarters, 1):
print(f"Q{i} Incidents: {q}")
// Example pseudo code for creating presentation slides
slides = [
"Intro: Security Posture",
"Incident Trends",
"Risk Mitigation Status",
"Recommendations"
]
for slide in slides:
print("Slide:", slide)
// Python example to gather evidence files for audit
import os, shutil
evidence_dir = "/forensics/evidence"
audit_dir = "/audit_ready"
for file in os.listdir(evidence_dir):
shutil.copy(os.path.join(evidence_dir, file), audit_dir)
print("Evidence packaged for audit")
// Example compliance gap check pseudo-code
required_controls = {"Encryption", "Access Control", "Audit Logs"}
implemented_controls = {"Encryption", "Audit Logs"}
gaps = required_controls - implemented_controls
print("Compliance gaps:", gaps)
// Example risk score tracking (simplified)
risk_scores = [80, 75, 70, 60, 55]
for month, score in enumerate(risk_scores, 1):
print(f"Month {month}: Risk Score = {score}")
// Sample policy snippet for secure communications
policy = """
Only authorized personnel may disclose incident details.
Compliance with GDPR and other privacy laws is mandatory.
Communication should be logged and monitored.
"""
print(policy)
// Example pseudocode for masking PII in logs
def mask_pii(data):
return data.replace("user@example.com", "*****@*****")
log = "User login from user@example.com"
print(mask_pii(log))
// Example notification timeline (pseudo-code)
breach_detected = True
days_since_breach = 20
if breach_detected and days_since_breach <= 30:
print("Notify regulators and affected parties within legal timeframe")
// Example using Python's ssl module for secure socket
import socket, ssl
context = ssl.create_default_context()
with socket.create_connection(("secure.example.com", 443)) as sock:
with context.wrap_socket(sock, server_hostname="secure.example.com") as ssock:
print(ssock.version())
// Example: Encrypt sensitive notes before storage
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher_suite = Fernet(key)
note = "Sensitive analyst commentary"
encrypted_note = cipher_suite.encrypt(note.encode())
print(encrypted_note)
// Example workflow step in legal coordination
def notify_legal(incident_id):
print(f"Legal team notified about incident {incident_id}")
notify_legal("INC-12345")
# Python: Simple training session tracker
training_sessions = [
{"topic": "Phishing Awareness", "date": "2025-07-01", "trainer": "Alice", "attendees": 25},
{"topic": "Incident Response Basics", "date": "2025-07-10", "trainer": "Bob", "attendees": 30},
]
def list_training_sessions(sessions):
for s in sessions:
print(f"Topic: {s['topic']}")
print(f"Date: {s['date']}")
print(f"Trainer: {s['trainer']}")
print(f"Attendees: {s['attendees']}")
print("---")
list_training_sessions(training_sessions)
# Python: Summarize training feedback scores
training_feedback = [
{"topic": "Phishing Awareness", "avg_score": 4.5, "max_score": 5},
{"topic": "Incident Response", "avg_score": 4.7, "max_score": 5},
]
for feedback in training_feedback:
print(f"Topic: {feedback['topic']}")
print(f"Average Feedback Score: {feedback['avg_score']} / {feedback['max_score']}")
print("---")
# Python: Log lessons learned points
lessons_learned = [
"Improve incident communication protocols.",
"Update firewall rules to cover new threats.",
"Increase training frequency for new hires."
]
print("Lessons Learned:")
for lesson in lessons_learned:
print(f"- {lesson}")
# Python: Simulate document hand-off checklist
documents = ["Incident Response Plan", "Network Diagrams", "Security Policies", "Training Manuals"]
def handoff_documents(docs):
print("Documents handed off:")
for doc in docs:
print(f"- {doc}")
handoff_documents(documents)
# Python: Simple report template function
def generate_report_template():
report = {
"Incident_ID": "",
"Date": "",
"Description": "",
"Actions_Taken": "",
"Recommendations": "",
"Analyst_Name": ""
}
return report
report = generate_report_template()
print("Empty Report Template:")
for k, v in report.items():
print(f"{k}: {v}")
# Python: Schedule and send education reminders
import datetime
education_schedule = [
{"topic": "New Phishing Techniques", "date": "2025-08-01"},
{"topic": "Cloud Security Updates", "date": "2025-09-01"},
]
def send_reminder(topic, date):
print(f"Reminder: Training on '{topic}' scheduled for {date}")
for edu in education_schedule:
send_reminder(edu["topic"], edu["date"])
# Python: Textual lifecycle steps printout
lifecycle_steps = ["Preparation", "Detection & Analysis", "Containment", "Eradication", "Recovery", "Post-Incident Activity"]
print("Incident Lifecycle:")
for step in lifecycle_steps:
print(f"- {step}")
# Python: Simple timeline text representation
events = [
{"time": "10:00", "event": "Initial Alert"},
{"time": "10:15", "event": "Incident Confirmed"},
{"time": "11:00", "event": "Containment Started"},
{"time": "14:00", "event": "Incident Resolved"},
]
print("Incident Timeline:")
for e in events:
print(f"{e['time']}: {e['event']}")
# Python: Simple kill chain stages list
kill_chain_stages = [
"Reconnaissance",
"Weaponization",
"Delivery",
"Exploitation",
"Installation",
"Command & Control",
"Actions on Objectives",
]
print("Kill Chain Stages:")
for stage in kill_chain_stages:
print(f"- {stage}")
# Python: Simple impact score bar visualization (text)
impacts = {"Low": 1, "Medium": 3, "High": 5}
for impact, score in impacts.items():
print(f"{impact}: {'█' * score}")
# Python: Simple graph relationships using dict
threat_graph = {
"Attacker": ["Malware", "Phishing"],
"Malware": ["Server A", "Server B"],
"Phishing": ["User Workstations"]
}
print("Threat Graph Relationships:")
for node, connections in threat_graph.items():
print(f"{node} -> {', '.join(connections)}")
# Python: Simple text dashboard simulation
dashboard = {
"Open Incidents": 5,
"Critical": 2,
"High": 1,
"Medium": 1,
"Low": 1,
}
print("Incident Dashboard:")
for key, value in dashboard.items():
print(f"{key}: {value}")
# Python: Continuous improvement task list
improvement_tasks = [
"Update incident response policies",
"Conduct quarterly security drills",
"Enhance threat intelligence feeds",
]
print("Continuous Improvement Tasks:")
for task in improvement_tasks:
print(f"- {task}")
# Python: Log post-incident review notes
post_incident_reviews = [
"Response time was satisfactory.",
"Communication gaps identified between teams.",
"Recommend improved documentation."
]
print("Post-Incident Review Notes:")
for note in post_incident_reviews:
print(f"- {note}")
# Python: Track remediation success rates
remediation_results = [
{"vuln_id": "V1", "fixed": True},
{"vuln_id": "V2", "fixed": False},
{"vuln_id": "V3", "fixed": True},
]
success_count = sum(1 for r in remediation_results if r["fixed"])
total = len(remediation_results)
print(f"Remediation Success Rate: {success_count}/{total} ({(success_count/total)*100:.1f}%)")
# Python: Documenting policy updates
policy_updates = [
"Add multi-factor authentication requirements.",
"Enforce stronger password policies.",
"Increase monitoring of privileged accounts.",
]
print("Policy Updates Needed:")
for update in policy_updates:
print(f"- {update}")
# Python: Track IR plan update items
ir_plan_updates = [
"Include cloud service provider incident roles.",
"Update contact list for incident response team.",
"Add ransomware-specific response procedures.",
]
print("IR Plan Update Items:")
for item in ir_plan_updates:
print(f"- {item}")
# Example: Simple maturity score tracker in Python
# Define maturity levels for three domains
maturity = {
'incident_response': 3, # Scale 1-5
'threat_detection': 4,
'compliance': 2
}
def maturity_summary(maturity):
for domain, level in maturity.items():
print(f"Domain: {domain.replace('_', ' ').title()} - Maturity Level: {level}/5")
maturity_summary(maturity)
# Simulating a case study scenario
case_study = {
"incident": "Phishing Email Compromise",
"steps": ["Detect email", "Isolate account", "Analyze logs", "Reset credentials"]
}
for step in case_study['steps']:
print(f"Case Study Step: {step}")
# Example: Generating a ransomware report summary
report = {
"incident_type": "Ransomware",
"detected_by": "EDR",
"ransom_note_found": True,
"impact_scope": "HR Department File Server",
"response_status": "Containment Initiated"
}
for key, value in report.items():
print(f"{key.replace('_', ' ').title()}: {value}")
# Communication structure example
message = {
"recipient": "Customers",
"tone": "Reassuring",
"content": "We are investigating a data breach and taking all necessary steps to secure your information."
}
print(f"To: {message['recipient']}\nTone: {message['tone']}\nContent: {message['content']}")
# Example: Basic vulnerability case dictionary
vuln_case = {
"asset": "web01.example.com",
"vulnerability": "CVE-2022-1234",
"action": "Patch applied"
}
print("Vulnerability Action Case:")
for k, v in vuln_case.items():
print(f"{k.title()}: {v}")
# Departments participating in incident
departments = ["Security", "IT", "Legal", "HR"]
for dept in departments:
print(f"Notifying {dept} department...")
# Simulating audit report content
report = {
"incident": "Data leak",
"regulation": "GDPR",
"status": "Reported to DPA within 72 hours"
}
print("Audit Scenario Report:")
for key, val in report.items():
print(f"{key}: {val}")
# Track certification status
certs = {"CySA+": "2026-01-01", "CISSP": "2025-12-15"}
for cert, expiry in certs.items():
print(f"{cert} expires on {expiry}")
# Example CEU log
ceu_log = [
{"event": "SANS Workshop", "points": 6},
{"event": "CompTIA Webinar", "points": 2}
]
total = sum(item["points"] for item in ceu_log)
print(f"Total CEUs: {total}")
# Create education roadmap
plan = ["Cloud Security Course", "Python for Security", "Blue Team Training"]
for item in plan:
print(f"Planned Training: {item}")
# Sample exam tips
print("- Read exam objectives")
print("- Use flashcards")
print("- Practice hands-on scenarios")
# Simulate CLI task
print("Configure a firewall rule:")
print("allow tcp from 10.0.0.0/8 to any port 443")
# Example role map
roles = ["Tier 1 SOC Analyst", "Incident Responder", "Security Engineer"]
for role in roles:
print(f"Career Path: {role}")
# Python Example: Simple SIEM log normalization function
def normalize_log(log):
normalized = {
"timestamp": log.get("time"),
"source_ip": log.get("src_ip"),
"destination_ip": log.get("dest_ip"),
"event_type": log.get("type"),
"message": log.get("msg")
}
return normalized
sample_log = {
"time": "2025-07-27T14:22:00Z",
"src_ip": "192.168.1.10",
"dest_ip": "10.0.0.5",
"type": "login_attempt",
"msg": "Failed login from user admin"
}
print(normalize_log(sample_log))
# Python Example: Simple behavior-based detection stub
def detect_suspicious_process(process_list):
suspicious_keywords = ["keylogger", "proxy", "backdoor"]
alerts = []
for proc in process_list:
if any(word in proc.lower() for word in suspicious_keywords):
alerts.append(f"Suspicious process detected: {proc}")
return alerts
running_processes = ["chrome.exe", "KeyLogger.exe", "explorer.exe"]
alerts = detect_suspicious_process(running_processes)
for alert in alerts:
print(alert)
# Python Example: Simple IOC ingestion simulation
def ingest_iocs(ioc_feed):
iocs = []
for ioc in ioc_feed:
iocs.append({
"type": ioc["type"],
"value": ioc["value"],
"confidence": ioc.get("confidence", "medium")
})
return iocs
sample_ioc_feed = [
{"type": "ip", "value": "203.0.113.45", "confidence": "high"},
{"type": "domain", "value": "malicious-site.com"}
]
ingested_iocs = ingest_iocs(sample_ioc_feed)
for ioc in ingested_iocs:
print(ioc)
# Python Example: Parse AWS CloudTrail logs for failed login attempts
import json
def parse_cloudtrail_logs(logs):
failed_logins = []
for event in logs["Records"]:
if event["eventName"] == "ConsoleLogin" and event["responseElements"] is None:
failed_logins.append({
"user": event["userIdentity"]["userName"],
"time": event["eventTime"],
"source_ip": event["sourceIPAddress"]
})
return failed_logins
sample_logs = {
"Records": [
{
"eventName": "ConsoleLogin",
"responseElements": None,
"userIdentity": {"userName": "admin"},
"eventTime": "2025-07-27T13:45:00Z",
"sourceIPAddress": "198.51.100.10"
},
{
"eventName": "ConsoleLogin",
"responseElements": {"Login": "Success"},
"userIdentity": {"userName": "dev_user"},
"eventTime": "2025-07-27T14:00:00Z",
"sourceIPAddress": "198.51.100.20"
}
]
}
failed = parse_cloudtrail_logs(sample_logs)
for f in failed:
print(f"Failed login by {f['user']} from IP {f['source_ip']} at {f['time']}")