snowflake-core-workflow-b
Execute Snowflake data transformation with streams, tasks, and dynamic tables. Use when building ELT pipelines, scheduling transformations, or implementing change data capture with Snowflake streams. Trigger with phrases like "snowflake transform", "snowflake ELT", "snowflake stream", "snowflake task", "snowflake pipeline", "snowflake dynamic table", "snowflake CDC".
What this skill does
# Snowflake Core Workflow B — Data Transformation
## Overview
Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).
## Prerequisites
- Data loaded into Snowflake (via `snowflake-core-workflow-a`)
- Understanding of ELT vs ETL patterns
- Role with `CREATE TASK`, `CREATE STREAM` privileges
## Instructions
### Step 1: Create a Stream for Change Data Capture
```sql
-- Track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders
APPEND_ONLY = FALSE;
-- Append-only stream (lighter weight, inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw_events
APPEND_ONLY = TRUE;
-- Check what's changed since last consumption
SELECT * FROM orders_stream;
-- METADATA$ACTION = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID = unique row identifier
```
### Step 2: Create a Task to Process Stream Data
```sql
-- Transform task runs when stream has data
CREATE OR REPLACE TASK transform_orders
WAREHOUSE = TRANSFORM_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
MERGE INTO dim_orders AS target
USING (
SELECT
order_id,
customer_id,
amount::DECIMAL(12,2) AS amount,
order_date::TIMESTAMP_NTZ AS order_date,
CASE
WHEN amount >= 1000 THEN 'high_value'
WHEN amount >= 100 THEN 'medium_value'
ELSE 'standard'
END AS order_tier,
CURRENT_TIMESTAMP() AS processed_at
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
target.amount = source.amount,
target.order_tier = source.order_tier,
target.processed_at = source.processed_at
WHEN NOT MATCHED THEN INSERT
(order_id, customer_id, amount, order_date, order_tier, processed_at)
VALUES
(source.order_id, source.customer_id, source.amount,
source.order_date, source.order_tier, source.processed_at);
-- Enable the task
ALTER TASK transform_orders RESUME;
```
### Step 3: Build a Task DAG (Directed Acyclic Graph)
```sql
-- Root task: aggregate daily metrics
CREATE OR REPLACE TASK daily_metrics_root
WAREHOUSE = TRANSFORM_WH
SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
INSERT INTO daily_order_metrics
SELECT
CURRENT_DATE() - 1 AS metric_date,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
COUNT(DISTINCT customer_id) AS unique_customers
FROM dim_orders
WHERE order_date >= CURRENT_DATE() - 1
AND order_date < CURRENT_DATE();
-- Child task: runs after root completes
CREATE OR REPLACE TASK update_customer_segments
WAREHOUSE = TRANSFORM_WH
AFTER daily_metrics_root
AS
MERGE INTO customer_segments AS target
USING (
SELECT customer_id,
COUNT(*) AS order_count,
SUM(amount) AS lifetime_value,
CASE
WHEN SUM(amount) >= 10000 THEN 'platinum'
WHEN SUM(amount) >= 5000 THEN 'gold'
WHEN SUM(amount) >= 1000 THEN 'silver'
ELSE 'bronze'
END AS segment
FROM dim_orders GROUP BY customer_id
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
target.order_count = source.order_count,
target.lifetime_value = source.lifetime_value,
target.segment = source.segment
WHEN NOT MATCHED THEN INSERT VALUES
(source.customer_id, source.order_count, source.lifetime_value, source.segment);
-- Resume tasks (children first, then root)
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;
```
### Step 4: Dynamic Tables (Declarative Alternative)
```sql
-- Auto-refreshes based on target freshness — no streams/tasks needed
CREATE OR REPLACE DYNAMIC TABLE customer_360
TARGET_LAG = '10 minutes'
WAREHOUSE = TRANSFORM_WH
AS
SELECT
c.customer_id, c.name, c.email,
COUNT(o.order_id) AS total_orders,
COALESCE(SUM(o.amount), 0) AS lifetime_value,
MAX(o.order_date) AS last_order_date,
DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
FROM customers c
LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.email;
-- Monitor refresh status
SELECT name, target_lag, refresh_mode, scheduling_state
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'CUSTOMER_360';
```
### Step 5: Monitor Pipelines
```sql
-- Task run history
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Find failed runs
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
AND scheduled_time >= DATEADD(hours, -24, CURRENT_TIMESTAMP());
-- Stream lag check — if STALE = TRUE, data may be lost
SHOW STREAMS LIKE 'orders_stream';
```
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `Task is suspended` | Not resumed after creation | `ALTER TASK x RESUME` |
| `Stream is stale` | Data retention exceeded | Recreate stream; increase `DATA_RETENTION_TIME_IN_DAYS` |
| `Warehouse does not exist` | Wrong warehouse in task | Verify warehouse name |
| `MERGE: duplicate rows` | Non-unique join key | Add dedup CTE before MERGE |
| `Dynamic table refresh failed` | Source schema changed | Check upstream table definitions |
## Resources
- [Streams and Tasks](https://docs.snowflake.com/en/user-guide/data-pipelines-intro)
- [Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-about)
- [CREATE TASK](https://docs.snowflake.com/en/sql-reference/sql/create-task)
## Next Steps
For common errors and troubleshooting, see `snowflake-common-errors`.
Related in Productivity
gitea-workflow
IncludedOrchestrate agile development workflows for Gitea repositories using the tea CLI. Use when working with Gitea-hosted repos and asking to 'run the workflow', 'continue working', 'what's next', 'complete the task cycle', 'start my day', 'end the sprint', 'implement the next task', or wanting guided step-by-step development assistance. Keywords: workflow, orchestrate, agile, task cycle, sprint, daily, implement, review, PR, standup, retrospective, gitea, tea.
microsoft-graph-gateway
IncludedRoute Microsoft Graph work in this workspace. Use when users want to read or write Outlook mail, calendar events, contacts, OneDrive or SharePoint files, Teams, Planner, To Do, users, groups, directory data, or arbitrary Microsoft Graph endpoints from VS Code. Prefer WorkIQ for common read scenarios. Use Microsoft Graph for write actions and gap-read scenarios that need exact Graph properties, filters, permissions, or endpoints.
copilotkit
IncludedUse when building with CopilotKit — setup, development, integrations, debugging, upgrading, or contributing. Routes to the appropriate specialized skill based on the task.
wordly-wisdom
IncludedProvides calibrated decision analysis using Charlie Munger-style multiple mental models, inversion, incentive mapping, circle-of-competence checks, misjudgment audits, second-order effects, and forecast updates. Use when the user asks for an oracle take, a hard call, a decision memo, a premortem, an outside view, a red-team, a sanity-check, what am I missing, think this through, or wants a strategy, hire, investment, plan, product, partnership, or major life choice analysed. Avoid for simple factual lookups or time-sensitive legal, medical, or market questions without fresh evidence.
swain-session
IncludedSession management and project status dashboard. Owns the full session lifecycle (start/work/close/resume), focus lane, bookmarks, worktree detection, and tab naming. Also serves as the project status dashboard — shows active epics, progress, actionable next steps, blocked items, tasks, GitHub issues, and recommendations. Worktree creation is deferred to swain-do task dispatch (SPEC-195). Triggers on: 'session', 'status', 'what's next', 'dashboard', 'overview', 'where are we', 'what should I work on', 'show me priorities', 'bookmark', 'focus on', 'session info'.
gandi
IncludedComprehensive Gandi domain registrar integration for domain and DNS management. Register and manage domains, create/update/delete DNS records (A, AAAA, CNAME, MX, TXT, SRV, and more), configure email forwarding and aliases, check SSL certificate status, create DNS snapshots for safe rollback, bulk update zone files, and monitor domain expiration. Supports multi-domain management, zone file import/export, and automated DNS backups. Includes both read-only and destructive operations with safety controls.