AugmentClaude

Snowflake SDK Patterns

Build production-ready Snowflake connections with pooling and async wrappers.

Installation

  1. Make sure Claude is on your device and in your terminal.

    Skills load from ~/.claude/skills/ when Claude Code starts up — so you need it on your machine first. If you don't have it yet, install it once with the command below, then run claude in any terminal to verify.

    One-time setup
    npm i -g @anthropic-ai/claude-code

    Already have it? Skip ahead.

  2. Paste into Claude Code or into your terminal.

    This copies the whole skill folder into ~/.claude/skills/snowflake-sdk-patterns-jeremylongshore/ — the SKILL.md plus any scripts, reference docs, or templates the skill ships with. Safe default: works for every skill.

    Faster alternative (instruction-only skills)

    Skips the clone and grabs only the SKILL.md file. Don't use this if the skill ships Python scripts, reference markdowns, or asset templates — they won't be downloaded and the skill will fail when it tries to load them.

    Quick install (SKILL.md only)
    Sign up to copy
  3. Restart Claude Code.

    Quit and reopen Claude Code (or any other agent that loads from ~/.claude/skills/). New skills are picked up on startup.

  4. Just ask Claude.

    Skills auto-activate when your request matches the skill's description — no slash command needed. Trigger phrases live in the skill's own frontmatter; you can read them in the “What this skill does” section above.

Prefer to read the source first? Open on GitHub.

When Claude uses it

Apply production-ready Snowflake SDK patterns for snowflake-sdk and snowflake-connector-python. Use when implementing connection pooling, async execute wrappers, streaming results, or establishing team coding standards for Snowflake. Trigger with phrases like "snowflake SDK patterns", "snowflake best practices", "snowflake code patterns", "idiomatic snowflake", "snowflake connection pool".

What this skill does

Snowflake SDK Patterns

Overview

Production-ready patterns for snowflake-sdk (Node.js) and snowflake-connector-python using real driver APIs.

Prerequisites

  • Completed snowflake-install-auth setup
  • Understanding of callback-to-promise conversion patterns
  • Familiarity with Snowflake's callback-based Node.js API

Instructions

Step 1: Connection Pool (Node.js)

// src/snowflake/pool.ts
import snowflake from 'snowflake-sdk';

interface PoolConfig {
  max: number;
  idleTimeoutMs: number;
}

class SnowflakePool {
  private pool: snowflake.Connection[] = [];
  private available: snowflake.Connection[] = [];
  private waiting: ((conn: snowflake.Connection) => void)[] = [];
  private config: PoolConfig;

  constructor(
    private connConfig: snowflake.ConnectionOptions,
    config: Partial<PoolConfig> = {}
  ) {
    this.config = { max: 10, idleTimeoutMs: 60000, ...config };
  }

  async acquire(): Promise<snowflake.Connection> {
    // Return available connection
    if (this.available.length > 0) {
      return this.available.pop()!;
    }
    // Create new if under limit
    if (this.pool.length < this.config.max) {
      const conn = snowflake.createConnection(this.connConfig);
      await new Promise<void>((resolve, reject) => {
        conn.connect((err) => (err ? reject(err) : resolve()));
      });
      this.pool.push(conn);
      return conn;
    }
    // Wait for one to become available
    return new Promise((resolve) => {
      this.waiting.push(resolve);
    });
  }

  release(conn: snowflake.Connection): void {
    if (this.waiting.length > 0) {
      const next = this.waiting.shift()!;
      next(conn);
    } else {
      this.available.push(conn);
    }
  }

  async withConnection<T>(fn: (conn: snowflake.Connection) => Promise<T>): Promise<T> {
    const conn = await this.acquire();
    try {
      return await fn(conn);
    } finally {
      this.release(conn);
    }
  }
}

// Singleton pool
export const pool = new SnowflakePool({
  account: process.env.SNOWFLAKE_ACCOUNT!,
  username: process.env.SNOWFLAKE_USER!,
  password: process.env.SNOWFLAKE_PASSWORD!,
  warehouse: process.env.SNOWFLAKE_WAREHOUSE || 'COMPUTE_WH',
  database: process.env.SNOWFLAKE_DATABASE!,
  schema: process.env.SNOWFLAKE_SCHEMA || 'PUBLIC',
});

Step 2: Promise-Based Query Helper

// src/snowflake/query.ts
import snowflake from 'snowflake-sdk';

interface QueryResult<T = Record<string, any>> {
  rows: T[];
  statement: snowflake.Statement;
  sqlText: string;
}

export function query<T = Record<string, any>>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds
): Promise<QueryResult<T>> {
  return new Promise((resolve, reject) => {
    conn.execute({
      sqlText,
      binds,
      complete: (err, stmt, rows) => {
        if (err) {
          reject(Object.assign(err, { sqlText }));
        } else {
          resolve({ rows: (rows || []) as T[], statement: stmt, sqlText });
        }
      },
    });
  });
}

// Multi-statement execution
export async function multiQuery(
  conn: snowflake.Connection,
  statements: string[]
): Promise<QueryResult[]> {
  const results: QueryResult[] = [];
  for (const sql of statements) {
    results.push(await query(conn, sql));
  }
  return results;
}

Step 3: Streaming for Large Result Sets

// src/snowflake/stream.ts
export async function* streamQuery<T = Record<string, any>>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds
): AsyncGenerator<T> {
  const stmt = await new Promise<snowflake.Statement>((resolve, reject) => {
    conn.execute({
      sqlText,
      binds,
      streamResult: true,
      complete: (err, stmt) => {
        if (err) reject(err);
        else resolve(stmt);
      },
    });
  });

  const stream = stmt.streamRows();
  for await (const row of stream) {
    yield row as T;
  }
}

// Usage: process millions of rows without memory pressure
// for await (const row of streamQuery(conn, 'SELECT * FROM big_table')) {
//   await processRow(row);
// }

Step 4: Python Context Manager Pattern

# src/snowflake_pool.py
import snowflake.connector
from contextlib import contextmanager
from typing import Generator, Any

class SnowflakePool:
    def __init__(self, **conn_params):
        self._params = conn_params

    @contextmanager
    def connection(self) -> Generator[snowflake.connector.SnowflakeConnection, None, None]:
        conn = snowflake.connector.connect(**self._params)
        try:
            yield conn
        finally:
            conn.close()

    @contextmanager
    def cursor(self) -> Generator[snowflake.connector.cursor.SnowflakeCursor, None, None]:
        with self.connection() as conn:
            cur = conn.cursor()
            try:
                yield cur
            finally:
                cur.close()

    def execute(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
        with self.cursor() as cur:
            cur.execute(sql, params)
            columns = [desc[0] for desc in cur.description] if cur.description else []
            return [dict(zip(columns, row)) for row in cur.fetchall()]

    def execute_many(self, sql: str, params_list: list[tuple]) -> int:
        """Batch insert — much faster than individual inserts."""
        with self.cursor() as cur:
            cur.executemany(sql, params_list)
            return cur.rowcount

# Usage
pool = SnowflakePool(
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    warehouse='COMPUTE_WH',
    database='MY_DB',
    schema='PUBLIC',
)

users = pool.execute("SELECT * FROM users WHERE status = %s", ('active',))

Step 5: Error Handling Wrapper

// src/snowflake/errors.ts
export class SnowflakeQueryError extends Error {
  constructor(
    message: string,
    public readonly sqlState: string,
    public readonly code: number,
    public readonly sqlText: string,
    public readonly retryable: boolean
  ) {
    super(message);
    this.name = 'SnowflakeQueryError';
  }
}

const RETRYABLE_CODES = new Set([
  390114, // Connection token expired — reconnect
  390503, // Service unavailable
]);

export async function safeQuery<T>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds,
  maxRetries = 3
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const { rows } = await query<T>(conn, sqlText, binds);
      return rows;
    } catch (err: any) {
      const retryable = RETRYABLE_CODES.has(err.code);
      if (!retryable || attempt === maxRetries) {
        throw new SnowflakeQueryError(
          err.message, err.sqlState, err.code, sqlText, retryable
        );
      }
      const delay = 1000 * Math.pow(2, attempt - 1);
      await new Promise((r) => setTimeout(r, delay));
    }
  }
  throw new Error('Unreachable');
}

Error Handling

PatternUse CaseBenefit
Connection poolHigh concurrency appsReuses connections, prevents exhaustion
Promise wrapperAll Node.js codeClean async/await instead of callbacks
StreamingLarge result sets (>100K rows)Constant memory usage
Context managerAll Python codeGuarantees connection cleanup
Retry with backoffTransient failuresHandles token expiry, service blips

Resources

Next Steps

Apply patterns in snowflake-core-workflow-a for real-world data pipeline usage.

Related skills