Prisma的expand-contract模式:零停机数据库schema演进策略
在现代微服务架构中,数据库schema变更往往是最具挑战性的运维任务之一。传统的一次性DDL操作不仅会导致服务中断,还可能造成数据不一致和回滚困难。本文将深入探讨如何借助Prisma的迁移能力,结合expand-contract模式,实现零停机的数据库schema演进。
传统迁移模式的痛点
传统的数据库迁移通常采用"直接替换"的方式:停机→应用DDL→重启服务。这种方式在单体应用中可能还能接受,但在微服务架构和24/7运行的生产环境中,任何服务中断都可能导致:
- 用户体验中断
- 业务损失
- 数据不一致风险
- 复杂的回滚流程
这就需要一种更优雅的迁移策略:expand-contract模式。
Expand-Contract模式核心原理
expand-contract模式将schema变更分为两个明确的阶段:
1. Expand阶段(扩展阶段)
- 只进行加法操作:创建新表、新列、新索引
- 保持向后兼容:旧应用继续正常运行
- 数据同步:建立触发器或后台任务同步数据
- 功能验证:确保新结构正常工作
2. Contract阶段(收缩阶段)
- 进行减法操作:删除旧表、旧列、旧索引
- 确认安全:验证所有应用已升级
- 清理操作:移除临时结构和旧数据
这种模式确保了在任何时候至少有一个版本的schema可以正常工作,从而实现零停机迁移。
Prisma中的expand-contract实现策略
虽然Prisma本身不直接提供expand-contract模式,但我们可以巧妙利用其迁移系统来实现这一策略。
基础环境设置
首先,确保你的Prisma项目配置正确:
npm install prisma @prisma/client --save
npx prisma init
echo "DATABASE_URL=\"postgresql://user:password@localhost:5432/mydb\"" > .env
场景1:列重命名迁移
假设我们需要将users表中的username列重命名为user_name:
Expand阶段
步骤1:创建新的临时列
// schema.prisma
model User {
id String @id @default(cuid())
username String? // 保留旧列,暂时设为可选
user_name String? // 新列名称
email String @unique
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
步骤2:生成迁移文件
npx prisma migrate dev --name add_user_name_column
步骤3:创建数据同步脚本
import { PrismaClient } from '@prisma/client'
const prisma = new PrismaClient()
async function syncUserNames() {
console.log('开始同步用户名称数据...')
const batchSize = 1000
let skip = 0
while (true) {
const users = await prisma.user.findMany({
skip,
take: batchSize,
where: {
username: { not: null },
user_name: null
}
})
if (users.length === 0) break
await Promise.all(
users.map(user =>
prisma.user.update({
where: { id: user.id },
data: { user_name: user.username }
})
)
)
skip += batchSize
console.log(`已处理 ${skip} 条记录`)
}
console.log('用户名称数据同步完成')
}
syncUserNames().catch(console.error)
Contract阶段
步骤1:更新应用代码
class UserService {
async createUser(data: { email: string; username: string }) {
return await prisma.user.create({
data: {
email: data.email,
user_name: data.username
}
})
}
async getUserByName(name: string) {
return await prisma.user.findFirst({
where: {
OR: [
{ user_name: name },
{ username: name }
]
}
})
}
}
步骤2:清理旧列
// schema.prisma
model User {
id String @id @default(cuid())
user_name String? // 现在这是主要字段
email String @unique
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
步骤3:生成最终的迁移
npx prisma migrate dev --name remove_old_username_column
场景2:表结构重构
对于更复杂的情况,比如将orders表的customer_info字段拆分为独立表:
Expand阶段
步骤1:创建新表
// schema.prisma
model Order {
id String @id @default(cuid())
customer_info Json? // 暂时保留旧字段
customerId String? // 新外键
totalAmount Decimal @db.Decimal(10, 2)
status String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关系
customer Customer? @relation(fields: [customerId], references: [id])
}
model Customer {
id String @id @default(cuid())
email String @unique
firstName String
lastName String
phone String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关系
orders Order[]
}
步骤2:数据迁移脚本
import { PrismaClient } from '@prisma/client'
const prisma = new PrismaClient()
async function migrateCustomerData() {
console.log('开始迁移客户数据...')
const orders = await prisma.order.findMany({
where: {
customer_info: { not: null },
customerId: null
},
select: { id: true, customer_info: true }
})
console.log(`找到 ${orders.length} 条需要迁移的订单`)
for (const order of orders) {
try {
const customerInfo = order.customer_info as any
let customer = await prisma.customer.findUnique({
where: { email: customerInfo.email }
})
if (!customer) {
customer = await prisma.customer.create({
data: {
email: customerInfo.email,
firstName: customerInfo.firstName || '',
lastName: customerInfo.lastName || '',
phone: customerInfo.phone || null
}
})
}
await prisma.order.update({
where: { id: order.id },
data: {
customerId: customer.id,
customer_info: null
}
})
console.log(`已迁移订单 ${order.id}`)
} catch (error) {
console.error(`迁移订单 ${order.id} 失败:`, error)
}
}
console.log('客户数据迁移完成')
}
migrateCustomerData().catch(console.error)
向前兼容性设计
1. 数据库层兼容性
使用默认值和可空字段
model User {
id String @id @default(cuid())
// 新字段提供默认值,确保现有记录正常
profileStatus String @default("pending")
// 重要字段逐步迁移
legacyField String? // 标记为可空,逐步移除
}
保持外键关系
model Post {
id String @id @default(cuid())
userId String
// 临时保持外键可选,逐步变为必需
user User? @relation(fields: [userId], references: [id])
}
2. 应用层兼容性
渐进式字段替换
class UserValidator {
validateEmail(email: string): boolean {
return this.isValidEmailFormat(email)
}
static validateUserInput(data: any): { valid: boolean; errors: string[] } {
const errors: string[] = []
if (data.email && !this.isValidEmailFormat(data.email)) {
errors.push('Invalid email format')
}
return { valid: errors.length === 0, errors }
}
private static isValidEmailFormat(email: string): boolean {
return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)
}
}
API版本控制
app.use('/api/v1', v1Router)
app.use('/api/v2', v2Router)
const v2Router = Router()
v2Router.post('/users', async (req, res) => {
const userData = {
email: req.body.emailAddress,
firstName: req.body.first_name,
lastName: req.body.last_name
}
const user = await userService.createUser(userData)
res.json(user)
})
版本过渡策略
1. 蓝绿部署模式
数据库层
CREATE TABLE users_v2 (
id UUID PRIMARY KEY,
email VARCHAR(255) UNIQUE,
full_name VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users_v2 (id, email, full_name, created_at)
SELECT id, email, CONCAT(first_name, ' ', last_name), created_at
FROM users;
应用层
const config = {
database: {
primary: process.env.PRIMARY_DB_URL,
secondary: process.env.SECONDARY_DB_URL
},
version: process.env.APP_VERSION
}
class DatabaseRouter {
route(query: any) {
if (query.requiresNewSchema()) {
return config.database.secondary
}
return config.database.primary
}
}
2. 渐进式字段切换
class SmartFieldManager {
private newFieldThreshold = 0.1
async readUserField(userId: string, fieldName: string) {
const userCount = await prisma.user.count()
const newFieldCount = await prisma.user.count({
where: { [fieldName]: { not: null } }
})
const switchThreshold = newFieldCount / userCount
if (switchThreshold >= this.newFieldThreshold) {
return await this.readNewField(userId, fieldName)
} else {
return await this.readOldField(userId, fieldName)
}
}
private async readNewField(userId: string, fieldName: string) {
}
private async readOldField(userId: string, fieldName: string) {
}
}
回滚机制设计
1. 渐进式回滚
class MigrationRollback {
private migrationHistory: MigrationRecord[] = []
async performRollback(targetVersion: string) {
console.log(`开始回滚到版本: ${targetVersion}`)
const targetMigration = this.migrationHistory.find(
m => m.version === targetVersion
)
if (!targetMigration) {
throw new Error(`未找到目标版本: ${targetVersion}`)
}
for (let i = this.migrationHistory.length - 1; i >= 0; i--) {
const migration = this.migrationHistory[i]
if (migration.version === targetVersion) break
await this.rollbackSingleMigration(migration)
console.log(`已回滚版本: ${migration.version}`)
}
}
private async rollbackSingleMigration(migration: MigrationRecord) {
switch (migration.type) {
case 'add_column':
await this.rollbackAddColumn(migration)
break
case 'create_table':
await this.rollbackCreateTable(migration)
break
case 'data_migration':
await this.rollbackDataMigration(migration)
break
}
}
private async rollbackAddColumn(migration: MigrationRecord) {
const { table, column } = migration.details
await prisma.$executeRaw`
ALTER TABLE ${Prisma.raw(table)}
DROP COLUMN ${Prisma.raw(column)}
`
}
}
2. 数据备份策略
class MigrationBackup {
async createBackup(migrationName: string) {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
const backupName = `${migrationName}_${timestamp}`
console.log(`创建备份: ${backupName}`)
await prisma.$executeRaw`
CREATE TABLE users_backup_${backupName} AS
SELECT * FROM users
`
await this.recordBackup({
name: backupName,
originalTable: 'users',
timestamp: new Date(),
migrationName
})
}
async restoreFromBackup(backupName: string) {
console.log(`从备份恢复: ${backupName}`)
const backupTable = `users_backup_${backupName}`
const tableExists = await this.checkTableExists(backupTable)
if (!tableExists) {
throw new Error(`备份表不存在: ${backupTable}`)
}
await prisma.$transaction([
prisma.$executeRaw`TRUNCATE TABLE users`,
prisma.$executeRaw`
INSERT INTO users
SELECT * FROM ${Prisma.raw(backupTable)}
`
])
console.log(`恢复完成`)
}
private async checkTableExists(tableName: string): Promise<boolean> {
const result = await prisma.$queryRaw`
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = ${tableName}
) as exists
`
return result[0].exists
}
}
性能优化和监控
1. 迁移性能优化
class OptimizedMigration {
async batchProcess<T>(
items: T[],
processor: (batch: T[]) => Promise<void>,
batchSize: number = 1000
) {
const batches = this.chunkArray(items, batchSize)
for (const [index, batch] of batches.entries()) {
const startTime = Date.now()
try {
await processor(batch)
const duration = Date.now() - startTime
console.log(`批次 ${index + 1}/${batches.length} 完成,耗时: ${duration}ms`)
if (duration < 1000) {
await this.sleep(100)
}
} catch (error) {
console.error(`批次 ${index + 1} 处理失败:`, error)
throw error
}
}
}
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = []
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size))
}
return chunks
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
}
2. 迁移监控
class MigrationMonitor {
private metrics: MigrationMetrics = {
startTime: null,
endTime: null,
recordsProcessed: 0,
errors: [],
checkpoints: []
}
async startMonitoring(migrationName: string) {
this.metrics.startTime = new Date()
console.log(`开始监控迁移: ${migrationName}`)
setInterval(() => {
this.recordCheckpoint()
}, 30000)
}
recordCheckpoint() {
this.metrics.checkpoints.push({
timestamp: new Date(),
memoryUsage: process.memoryUsage(),
connections: this.getActiveConnections(),
recordsProcessed: this.metrics.recordsProcessed
})
}
getMigrationStatus(): MigrationStatus {
const duration = this.metrics.endTime
? this.metrics.endTime.getTime() - this.metrics.startTime!.getTime()
: Date.now() - this.metrics.startTime!.getTime()
return {
duration,
recordsProcessed: this.metrics.recordsProcessed,
errorCount: this.metrics.errors.length,
checkpoints: this.metrics.checkpoints.length,
isRunning: !this.metrics.endTime
}
}
}
最佳实践总结
1. 迁移前准备
- 完整备份:确保数据可恢复
- 环境隔离:在生产环境执行前充分测试
- 回滚计划:制定详细的回滚策略
- 监控准备:设置性能监控和告警
2. 迁移执行原则
- 渐进式变更:避免大规模同时修改
- 向前兼容:确保旧版本应用可继续运行
- 性能监控:实时监控数据库性能
- 数据验证:定期验证数据一致性
3. 迁移后处理
- 清理工作:移除临时结构和代码
- 性能优化:重新分析查询计划
- 文档更新:更新相关文档
- 团队沟通:通知相关团队变更完成
结论
通过合理运用expand-contract模式和Prisma强大的迁移能力,我们可以实现几乎零停机的数据库schema演进。关键在于:
- 分阶段执行:严格遵循expand-contract的阶段性原则
- 向前兼容:确保新旧版本应用可以共存
- 数据安全:建立完善的备份和回滚机制
- 持续监控:实时监控迁移过程和系统性能
这种模式虽然增加了迁移的复杂性,但大大降低了生产环境的风险,为现代微服务架构提供了更安全、更可靠的数据库演进方案。
参考资料