building-automated-malware-submission-pipeline
Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.
What this skill does
# Building Automated Malware Submission Pipeline
## When to Use
Use this skill when:
- SOC teams face high volume of suspicious file alerts requiring sandbox analysis
- Manual sandbox submission creates bottlenecks in alert triage workflow
- Endpoint and email security tools quarantine files needing automated verdict determination
- Incident response requires rapid malware family identification and IOC extraction
**Do not use** for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.
## Prerequisites
- Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
- VirusTotal API key (Enterprise for submission, free for lookup)
- MalwareBazaar API access for known malware lookup
- File collection mechanism: EDR quarantine API, email gateway export, network capture
- Python 3.8+ with `requests`, `vt-py`, `pefile` libraries
- Isolated analysis network with no production connectivity
## Workflow
### Step 1: Build File Collection Pipeline
Collect suspicious files from multiple sources:
```python
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime
class MalwareCollector:
def __init__(self, quarantine_dir="/opt/malware_quarantine"):
self.quarantine_dir = Path(quarantine_dir)
self.quarantine_dir.mkdir(exist_ok=True)
def collect_from_edr(self, edr_api_url, api_token):
"""Pull quarantined files from CrowdStrike Falcon"""
headers = {"Authorization": f"Bearer {api_token}"}
# Get recent quarantine events
response = requests.get(
f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
headers=headers,
params={"filter": "state:'quarantined'", "limit": 50}
)
file_ids = response.json()["resources"]
for file_id in file_ids:
# Download quarantined file
dl_response = requests.get(
f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
headers=headers,
params={"ids": file_id}
)
file_data = dl_response.content
sha256 = hashlib.sha256(file_data).hexdigest()
filepath = self.quarantine_dir / f"{sha256}.sample"
filepath.write_bytes(file_data)
yield {"sha256": sha256, "path": str(filepath), "source": "edr"}
def collect_from_email_gateway(self, smtp_quarantine_path):
"""Pull attachments from email gateway quarantine"""
import email
from email import policy
for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
msg = email.message_from_binary_file(
eml_file.open("rb"), policy=policy.default
)
for attachment in msg.iter_attachments():
content = attachment.get_content()
if isinstance(content, str):
content = content.encode()
sha256 = hashlib.sha256(content).hexdigest()
filename = attachment.get_filename() or "unknown"
filepath = self.quarantine_dir / f"{sha256}.sample"
filepath.write_bytes(content)
yield {
"sha256": sha256,
"path": str(filepath),
"source": "email",
"original_filename": filename,
"sender": msg["From"],
"subject": msg["Subject"]
}
def compute_hashes(self, filepath):
"""Calculate MD5, SHA1, SHA256 for a file"""
with open(filepath, "rb") as f:
content = f.read()
return {
"md5": hashlib.md5(content).hexdigest(),
"sha1": hashlib.sha1(content).hexdigest(),
"sha256": hashlib.sha256(content).hexdigest(),
"size": len(content)
}
```
### Step 2: Pre-Screen with Hash Lookups
Check if the file is already known before sandbox submission:
```python
import vt
class MalwarePreScreener:
def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
self.vt_client = vt.Client(vt_api_key)
self.mb_api_url = mb_api_url
def check_virustotal(self, sha256):
"""Lookup hash in VirusTotal"""
try:
file_obj = self.vt_client.get_object(f"/files/{sha256}")
stats = file_obj.last_analysis_stats
return {
"found": True,
"malicious": stats.get("malicious", 0),
"suspicious": stats.get("suspicious", 0),
"undetected": stats.get("undetected", 0),
"total": sum(stats.values()),
"threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
"suggested_threat_label", "Unknown"
),
"type": getattr(file_obj, "type_description", "Unknown")
}
except vt.APIError:
return {"found": False}
def check_malwarebazaar(self, sha256):
"""Lookup hash in MalwareBazaar"""
response = requests.post(
self.mb_api_url,
data={"query": "get_info", "hash": sha256}
)
data = response.json()
if data["query_status"] == "ok":
entry = data["data"][0]
return {
"found": True,
"signature": entry.get("signature", "Unknown"),
"tags": entry.get("tags", []),
"file_type": entry.get("file_type", "Unknown"),
"first_seen": entry.get("first_seen", "Unknown")
}
return {"found": False}
def pre_screen(self, sha256):
"""Run all pre-screening checks"""
vt_result = self.check_virustotal(sha256)
mb_result = self.check_malwarebazaar(sha256)
verdict = "UNKNOWN"
if vt_result["found"] and vt_result.get("malicious", 0) > 10:
verdict = "KNOWN_MALICIOUS"
elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
verdict = "LIKELY_CLEAN"
return {
"sha256": sha256,
"virustotal": vt_result,
"malwarebazaar": mb_result,
"pre_screen_verdict": verdict,
"needs_sandbox": verdict == "UNKNOWN"
}
def close(self):
self.vt_client.close()
```
### Step 3: Submit to Sandbox for Dynamic Analysis
**Cuckoo Sandbox Submission:**
```python
class SandboxSubmitter:
def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
self.cuckoo_url = cuckoo_url
def submit_to_cuckoo(self, filepath, timeout=300):
"""Submit file to Cuckoo Sandbox"""
with open(filepath, "rb") as f:
response = requests.post(
f"{self.cuckoo_url}/tasks/create/file",
files={"file": f},
data={
"timeout": timeout,
"options": "procmemdump=yes,route=none",
"priority": 2,
"machine": "win10_x64"
}
)
task_id = response.json()["task_id"]
return task_id
def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
"""Wait for sandbox analysis to complete"""
import time
elapsed = 0
while elapsed < max_wait:
response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
status = response.json()["task"]["status"]
if status == "reported":
return self.get_report(task_id)
elif status == "failed_analysis":
return {"error": "Analysis failed"}
time.sleep(poll_interval)
elapsed += poll_interval
return {"error": "Analysis timed out"}
def get_report(self, task_id):
"""Retrieve analysis report"""
response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
report =Related in Productivity
gitea-workflow
IncludedOrchestrate agile development workflows for Gitea repositories using the tea CLI. Use when working with Gitea-hosted repos and asking to 'run the workflow', 'continue working', 'what's next', 'complete the task cycle', 'start my day', 'end the sprint', 'implement the next task', or wanting guided step-by-step development assistance. Keywords: workflow, orchestrate, agile, task cycle, sprint, daily, implement, review, PR, standup, retrospective, gitea, tea.
microsoft-graph-gateway
IncludedRoute Microsoft Graph work in this workspace. Use when users want to read or write Outlook mail, calendar events, contacts, OneDrive or SharePoint files, Teams, Planner, To Do, users, groups, directory data, or arbitrary Microsoft Graph endpoints from VS Code. Prefer WorkIQ for common read scenarios. Use Microsoft Graph for write actions and gap-read scenarios that need exact Graph properties, filters, permissions, or endpoints.
copilotkit
IncludedUse when building with CopilotKit — setup, development, integrations, debugging, upgrading, or contributing. Routes to the appropriate specialized skill based on the task.
wordly-wisdom
IncludedProvides calibrated decision analysis using Charlie Munger-style multiple mental models, inversion, incentive mapping, circle-of-competence checks, misjudgment audits, second-order effects, and forecast updates. Use when the user asks for an oracle take, a hard call, a decision memo, a premortem, an outside view, a red-team, a sanity-check, what am I missing, think this through, or wants a strategy, hire, investment, plan, product, partnership, or major life choice analysed. Avoid for simple factual lookups or time-sensitive legal, medical, or market questions without fresh evidence.
swain-session
IncludedSession management and project status dashboard. Owns the full session lifecycle (start/work/close/resume), focus lane, bookmarks, worktree detection, and tab naming. Also serves as the project status dashboard — shows active epics, progress, actionable next steps, blocked items, tasks, GitHub issues, and recommendations. Worktree creation is deferred to swain-do task dispatch (SPEC-195). Triggers on: 'session', 'status', 'what's next', 'dashboard', 'overview', 'where are we', 'what should I work on', 'show me priorities', 'bookmark', 'focus on', 'session info'.
gandi
IncludedComprehensive Gandi domain registrar integration for domain and DNS management. Register and manage domains, create/update/delete DNS records (A, AAAA, CNAME, MX, TXT, SRV, and more), configure email forwarding and aliases, check SSL certificate status, create DNS snapshots for safe rollback, bulk update zone files, and monitor domain expiration. Supports multi-domain management, zone file import/export, and automated DNS backups. Includes both read-only and destructive operations with safety controls.