SQL Alchemy for pythonic pipelines
Reading Time:
Reading Time:
I've primarily been developing code, benchmarks, and data tables in Python, as platforms like Snowflake often present quirks and limitations in data processing that make it challenging to produce the desired deliverables. However, with the recent addition of a new Data Director, I've been focusing more on writing SQL queries rather than relying on my usual NumPy and Pandas stack. This shift was motivated by the desire to make the processing code easier to share and understand. Given the complexity of a major project I'm working on, which involves the details I'm outlining here, I feel the need to revisit SQLAlchemy from a beginner's perspective.
SQL Alchemy is a Python library that interacts with relational databases using a high-level Pythonic approach. It provides two main components:
SQL Alchemy is one of the most widely used database libraries in Python because of its flexibility, performance, and support for multiple database systems (e.g., SQLite, PostgreSQL, MySQL, Oracle).
Feature | SQLAlchemy | Straight SQL |
---|---|---|
Abstraction | Provides Pythonic abstraction over SQL, especially with the ORM. | Requires writing raw SQL queries for all interactions. |
Database Agnosticism | Supports multiple database engines without changing code structure. | SQL queries are typically specific to a database engine. |
Ease of Relationships | Relationships between tables are handled with Python objects and defined declaratively. | Relationships must be handled explicitly in SQL. |
Code Readability | Queries are more readable and integrated into Python's syntax. | SQL queries are separate and less Pythonic. |
Complex Query Building | Queries can be built dynamically using Python. | Queries must be written explicitly in SQL. |
Security | Automatically escapes parameters to prevent SQL injection. | Requires manual handling to prevent SQL injection. |
Performance Tuning | Supports features like connection pooling and lazy loading. | Performance tuning requires manual effort and expertise. |
import sqlite3
connection = sqlite3.connect('example.db')
cursor = connection.cursor()
# Execute a raw SQL query
cursor.execute("SELECT * FROM users WHERE age > ?", (25,))
results = cursor.fetchall()
for row in results:
print(row)
connection.close()
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.orm import sessionmaker
# Create database engine and session
engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)
session = Session()
# ORM Example
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
# Query using ORM
results = session.query(User).filter(User.age > 25).all()
for user in results:
print(user.name, user.age)
# Alternatively, using Core
metadata = MetaData(bind=engine)
users_table = Table('users', metadata, autoload_with=engine)
query = users_table.select().where(users_table.c.age > 25)
connection = engine.connect()
results = connection.execute(query)
for row in results:
print(row)
connection = sqlite3.connect('example.db')
cursor = connection.cursor()
# Insert a record
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 30))
connection.commit()
connection.close()
# ORM Example
new_user = User(name='Alice', age=30)
session.add(new_user)
session.commit()
# Alternatively, using Core
insert_query = users_table.insert().values(name='Alice', age=30)
connection = engine.connect()
connection.execute(insert_query)
-- Example SQL
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, description TEXT,
FOREIGN KEY(user_id) REFERENCES users(id));
SELECT users.name, orders.description
FROM users
JOIN orders ON users.id = orders.user_id;
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
description = Column(String)
user = relationship("User", back_populates="orders")
User.orders = relationship("Order", order_by=Order.id, back_populates="user")
# Querying
results = session.query(User).join(User.orders).filter(Order.description == 'Some order').all()
for user in results:
print(user.name)
Use SQLAlchemy:
Use Straight SQL:
SQLAlchemy provides a balance between flexibility and abstraction, making it an excellent choice for most Python projects that involve databases.
SQLAlchemy provides two distinct ways to interact with a database: ORM (Object Relational Mapper) using classes and Core using tables. Both approaches are powerful and suited to different use cases. Here’s an explanation of the differences between declaring classes in SQLAlchemy ORM and using the Core approach, with examples to clarify.
When using SQLAlchemy ORM, you define database tables as Python classes. These classes are mapped to database tables whose attributes represent the table’s columns.
declarative_base()
class or the Declarative Base
system.ForeignKey
or relationship
.from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Declare a base class
Base = declarative_base()
# Define User and Order classes (tables)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
# Relationship to orders
orders = relationship('Order', back_populates='user')
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
description = Column(String)
user = relationship('User', back_populates='orders')
# Create tables and add data
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
new_user = User(name='Alice', orders=[Order(description='Order 1')])
session.add(new_user)
session.commit()
# Querying
user = session.query(User).filter_by(name='Alice').first()
print(user.orders[0].description) # Output: Order 1
In SQLAlchemy Core, you define tables explicitly using the Table
class. Instead of objects, you work with raw SQL expressions and the table objects directly.
Table
class.from sqlalchemy import Table, Column, Integer, String, ForeignKey, MetaData, create_engine
# Define tables
metadata = MetaData()
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String)
)
orders = Table(
'orders', metadata,
Column('id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.id')),
Column('description', String)
)
# Create tables and add data
engine = create_engine('sqlite:///example.db')
metadata.create_all(engine)
# Insert data
with engine.connect() as conn:
conn.execute(users.insert().values(name='Alice'))
conn.execute(orders.insert().values(user_id=1, description='Order 1'))
# Querying
with engine.connect() as conn:
result = conn.execute(users.select().where(users.c.name == 'Alice'))
for row in result:
print(row) # Output: (1, 'Alice')
Aspect | Declaring Classes (ORM) | Core (Table-Based) |
---|---|---|
Representation | Python classes represent database tables. | Explicit Table objects represent database tables. |
Data Access | Returns objects with attributes (e.g., user.name ). | Returns rows as dictionaries or tuples (e.g., row['name'] ). |
Relationships | Handled via relationship() and ForeignKey . | Handled explicitly via joins in SQL. |
Query Syntax | High-level, object-oriented queries (e.g., session.query(User).filter(...) ). | Low-level, SQL-like queries (e.g., users.select().where(...) ). |
Flexibility | Abstracts SQL; better for simpler or object-heavy use cases. | Offers fine-grained control for custom or complex queries. |
Performance | Slightly slower due to ORM overhead. | Faster due to the absence of an ORM layer. |
Learning Curve | Easier for Python developers, but adds abstraction. | Requires more knowledge of SQL and the Core API. |
Use Case | Complex applications with many relationships. | Simple scripts or scenarios requiring precise SQL control. |
Use ORM (Classes):
Use Core:
You can also mix the two approaches in SQLAlchemy. For example, you might define tables using Core but use ORM for parts of the application.
# Define a table using Core
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String)
)
# Map the table to an ORM class
from sqlalchemy.orm import mapper
class User:
pass
mapper(User, users)
# Now you can use the ORM with the mapped class
session.add(User(name='Alice'))
Using less common data engines like Snowflake or other database systems with SQLAlchemy requires leveraging its database-agnostic architecture. SQLAlchemy supports a wide range of database engines through dialects, which are plugins that enable communication with specific databases. Snowflake, for instance, has a dedicated SQLAlchemy dialect.
Here’s how you can use SQLAlchemy with non-common databases like Snowflake or others, including any nuances or key considerations.
To connect SQLAlchemy to Snowflake, you need the Snowflake SQLAlchemy dialect. Install the required libraries:
pip install snowflake-sqlalchemy
Here’s how you can connect SQLAlchemy to a Snowflake database:
from sqlalchemy import create_engine
# Snowflake connection string
engine = create_engine(
'snowflake://{user}:{password}@{account}/{database}/{schema}'.format(
user='YOUR_USER',
password='YOUR_PASSWORD',
account='YOUR_ACCOUNT',
database='YOUR_DATABASE',
schema='YOUR_SCHEMA'
)
)
# Test connection
connection = engine.connect()
result = connection.execute("SELECT CURRENT_VERSION()")
for row in result:
print(row)
connection.close()
Using ORM with Snowflake
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
# Define a table using ORM
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String)
salary = Column(Integer)
# Bind engine and create tables
Base.metadata.create_all(engine)
# Insert data
Session = sessionmaker(bind=engine)
session = Session()
new_employee = Employee(name="Alice", salary=100000)
session.add(new_employee)
session.commit()
# Query data
employees = session.query(Employee).all()
for employee in employees:
print(employee.name, employee.salary)
Using Core with Snowflake
from sqlalchemy import Table, MetaData, Column, Integer, String
metadata = MetaData()
# Define the table
employees = Table(
'employees', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('salary', Integer)
)
# Create the table in Snowflake
metadata.create_all(engine)
# Insert data
with engine.connect() as conn:
conn.execute(employees.insert().values(name='Alice', salary=100000))
# Query data
with engine.connect() as conn:
results = conn.execute(employees.select())
for row in results:
print(row)
For other databases, SQLAlchemy relies on a specific dialect. Here's how to adapt to various data engines:
Install the Dialect for the Database:
Search for a SQLAlchemy-compatible dialect for your database. Many major databases have official dialects, while others have community-maintained ones.
Examples:
pip install sqlalchemy-mongo
pip install clickhouse-sqlalchemy
pip install pybigquery
create_engine()
syntax and define tables or ORM classes as needed.While MongoDB is a NoSQL database, some projects provide an SQL-like interface for MongoDB.
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer
# Install MongoDB dialect first: pip install sqlalchemy-mongo
engine = create_engine('mongodb://localhost:27017/mydatabase')
metadata = MetaData()
# Define a table-like structure
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('age', Integer)
)
# Insert and query data
with engine.connect() as conn:
conn.execute(users.insert().values(id=1, name='Alice', age=30))
result = conn.execute(users.select())
for row in result:
print(row)
ClickHouse is a fast columnar database for analytical queries.
from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String
# Install ClickHouse dialect first: pip install clickhouse-sqlalchemy
engine = create_engine('clickhouse://default:@localhost/test')
metadata = MetaData()
# Define a table
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('age', Integer)
)
# Create the table in ClickHouse
metadata.create_all(engine)
# Insert and query data
with engine.connect() as conn:
conn.execute(users.insert().values(id=1, name='Alice', age=30))
results = conn.execute(users.select())
for row in results:
print(row)
BigQuery works with SQLAlchemy via the pybigquery
dialect.
from sqlalchemy import create_engine
# Install BigQuery dialect: pip install pybigquery
engine = create_engine('bigquery://project_id/dataset')
# Query BigQuery
with engine.connect() as conn:
result = conn.execute("SELECT * FROM my_table LIMIT 10")
for row in result:
print(row)
asyncio
) can improve performance.text()
for this purpose.When working with non-common data engines, certain scenarios may require raw SQL queries to leverage database-specific features or handle operations that SQLAlchemy’s abstractions don’t natively support. SQLAlchemy allows you to execute raw SQL through the text()
function, which provides full flexibility to write and execute custom queries.
Here are two examples of using raw SQL with SQLAlchemy:
In analytical databases, you might need to use specialized SQL features, such as window functions or partitioning, that are not straightforward to implement with SQLAlchemy’s ORM or Core.
from sqlalchemy import create_engine, text
# Connect to the database
engine = create_engine(
'snowflake://{user}:{password}@{account}/{database}/{schema}'.format(
user='YOUR_USER',
password='YOUR_PASSWORD',
account='YOUR_ACCOUNT',
database='YOUR_DATABASE',
schema='YOUR_SCHEMA'
)
)
# Write and execute a custom SQL query
query = text("""
SELECT
department,
COUNT(employee_id) AS total_employees,
AVG(salary) AS avg_salary
FROM employees
WHERE hire_date >= :start_date
GROUP BY department
ORDER BY avg_salary DESC
""")
# Execute the query and pass parameters
with engine.connect() as conn:
result = conn.execute(query, {"start_date": "2022-01-01"})
for row in result:
print(row)
text()
For databases optimized for high-volume data processing, such as ClickHouse or Snowflake, you might want to perform bulk inserts directly using raw SQL.
from sqlalchemy import create_engine, text
# Connect to the database
engine = create_engine('clickhouse://default:@localhost/test')
# Define raw SQL for bulk insertion
bulk_insert_sql = text("""
INSERT INTO users (id, name, age)
VALUES
(:id1, :name1, :age1),
(:id2, :name2, :age2),
(:id3, :name3, :age3)
""")
# Data to be inserted
data = {
"id1": 1, "name1": "Alice", "age1": 30,
"id2": 2, "name2": "Bob", "age2": 25,
"id3": 3, "name3": "Charlie", "age3": 35
}
# Execute the raw SQL
with engine.connect() as conn:
conn.execute(bulk_insert_sql, data)
# Verify the insertion
select_query = text("SELECT * FROM users")
with engine.connect() as conn:
result = conn.execute(select_query)
for row in result:
print(row)
Using text()
with raw SQL gives you complete control when SQLAlchemy abstractions are not enough, while still benefiting from connection pooling and parameterized queries for security and performance.. Of note raw queries can utilize native features of the data engines--for example--time travel in Snowflake
As we've explored, SQLAlchemy is a versatile tool that offers a Pythonic approach to managing databases, providing both ORM and Core interfaces for different use cases. While its abstractions make working with relational databases cleaner and more intuitive, certain scenarios, especially when working with less common data engines like Snowflake, ClickHouse, or BigQuery, may still require fallback raw SQL queries to unlock engine-specific features.
Use Cases May Vary:
text()
function offers the flexibility you need.A Primer for Experimentation:
If you’re new to SQLAlchemy, think of this article as a primer to help you get started. Dive into the ORM to handle objects and relationships intuitively, experiment with Core for raw control, and don't hesitate to mix in text()
queries when needed. The flexibility of SQLAlchemy allows you to balance simplicity with power, scaling up as your needs evolve.
Ultimately, SQLAlchemy is a foundation for building robust, maintainable pipelines in Python. Its ability to combine high-level abstractions with low-level control makes it a critical tool for any data engineer or developer. Whether you're optimizing for performance or building cross-database solutions, SQLAlchemy’s features and flexibility ensure you're well-equipped for the task.
So, keep experimenting, refining your workflows, and finding new ways to streamline your database interactions. The journey of mastering SQLAlchemy is as rewarding as the pipelines it powers. Hey everybody! Keep learning!