SQLAlchemy Core表达式语言批量操作性能优化
请解释如何使用SQLAlchemy Core(而非ORM)进行高性能批量插入和更新。对比insert().execute()逐行插入与insert().bulk_insert()、dialect-specific批量插入的性能差异。如何使用text()执行原生SQL?说明executemany和execute_values(PostgreSQL)的用法。给出批量插入100万行数据的优化方案。
回答
小字辈
Core批量插入性能对比(插入10万行):
| 方法 | 耗时 | 说明 |
|------|------|------|
| ORM add_all() | ~15s | 最慢,有对象映射 |
| Core insert().execute([dict]) | ~8s | 逐行insert |
| Core connection.execute(insert(), data_list) | ~2s | executemany |
| psycopg2.extras.execute_values | ~0.3s | Postgres特定 |
from sqlalchemy import create_engine, Table, MetaData, insert
# Core批处理
metadata = MetaData()
users = Table('users', metadata, autoload_with=engine)
# executemany(最通用)
with engine.connect() as conn:
data = [{'name': f'user_{i}', 'age': i % 80} for i in range(100_000)]
conn.execute(insert(users), data) # 批处理
conn.commit()
PostgreSQL execute_values优化:
from sqlalchemy.dialects.postgresql import insert as pg_insert
# 或直接使用原始驱动
import psycopg2
def bulk_insert(conn, table_name, data):
from psycopg2.extras import execute_values
columns = list(data[0].keys())
values = [[row[c] for c in columns] for row in data]
sql = f'INSERT INTO {table_name} ({columns}) VALUES %s'
execute_values(conn.connection, sql, values)
text()执行原生SQL:
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users WHERE age > :age"), {'age': 30})
for row in result:
print(row._mapping)
100万行优化:分块(每批5000-10000行)、关闭autocommit、使用raw connection。