SQLAlchemy for Pythonic pipelines

I've primarily been developing code, benchmarks, and data tables in Python, since platforms like Snowflake often present quirks and limitations in data processing that make it hard to produce the desired deliverables. However, with the recent addition of a new Data Director, I've been writing more SQL queries rather than relying on my usual NumPy and Pandas stack, a shift motivated by the desire to make the processing code easier to share and understand. Given the complexity of a major project I'm working on, which touches on everything I'm outlining here, I want to revisit SQLAlchemy from a beginner's perspective.

What is SQLAlchemy?

SQLAlchemy is a Python library for interacting with relational databases through a high-level, Pythonic interface. It provides two main components:

  1. Core: A low-level component that offers SQL abstraction and execution.
  2. ORM (Object Relational Mapper): A higher-level abstraction that maps Python classes to database tables, enabling developers to interact with the database in terms of objects rather than raw SQL queries.

SQLAlchemy is one of the most widely used database libraries in Python because of its flexibility, performance, and support for multiple database systems (e.g., SQLite, PostgreSQL, MySQL, Oracle).
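Both components start from the same place: an Engine created from a database URL. Here is a minimal sketch using a local SQLite file (example.db, the same database used in the examples later on) and a users table; text() wraps a raw SQL string so it can be executed with bound parameters.

from sqlalchemy import create_engine, text

# The engine manages connections (and pooling) for a given database URL
engine = create_engine('sqlite:///example.db')

# text() wraps a raw SQL string; values are passed as bound parameters
with engine.connect() as conn:
    result = conn.execute(text("SELECT name FROM users WHERE age > :age"), {"age": 25})
    for row in result:
        print(row.name)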

Advantages of SQLAlchemy

  1. Database Agnosticism:
    SQLAlchemy supports multiple database engines, so you can easily switch between databases (e.g., SQLite, MySQL, PostgreSQL) by changing the connection string.
  2. ORM Abstraction:
    SQLAlchemy’s ORM lets you work with Python objects instead of writing raw SQL, making the code cleaner and easier to maintain.
  3. SQLAlchemy Core:
    Provides flexibility for advanced users to write custom SQL queries if the ORM does not fit a particular use case.
  4. Ease of Use with Relationships:
    SQLAlchemy makes defining and managing relationships between tables intuitive through Python objects.
  5. Query Composition:
    You can build queries dynamically using Python expressions, making complex query construction more manageable (see the sketch after this list).
  6. Better Security:
    It handles SQL injection prevention automatically by using parameterized queries.
  7. Scalability and Performance:
    SQLAlchemy uses connection pooling and efficient query generation to work well in high-performance scenarios.
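To illustrate points 5 and 6, here is a short sketch that composes a Core query from optional filters; the users table, column names, and filter values are illustrative, and because each condition becomes a bound parameter, nothing is interpolated into the SQL string by hand.

from sqlalchemy import create_engine, Table, MetaData, select

engine = create_engine('sqlite:///example.db')
metadata = MetaData()
users = Table('users', metadata, autoload_with=engine)  # reflect an existing table

# Hypothetical runtime inputs driving the query shape
min_age = 25
name_prefix = 'A'

# Build the query incrementally from whatever filters were supplied
query = select(users)
if min_age is not None:
    query = query.where(users.c.age > min_age)
if name_prefix:
    query = query.where(users.c.name.like(name_prefix + '%'))

# Values are sent as bound parameters, not spliced into the SQL text
with engine.connect() as conn:
    for row in conn.execute(query):
        print(row)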

Differences Between SQLAlchemy and Straight SQL

Feature | SQLAlchemy | Straight SQL
Abstraction | Provides a Pythonic abstraction over SQL, especially with the ORM. | Requires writing raw SQL queries for all interactions.
Database Agnosticism | Supports multiple database engines without changing code structure. | SQL queries are typically specific to a database engine.
Ease of Relationships | Relationships between tables are handled with Python objects and defined declaratively. | Relationships must be handled explicitly in SQL.
Code Readability | Queries are more readable and integrated into Python's syntax. | SQL queries are separate and less Pythonic.
Complex Query Building | Queries can be built dynamically using Python. | Queries must be written explicitly in SQL.
Security | Automatically escapes parameters to prevent SQL injection. | Requires manual handling to prevent SQL injection.
Performance Tuning | Supports features like connection pooling and lazy loading. | Performance tuning requires manual effort and expertise.

Examples: SQLAlchemy vs. Straight SQL

Example 1: Fetching Data

  • Straight SQL
import sqlite3

connection = sqlite3.connect('example.db')
cursor = connection.cursor()

# Execute a raw SQL query
cursor.execute("SELECT * FROM users WHERE age > ?", (25,))
results = cursor.fetchall()

for row in results:
    print(row)

connection.close()
  • SQLAlchemy
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.orm import sessionmaker

# Create database engine and session
engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)
session = Session()

# ORM Example
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# Query using ORM
results = session.query(User).filter(User.age > 25).all()
for user in results:
    print(user.name, user.age)

# Alternatively, using Core
metadata = MetaData()
users_table = Table('users', metadata, autoload_with=engine)
query = users_table.select().where(users_table.c.age > 25)

with engine.connect() as connection:
    results = connection.execute(query)
    for row in results:
        print(row)

Example 2: Inserting Data

  • Straight SQL
connection = sqlite3.connect('example.db')
cursor = connection.cursor()

# Insert a record
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 30))
connection.commit()
connection.close()
  • SQLAlchemy
# ORM Example
new_user = User(name='Alice', age=30)
session.add(new_user)
session.commit()

# Alternatively, using Core (engine.begin() commits the transaction on exit)
insert_query = users_table.insert().values(name='Alice', age=30)
with engine.begin() as connection:
    connection.execute(insert_query)

Example 3: Relationships Between Tables

  • Straight SQL
-- Example SQL
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, description TEXT,
                     FOREIGN KEY(user_id) REFERENCES users(id));

SELECT users.name, orders.description
FROM users
JOIN orders ON users.id = orders.user_id;
  • SQLAlchemy
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    description = Column(String)
    user = relationship("User", back_populates="orders")

User.orders = relationship("Order", order_by=Order.id, back_populates="user")

# Querying
results = session.query(User).join(User.orders).filter(Order.description == 'Some order').all()
for user in results:
    print(user.name)

When to Use SQLAlchemy vs. Straight SQL

Use SQLAlchemy:

  • When you need portability across different databases (see the sketch at the end of this section).
  • When the project requires object-oriented, maintainable code.
  • When you’re building complex applications with relationships.

Use Straight SQL:

  • When performance is critical and you want to hand-optimize the raw queries.
  • For simple scripts or one-off database operations.
  • When you need to execute non-standard SQL specific to the database engine.

SQLAlchemy provides a balance between flexibility and abstraction, making it an excellent choice for most Python projects that involve databases.
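To make the portability point concrete, here is a sketch that runs the same Core query against whichever backend the connection URL points at; the PostgreSQL URL, credentials, and psycopg2 driver in the commented line are assumptions for illustration.

from sqlalchemy import create_engine, MetaData, Table, select

def adults(url):
    """Run the same query against whatever database the URL points at."""
    engine = create_engine(url)
    metadata = MetaData()
    users = Table('users', metadata, autoload_with=engine)
    with engine.connect() as conn:
        return conn.execute(select(users).where(users.c.age > 25)).all()

print(adults('sqlite:///example.db'))
# Hypothetical PostgreSQL target; needs `pip install psycopg2` and real credentials
# print(adults('postgresql+psycopg2://user:password@localhost/example'))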

Two different paths to SQLAlchemy

SQLAlchemy provides two distinct ways to interact with a database: ORM (Object Relational Mapper) using classes and Core using tables. Both approaches are powerful and suited to different use cases. Here’s an explanation of the differences between declaring classes in SQLAlchemy ORM and using the Core approach, with examples to clarify.

1. Declaring Classes (SQLAlchemy ORM)

When using the SQLAlchemy ORM, you define database tables as Python classes. Each class is mapped to a table, and its attributes represent the table's columns.

How It Works:

  • Classes are declared using the declarative_base() function (or, in SQLAlchemy 2.0, by subclassing DeclarativeBase).
  • Each class represents a table in the database.
  • Relationships between tables are defined using Python attributes and ForeignKey or relationship.

Advantages:

  • Object-Oriented Approach: Work directly with Python objects, which is more intuitive for many developers.
  • Ease of Use: Queries and relationships are handled in terms of objects, simplifying code.
  • Abstraction: Abstracts away SQL queries, making it easier to focus on business logic.

Example:

from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# Declare a base class
Base = declarative_base()

# Define User and Order classes (tables)
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

    # Relationship to orders
    orders = relationship('Order', back_populates='user')


class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    description = Column(String)

    user = relationship('User', back_populates='orders')

# Create tables and add data
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

new_user = User(name='Alice', orders=[Order(description='Order 1')])
session.add(new_user)
session.commit()

# Querying
user = session.query(User).filter_by(name='Alice').first()
print(user.orders[0].description)  # Output: Order 1

2. Using Core (Table-Based)

In SQLAlchemy Core, you define tables explicitly using the Table class. Instead of objects, you work with raw SQL expressions and the table objects directly.

How It Works:

  • Tables are declared using the Table class.
  • SQL queries are built using SQLAlchemy’s expression language.
  • No mapped classes represent database rows; results come back as lightweight, tuple-like Row objects rather than instances of your own classes.

Advantages:

  • Flexibility: Gives you more control over raw SQL queries.
  • Performance: No ORM overhead; you work directly with SQL and table definitions.
  • Minimalism: Useful for simple scripts or scenarios where an ORM is unnecessary.

Example:

from sqlalchemy import Table, Column, Integer, String, ForeignKey, MetaData, create_engine

# Define tables
metadata = MetaData()

users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String)
)

orders = Table(
    'orders', metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('description', String)
)

# Create tables and add data
engine = create_engine('sqlite:///example.db')
metadata.create_all(engine)

# Insert data (engine.begin() opens a transaction and commits it on exit)
with engine.begin() as conn:
    conn.execute(users.insert().values(name='Alice'))
    conn.execute(orders.insert().values(user_id=1, description='Order 1'))

# Querying
with engine.connect() as conn:
    result = conn.execute(users.select().where(users.c.name == 'Alice'))
    for row in result:
        print(row)  # Output: (1, 'Alice')

Key Differences Between Declaring Classes (ORM) and Core

Aspect | Declaring Classes (ORM) | Core (Table-Based)
Representation | Python classes represent database tables. | Explicit Table objects represent database tables.
Data Access | Returns objects with attributes (e.g., user.name). | Returns tuple-like rows (e.g., row.name or row[0]).
Relationships | Handled via relationship() and ForeignKey. | Handled explicitly via joins in SQL.
Query Syntax | High-level, object-oriented queries (e.g., session.query(User).filter(...)). | Low-level, SQL-like queries (e.g., users.select().where(...)).
Flexibility | Abstracts SQL; better for simpler or object-heavy use cases. | Offers fine-grained control for custom or complex queries.
Performance | Slightly slower due to ORM overhead. | Faster due to the absence of an ORM layer.
Learning Curve | Easier for Python developers, but adds abstraction. | Requires more knowledge of SQL and the Core API.
Use Case | Complex applications with many relationships. | Simple scripts or scenarios requiring precise SQL control.

When to Use Which?

Use ORM (Classes):

  • When your application involves complex relationships between tables.
  • When working with Python objects fits naturally with the problem domain.
  • When you want to reduce boilerplate and make your code more Pythonic.

Use Core:

  • When you need fine-grained control over SQL queries.
  • For performance-critical scenarios where ORM overhead is unacceptable.
  • When you need flexibility to execute complex or database-specific queries.

Blending ORM and Core

You can also mix the two approaches in SQLAlchemy. For example, you might define tables using Core but use ORM for parts of the application.

# Define a table using Core
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String)
)

# Map the table to an ORM class imperatively
# (the standalone mapper() function was removed in SQLAlchemy 2.0;
#  use registry.map_imperatively() instead)
from sqlalchemy.orm import registry

mapper_registry = registry()

class User:
    def __init__(self, name=None):
        self.name = name

mapper_registry.map_imperatively(User, users)

# Now you can use the ORM with the mapped class
session.add(User(name='Alice'))
session.commit()

Other Industry Standard Data Engines

Using less common data engines like Snowflake or other database systems with SQLAlchemy requires leveraging its database-agnostic architecture. SQLAlchemy supports a wide range of database engines through dialects, which are plugins that enable communication with specific databases. Snowflake, for instance, has a dedicated SQLAlchemy dialect.

Here’s how you can use SQLAlchemy with less common databases like Snowflake, along with the nuances and key considerations involved.

1. Snowflake with SQLAlchemy

Setup

To connect SQLAlchemy to Snowflake, you need the Snowflake SQLAlchemy dialect. Install the required libraries:

pip install snowflake-sqlalchemy

Connection Setup

Here’s how you can connect SQLAlchemy to a Snowflake database:

from sqlalchemy import create_engine, text

# Snowflake connection string
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}'.format(
        user='YOUR_USER',
        password='YOUR_PASSWORD',
        account='YOUR_ACCOUNT',
        database='YOUR_DATABASE',
        schema='YOUR_SCHEMA'
    )
)

# Test connection (string SQL must be wrapped in text() in SQLAlchemy 1.4+)
with engine.connect() as connection:
    result = connection.execute(text("SELECT CURRENT_VERSION()"))
    for row in result:
        print(row)

Using ORM with Snowflake

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

# Define a table using ORM
class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    salary = Column(Integer)

# Bind engine and create tables
Base.metadata.create_all(engine)

# Insert data
Session = sessionmaker(bind=engine)
session = Session()
new_employee = Employee(name="Alice", salary=100000)
session.add(new_employee)
session.commit()

# Query data
employees = session.query(Employee).all()
for employee in employees:
    print(employee.name, employee.salary)

Using Core with Snowflake

from sqlalchemy import Table, MetaData, Column, Integer, String

metadata = MetaData()

# Define the table
employees = Table(
    'employees', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('salary', Integer)
)

# Create the table in Snowflake
metadata.create_all(engine)

# Insert data (engine.begin() commits the transaction on exit)
with engine.begin() as conn:
    conn.execute(employees.insert().values(name='Alice', salary=100000))

# Query data
with engine.connect() as conn:
    results = conn.execute(employees.select())
    for row in results:
        print(row)

2. General Approach for Other Databases

For other databases, SQLAlchemy relies on a specific dialect. Here's how to adapt to various data engines:

Steps:

  1. Install the Dialect for the Database:
    Search for a SQLAlchemy-compatible dialect for your database. Many major databases have official dialects, while others rely on community-maintained ones. Examples:
    • ClickHouse: pip install clickhouse-sqlalchemy
    • BigQuery: pip install sqlalchemy-bigquery (the older pybigquery package is deprecated)
    • MongoDB: no official dialect exists; a few community projects offer an SQL-like interface
  2. Find the Connection String Format:
    Check the dialect documentation for the correct connection string format. Each database has unique requirements, such as authentication details or specific parameters.
  3. Use SQLAlchemy's Core or ORM:
    Use the same create_engine() syntax and define tables or ORM classes as needed.

Examples of Other Databases

MongoDB (via a community dialect)

MongoDB is a NoSQL database with no official SQLAlchemy dialect; a handful of community projects expose an SQL-like interface over it, with varying levels of maintenance. The snippet below illustrates what using such a dialect would look like rather than being a drop-in example.

from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer

# Requires a community MongoDB dialect (package names and support vary)
engine = create_engine('mongodb://localhost:27017/mydatabase')
metadata = MetaData()

# Define a table-like structure
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer)
)

# Insert and query data
with engine.connect() as conn:
    conn.execute(users.insert().values(id=1, name='Alice', age=30))
    result = conn.execute(users.select())
    for row in result:
        print(row)

ClickHouse

ClickHouse is a fast columnar database for analytical queries.

from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String

# Install ClickHouse dialect first: pip install clickhouse-sqlalchemy
engine = create_engine('clickhouse://default:@localhost/test')
metadata = MetaData()

# Define a table
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer)
)

# Create the table in ClickHouse
# (the clickhouse-sqlalchemy dialect typically also requires a ClickHouse table
#  engine such as MergeTree to be declared on the Table for create_all() to succeed)
metadata.create_all(engine)

# Insert and query data
with engine.connect() as conn:
    conn.execute(users.insert().values(id=1, name='Alice', age=30))
    results = conn.execute(users.select())
    for row in results:
        print(row)

Google BigQuery

BigQuery works with SQLAlchemy via the sqlalchemy-bigquery dialect (which replaced the deprecated pybigquery package).

from sqlalchemy import create_engine, text

# Install BigQuery dialect: pip install sqlalchemy-bigquery
engine = create_engine('bigquery://project_id/dataset')

# Query BigQuery
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM my_table LIMIT 10"))
    for row in result:
        print(row)

Key Considerations for Non-Common Data Engines

  1. Install the Correct Dialect:
    Ensure the dialect matches your database engine. For some niche engines, the dialect might not be actively maintained, so check compatibility with SQLAlchemy’s current version.
  2. Understand Database-Specific Features:
    Some databases (e.g., Snowflake, ClickHouse) have unique capabilities like columnar storage or serverless queries. SQLAlchemy’s Core can often handle these features better than ORM.
  3. Performance Tuning:
    For large-scale or analytical databases (e.g., Snowflake, BigQuery), batched inserts and asynchronous connections (via SQLAlchemy's asyncio extension) can improve performance (see the sketch after this list).
  4. Custom SQL:
    For specialized databases, you might need to write raw SQL queries when SQLAlchemy abstractions fall short. Use SQLAlchemy’s text() for this purpose.
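As a sketch of the batching point above (item 3), passing a list of parameter dictionaries to a single execute() call lets SQLAlchemy issue an executemany-style batch insert; the users table, the rows, and the SQLite URL are illustrative stand-ins for a real warehouse.

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, insert

engine = create_engine('sqlite:///example.db')  # stand-in URL; swap for your warehouse
metadata = MetaData()
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer)
)
metadata.create_all(engine)

rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35},
]

# One execute() with a list of dicts becomes an executemany on the driver side
with engine.begin() as conn:
    conn.execute(insert(users), rows)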

Fallback Raw SQL Examples in SQLAlchemy

When working with less common data engines, certain scenarios may require raw SQL queries to leverage database-specific features or handle operations that SQLAlchemy’s abstractions don’t natively support. SQLAlchemy allows you to execute raw SQL through the text() function, which provides full flexibility to write and execute custom queries.

Here are two examples of using raw SQL with SQLAlchemy:

Example 1: Custom Query for Analytical Databases (e.g., Snowflake, BigQuery)

In analytical databases, you might need to use specialized SQL features, such as window functions or partitioning, that are not straightforward to implement with SQLAlchemy’s ORM or Core.

from sqlalchemy import create_engine, text

# Connect to the database
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}'.format(
        user='YOUR_USER',
        password='YOUR_PASSWORD',
        account='YOUR_ACCOUNT',
        database='YOUR_DATABASE',
        schema='YOUR_SCHEMA'
    )
)

# Write and execute a custom SQL query
query = text("""
    SELECT 
        department, 
        COUNT(employee_id) AS total_employees,
        AVG(salary) AS avg_salary
    FROM employees
    WHERE hire_date >= :start_date
    GROUP BY department
    ORDER BY avg_salary DESC
""")

# Execute the query and pass parameters
with engine.connect() as conn:
    result = conn.execute(query, {"start_date": "2022-01-01"})
    for row in result:
        print(row)

Example 2: Bulk Data Insertion Using text()

For databases optimized for high-volume data processing, such as ClickHouse or Snowflake, you might want to perform bulk inserts directly using raw SQL.

from sqlalchemy import create_engine, text

# Connect to the database
engine = create_engine('clickhouse://default:@localhost/test')

# Define raw SQL for bulk insertion
bulk_insert_sql = text("""
    INSERT INTO users (id, name, age)
    VALUES 
        (:id1, :name1, :age1),
        (:id2, :name2, :age2),
        (:id3, :name3, :age3)
""")

# Data to be inserted
data = {
    "id1": 1, "name1": "Alice", "age1": 30,
    "id2": 2, "name2": "Bob", "age2": 25,
    "id3": 3, "name3": "Charlie", "age3": 35
}

# Execute the raw SQL
with engine.connect() as conn:
    conn.execute(bulk_insert_sql, data)

# Verify the insertion
select_query = text("SELECT * FROM users")
with engine.connect() as conn:
    result = conn.execute(select_query)
    for row in result:
        print(row)

Using text() with raw SQL gives you complete control when SQLAlchemy abstractions are not enough, while still benefiting from connection pooling and parameterized queries for security and performance. Of note, raw queries can utilize native features of the data engines, for example, time travel in Snowflake.
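As a sketch of that last point, a Snowflake time-travel query can be passed straight through text(); the employees table, the five-minute offset, and the salary filter are illustrative, and engine is assumed to be the Snowflake engine created earlier.

from sqlalchemy import text

# Query the table as it existed five minutes ago using Snowflake's AT clause
time_travel_query = text("""
    SELECT *
    FROM employees AT(OFFSET => -60 * 5)
    WHERE salary > :min_salary
""")

with engine.connect() as conn:
    for row in conn.execute(time_travel_query, {"min_salary": 50000}):
        print(row)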

Conclusion

As we've explored, SQLAlchemy is a versatile tool that offers a Pythonic approach to managing databases, providing both ORM and Core interfaces for different use cases. While its abstractions make working with relational databases cleaner and more intuitive, certain scenarios, especially when working with less common data engines like Snowflake, ClickHouse, or BigQuery, may still require fallback raw SQL queries to unlock engine-specific features.

Use Cases May Vary:

  • If your goal is to avoid database-specific errors and ensure portability, SQLAlchemy's ORM and Core provide excellent abstraction layers, making it easier to share and maintain code.
  • On the other hand, when you want to lock down pipeline logic and leverage the full power of a specific database's unique features (like Snowflake's time travel or BigQuery's partitioned tables), raw SQL combined with SQLAlchemy's text() function offers the flexibility you need.

A Primer for Experimentation:
If you’re new to SQLAlchemy, think of this article as a primer to help you get started. Dive into the ORM to handle objects and relationships intuitively, experiment with Core for raw control, and don't hesitate to mix in text() queries when needed. The flexibility of SQLAlchemy allows you to balance simplicity with power, scaling up as your needs evolve.

Ultimately, SQLAlchemy is a foundation for building robust, maintainable pipelines in Python. Its ability to combine high-level abstractions with low-level control makes it a critical tool for any data engineer or developer. Whether you're optimizing for performance or building cross-database solutions, SQLAlchemy’s features and flexibility ensure you're well-equipped for the task.

So, keep experimenting, refining your workflows, and finding new ways to streamline your database interactions. The journey of mastering SQLAlchemy is as rewarding as the pipelines it powers. Hey everybody! Keep learning!