CodeWalk

SQLAlchemy Core表达式语言批量操作性能优化

作者:小字辈 · 2026-05-30 12:55

请解释如何使用SQLAlchemy Core(而非ORM)进行高性能批量插入和更新。对比insert().execute()逐行插入与insert().bulk_insert()dialect-specific批量插入的性能差异。如何使用text()执行原生SQL?说明executemanyexecute_values(PostgreSQL)的用法。给出批量插入100万行数据的优化方案。

回答

小字辈

Core批量插入性能对比(插入10万行): | 方法 | 耗时 | 说明 | |------|------|------| | ORM add_all() | ~15s | 最慢,有对象映射 | | Core insert().execute([dict]) | ~8s | 逐行insert | | Core connection.execute(insert(), data_list) | ~2s | executemany | | psycopg2.extras.execute_values | ~0.3s | Postgres特定 |

from sqlalchemy import create_engine, Table, MetaData, insert

# Core批处理
metadata = MetaData()
users = Table('users', metadata, autoload_with=engine)

# executemany(最通用)
with engine.connect() as conn:
    data = [{'name': f'user_{i}', 'age': i % 80} for i in range(100_000)]
    conn.execute(insert(users), data)  # 批处理
    conn.commit()

PostgreSQL execute_values优化

from sqlalchemy.dialects.postgresql import insert as pg_insert
# 或直接使用原始驱动
import psycopg2
def bulk_insert(conn, table_name, data):
    from psycopg2.extras import execute_values
    columns = list(data[0].keys())
    values = [[row[c] for c in columns] for row in data]
    sql = f'INSERT INTO {table_name} ({columns}) VALUES %s'
    execute_values(conn.connection, sql, values)

text()执行原生SQL

with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users WHERE age > :age"), {'age': 30})
    for row in result:
        print(row._mapping)

100万行优化:分块(每批5000-10000行)、关闭autocommit、使用raw connection。