detecting-anomalies-in-industrial-control-systems▌
mukul975/Anthropic-Cybersecurity-Skills · updated May 25, 2026
MDX-style export adds YAML metadata + attribution linking explainx.ai and this canonical listing URL.
This skill covers deploying anomaly detection systems for industrial control environments using machine learning models trained on OT network baselines, physics-based process models, and behavioral analysis of industrial protocol communications. It addresses building normal behavior profiles for SCADA polling patterns, detecting deviations in Modbus/DNP3/OPC UA traffic, identifying rogue devices, and correlating network anomalies with physical process data from historians.
| name | detecting-anomalies-in-industrial-control-systems |
| description | 'This skill covers deploying anomaly detection systems for industrial control environments using machine learning models trained on OT network baselines, physics-based process models, and behavioral analysis of industrial protocol communications. It addresses building normal behavior profiles for SCADA polling patterns, detecting deviations in Modbus/DNP3/OPC UA traffic, identifying rogue devices, and correlating network anomalies with physical process data from historians. ' |
| domain | cybersecurity |
| subdomain | ot-ics-security |
| tags | - ot-security - ics - scada - industrial-control - iec62443 - anomaly-detection - machine-learning |
| version | 1.0.0 |
| author | mahipal |
| license | Apache-2.0 |
| atlas_techniques | - AML.T0043 - AML.T0018 |
| nist_ai_rmf | - MEASURE-2.7 - MEASURE-2.5 - MAP-5.1 |
| nist_csf | - PR.IR-01 - DE.CM-01 - ID.AM-05 - GV.OC-02 |
Detecting Anomalies in Industrial Control Systems
When to Use
- When deploying continuous monitoring for OT environments that lack intrusion detection
- When building behavior-based detection to complement signature-based IDS in OT networks
- When establishing baselines for deterministic SCADA communications to detect deviations
- When integrating machine learning anomaly detection with OT security monitoring platforms
- When investigating alerts from Nozomi Guardian or Dragos Platform that require deeper analysis
Do not use for signature-based detection of known exploits (see detecting-attacks-on-scada-systems), for IT network anomaly detection without OT protocols, or as a replacement for process safety systems (SIS).
Prerequisites
- Passive network monitoring sensors on OT network SPAN/TAP ports
- Minimum 2-4 weeks of baseline traffic capture during normal operations
- Python 3.9+ with scikit-learn, numpy, pandas for ML model training
- Process historian access for physical process correlation data
- Understanding of normal operational patterns including shift changes, batch processes, and maintenance windows
Workflow
Step 1: Build Multi-Dimensional Baseline Model
Capture and model the deterministic behavior of ICS communications across multiple dimensions: timing, protocol behavior, and network topology.
#!/usr/bin/env python3
"""ICS Anomaly Detection System.
Builds multi-dimensional baselines from OT network traffic and
detects anomalies using statistical and machine learning methods.
Designed for deterministic SCADA communication patterns.
"""
import json
import sys
import time
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings("ignore")
@dataclass
class CommunicationProfile:
"""Profile for a single master-slave communication pair."""
src_ip: str
dst_ip: str
protocol: str
port: int
avg_interval_ms: float = 0.0
std_interval_ms: float = 0.0
avg_payload_size: float = 0.0
function_codes: dict = field(default_factory=dict)
packets_per_minute: float = 0.0
first_seen: str = ""
last_seen: str = ""
class ICSAnomalyDetector:
"""Multi-dimensional anomaly detection for ICS environments."""
def __init__(self):
self.profiles = {}
self.topology_baseline = set()
self.timing_model = None
self.isolation_forest = None
self.scaler = StandardScaler()
self.anomalies = []
self.training_data = []
def build_baseline_from_pcap(self, pcap_data):
"""Build baselines from parsed pcap data (list of flow records)."""
print("[*] Building ICS communication baselines...")
for flow in pcap_data:
key = f"{flow['src']}->{flow['dst']}:{flow['port']}"
if key not in self.profiles:
self.profiles[key] = CommunicationProfile(
src_ip=flow["src"],
dst_ip=flow["dst"],
protocol=flow.get("protocol", "TCP"),
port=flow["port"],
first_seen=flow.get("timestamp", ""),
)
profile = self.profiles[key]
profile.last_seen = flow.get("timestamp", "")
# Track function codes for industrial protocols
fc = flow.get("function_code")
if fc is not None:
profile.function_codes[fc] = profile.function_codes.get(fc, 0) + 1
# Add to topology baseline
self.topology_baseline.add((flow["src"], flow["dst"], flow["port"]))
# Calculate interval statistics
self._calculate_timing_stats(pcap_data)
print(f" Communication pairs: {len(self.profiles)}")
print(f" Topology entries: {len(self.topology_baseline)}")
def _calculate_timing_stats(self, flows):
"""Calculate packet timing statistics per communication pair."""
timestamps = defaultdict(list)
for flow in flows:
key = f"{flow['src']}->{flow['dst']}:{flow['port']}"
ts = flow.get("timestamp_epoch")
if ts:
timestamps[key].append(ts)
for key, ts_list in timestamps.items():
if key in self.profiles and len(ts_list) > 1:
ts_sorted = sorted(ts_list)
intervals = [
(ts_sorted[i+1] - ts_sorted[i]) * 1000
for i in range(len(ts_sorted) - 1)
]
self.profiles[key].avg_interval_ms = np.mean(intervals)
self.profiles[key].std_interval_ms = np.std(intervals)
duration_min = (ts_sorted[-1] - ts_sorted[0]) / 60
if duration_min > 0:
self.profiles[key].packets_per_minute = len(ts_list) / duration_min
def train_isolation_forest(self, features_df):
"""Train Isolation Forest model on feature vectors from baseline traffic."""
print("[*] Training Isolation Forest model...")
feature_cols = [
"interval_ms", "payload_size", "packets_per_window",
"unique_func_codes", "new_connection_flag",
]
available_cols = [c for c in feature_cols if c in features_df.columns]
X = features_df[available_cols].fillna(0).values
X_scaled = self.scaler.fit_transform(X)
self.isolation_forest = IsolationForest(
n_estimators=200,
contamination=0.01, # Expect 1% anomaly rate in baseline
random_state=42,
n_jobs=-1,
)
self.isolation_forest.fit(X_scaled)
scores = self.isolation_forest.decision_function(X_scaled)
print(f" Model trained on {len(X)} samples")
print(f" Anomaly score range: [{scores.min():.4f}, {scores.max():.4f}]")
print(f" Threshold: {np.percentile(scores, 1):.4f}")
def detect_topology_anomaly(self, src_ip, dst_ip, port):
"""Detect new/unauthorized communication pairs."""
if (src_ip, dst_ip, port) not in self.topology_baseline:
return {
"type": "NEW_COMMUNICATION_PAIR",
"severity": "high",
"detail": f"New connection: {src_ip} -> {dst_ip}:{port} not in baseline",
"recommendation": "Verify if this is an authorized new device or configuration change",
}
return None
def detect_timing_anomaly(self, src_ip, dst_ip, port, interval_ms):
"""Detect polling interval deviations."""
key = f"{src_ip}->{dst_ip}:{port}"
profile = self.profiles.get(key)
if profile and profile.std_interval_ms > 0:
z_score = abs(interval_ms - profile.avg_interval_ms) / profile.std_interval_ms
if z_score > 4.0:
return {
"type": "TIMING_ANOMALY",
"severity": "medium",
"detail": (
f"Interval {interval_ms:.1f}ms deviates from baseline "
f"{profile.avg_interval_ms:.1f}ms (z-score: {z_score:.1f})"
),
"recommendation": "Check for network congestion, device malfunction, or MITM attack",
}
return None
def detect_function_code_anomaly(self, src_ip, dst_ip, port, func_code):
"""Detect unauthorized Modbus/DNP3 function codes."""
key = f"{src_ip}->{dst_ip}:{port}"
profile = self.profiles.get(key)
if profile and func_code not in profile.function_codes:
severity = "critical" if func_code in {5, 6, 15, 16, 8} else "high"
return {
"type": "UNAUTHORIZED_FUNCTION_CODE",
"severity": severity,
"detail": (
f"Function code {func_code} from {src_ip} to {dst_ip}:{port} "
f"not in baseline. Allowed: {list(profile.function_codes.keys())}"
),
"recommendation": "Investigate source - possible command injection attack",
}
return None
def analyze_flow(self, flow):
"""Analyze a single network flow against all detection models."""
results = []
# Topology check
topo = self.detect_topology_anomaly(flow["src"], flow["dst"], flow["port"])
if topo:
results.append(topo)
# Timing check
if "interval_ms" in flow:
timing = self.detect_timing_anomaly(
flow["src"], flow["dst"], flow["port"], flow["interval_ms"])
if timing:
results.append(timing)
# Function code check
if "function_code" in flow:
fc = self.detect_function_code_anomaly(
flow["src"], flow["dst"], flow["port"], flow["function_code"])
if fc:
results.append(fc)
self.anomalies.extend(results)
return results
def generate_report(self):
"""Generate anomaly detection report."""
print(f"\n{'='*60}")
print(f"ICS ANOMALY DETECTION REPORT")
print(f"{'='*60}")
print(f"Baseline Profiles: {len(self.profiles)}")
print(f"Anomalies Detected: {len(self.anomalies)}")
severity_counts = defaultdict(int)
for a in self.anomalies:
severity_counts[a["severity"]] += 1
for sev in ["critical", "high", "medium", "low"]:
if severity_counts[sev]:
print(f" {sev.upper()}: {severity_counts[sev]}")
for a in self.anomalies[:20]:
print(f"\n [{a['severity'].upper()}] {a['type']}")
print(f" {a['detail']}")
if __name__ == "__main__":
print("ICS Anomaly Detection System")
print("Load baseline data and call analyze_flow() for real-time detection")
Key Concepts
| Term | Definition |
|---|---|
| Deterministic Traffic | ICS networks exhibit highly predictable communication patterns where the same master polls the same slaves at fixed intervals with identical function codes |
| Isolation Forest | Unsupervised machine learning algorithm that isolates anomalies by randomly partitioning feature space, effective for OT traffic with low anomaly rates |
| Polling Interval | Time between consecutive SCADA master requests to a slave device, typically fixed and configurable (100ms to 10s) |
| Function Code Allowlist | Set of permitted industrial protocol operations for each communication pair, enforced by anomaly detection rules |
| Topology Baseline | Complete map of all authorized device-to-device communication paths in the OT network |
| Physics-Based Detection | Using physical process models (thermodynamics, fluid dynamics) to detect attacks that manipulate the process while spoofing sensor data |
Tools & Systems
- Nozomi Networks Guardian: OT anomaly detection with AI-powered baseline learning and industrial protocol analysis
- Dragos Platform: Threat detection using behavioral analytics and threat intelligence specific to ICS environments
- Scikit-learn: Python ML library with Isolation Forest, One-Class SVM, and Local Outlier Factor for anomaly detection
- Zeek with OT plugins: Network security monitor with Modbus, DNP3, and BACnet protocol analyzers for baseline building
Output Format
ICS Anomaly Detection Report
==============================
Detection Period: YYYY-MM-DD to YYYY-MM-DD
Baseline Size: [N] communication profiles
ANOMALIES DETECTED: [N]
Critical: [N] High: [N] Medium: [N] Low: [N]
[SEVERITY] ANOMALY_TYPE
Source: [IP] -> Target: [IP]:[Port]
Detail: [Description of deviation from baseline]
Baseline: [Expected behavior]
Observed: [Actual behavior]
How to use detecting-anomalies-in-industrial-control-systems on Cursor
AI-first code editor with Composer
Prerequisites
Before installing skills in Cursor, ensure your development environment meets these requirements:
- ›Cursor installed and configured on your development machine
- ›Node.js version 16.0+ with npm package manager (verify with
node --version) - ›Active project directory or workspace where you want to add detecting-anomalies-in-industrial-control-systems
Execute installation command
Execute the skills CLI command in your project's root directory to begin installation:
The skills CLI fetches detecting-anomalies-in-industrial-control-systems from GitHub repository mukul975/Anthropic-Cybersecurity-Skills and configures it for Cursor.
Select Cursor when prompted
The CLI will show a list of available agents. Use arrow keys to navigate and space to select Cursor:
Verify installation
Confirm successful installation by checking the skill directory location:
Reload or restart Cursor to activate detecting-anomalies-in-industrial-control-systems. Access the skill through slash commands (e.g., /detecting-anomalies-in-industrial-control-systems) or your agent's skill management interface.
Security & Verification Notice
We perform automated surface-level scans (Gen AI Scanner, Socket, Snyk) during installation. These checks detect common vulnerabilities but do not guarantee complete security. Always review skill source code and verify the publisher's reputation before production use.
Skills execute code in your development environment. Always verify the publisher's identity, review recent commits, and test in isolated environments before production deployment.
List & Monetize Your Skill
Submit your Claude Code skill and start earning
Use Cases▌
Task Automation & Efficiency
Automate repetitive workflows and reduce manual effort
Example
Generate reports, summarize documents, draft communications
Save 3-5 hours per week on routine tasks
Knowledge Enhancement
Learn new skills, understand complex topics, get expert guidance
Example
Explain concepts, provide examples, suggest learning resources
Accelerate learning and skill development by 2x
Quality Improvement
Enhance output quality through reviews, suggestions, and refinements
Example
Review drafts, suggest improvements, catch errors
Improve work quality by 30-40% with less effort
Implementation Guide▌
Prerequisites
- ›Claude Desktop or compatible AI client with skill support
- ›Clear understanding of task or problem to solve
- ›Willingness to iterate and refine outputs
Time Estimate
15-45 minutes depending on use case complexity
Installation Steps
- 1.Install skill using provided installation command
- 2.Test with simple use case relevant to your work
- 3.Evaluate output quality and relevance
- 4.Iterate on prompts to improve results
- 5.Integrate into regular workflow if valuable
Common Pitfalls
- ⚠Expecting perfect results without iteration
- ⚠Not providing enough context in prompts
- ⚠Using skill for tasks outside its intended scope
- ⚠Accepting outputs without review and validation
Best Practices▌
✓ Do
- +Start with clear, specific prompts
- +Provide relevant context and constraints
- +Review and refine all outputs before using
- +Iterate to improve output quality
- +Document successful prompt patterns
✗ Don't
- −Don't use without understanding skill limitations
- −Don't skip validation of outputs
- −Don't share sensitive information in prompts
- −Don't expect skill to replace human judgment
💡 Pro Tips
- ★Be specific about desired format and style
- ★Ask for multiple options to choose from
- ★Request explanations to understand reasoning
- ★Combine AI efficiency with human expertise
When to Use This▌
✓ Use When
Use when skill capabilities match your task, clear ROI on time saved, and you can validate outputs. Best for repetitive tasks, learning, and quality improvement.
✗ Avoid When
Avoid when task requires deep expertise you can't validate, involves sensitive decisions, or when learning process is more valuable than speed of completion.
Learning Path▌
- 1Familiarize yourself with skill capabilities and limitations
- 2Start with low-risk, non-critical tasks
- 3Progress to more complex and valuable use cases
- 4Build expertise through regular use and experimentation
Discussion
Product Hunt–style comments (not star reviews)- No comments yet — start the thread.
Ratings
4.5★★★★★63 reviews- ★★★★★Emma Park· Dec 24, 2024
I recommend detecting-anomalies-in-industrial-control-systems for anyone iterating fast on agent tooling; clear intent and a small, reviewable surface area.
- ★★★★★Sofia Mehta· Dec 12, 2024
Registry listing for detecting-anomalies-in-industrial-control-systems matched our evaluation — installs cleanly and behaves as described in the markdown.
- ★★★★★Camila Wang· Dec 8, 2024
Useful defaults in detecting-anomalies-in-industrial-control-systems — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.
- ★★★★★Hassan Dixit· Nov 27, 2024
Registry listing for detecting-anomalies-in-industrial-control-systems matched our evaluation — installs cleanly and behaves as described in the markdown.
- ★★★★★Yash Thakker· Nov 15, 2024
detecting-anomalies-in-industrial-control-systems is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.
- ★★★★★Emma Ndlovu· Nov 15, 2024
detecting-anomalies-in-industrial-control-systems reduced setup friction for our internal harness; good balance of opinion and flexibility.
- ★★★★★Dev Mensah· Nov 11, 2024
detecting-anomalies-in-industrial-control-systems is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.
- ★★★★★Emma Lopez· Nov 7, 2024
Keeps context tight: detecting-anomalies-in-industrial-control-systems is the kind of skill you can hand to a new teammate without a long onboarding doc.
- ★★★★★Harper Okafor· Nov 3, 2024
Useful defaults in detecting-anomalies-in-industrial-control-systems — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.
- ★★★★★Diya Nasser· Oct 26, 2024
detecting-anomalies-in-industrial-control-systems is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.
showing 1-10 of 63