Hotdry.
ai-engineering

Prisma的expand-contract模式:零停机数据库schema演进策略

深入解析Prisma的expand-contract模式,提供零停机数据库schema演进策略与具体实施细节,包括向前兼容设计、版本过渡与回滚机制。

Prisma 的 expand-contract 模式:零停机数据库 schema 演进策略

在现代微服务架构中,数据库 schema 变更往往是最具挑战性的运维任务之一。传统的一次性 DDL 操作不仅会导致服务中断,还可能造成数据不一致和回滚困难。本文将深入探讨如何借助 Prisma 的迁移能力,结合 expand-contract 模式,实现零停机的数据库 schema 演进。

传统迁移模式的痛点

传统的数据库迁移通常采用 "直接替换" 的方式:停机→应用 DDL→重启服务。这种方式在单体应用中可能还能接受,但在微服务架构和 24/7 运行的生产环境中,任何服务中断都可能导致:

  • 用户体验中断
  • 业务损失
  • 数据不一致风险
  • 复杂的回滚流程

这就需要一种更优雅的迁移策略:expand-contract 模式

Expand-Contract 模式核心原理

expand-contract 模式将 schema 变更分为两个明确的阶段:

1. Expand 阶段(扩展阶段)

  • 只进行加法操作:创建新表、新列、新索引
  • 保持向后兼容:旧应用继续正常运行
  • 数据同步:建立触发器或后台任务同步数据
  • 功能验证:确保新结构正常工作

2. Contract 阶段(收缩阶段)

  • 进行减法操作:删除旧表、旧列、旧索引
  • 确认安全:验证所有应用已升级
  • 清理操作:移除临时结构和旧数据

这种模式确保了在任何时候至少有一个版本的 schema 可以正常工作,从而实现零停机迁移。

Prisma 中的 expand-contract 实现策略

虽然 Prisma 本身不直接提供 expand-contract 模式,但我们可以巧妙利用其迁移系统来实现这一策略。

基础环境设置

首先,确保你的 Prisma 项目配置正确:

# 安装必要依赖
npm install prisma @prisma/client --save

# 初始化Prisma
npx prisma init

# 配置环境变量
echo "DATABASE_URL=\"postgresql://user:password@localhost:5432/mydb\"" > .env

场景 1:列重命名迁移

假设我们需要将users表中的username列重命名为user_name

Expand 阶段

步骤 1:创建新的临时列

// schema.prisma
model User {
  id        String   @id @default(cuid())
  username  String?  // 保留旧列,暂时设为可选
  user_name String?  // 新列名称
  email     String   @unique
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}

步骤 2:生成迁移文件

npx prisma migrate dev --name add_user_name_column

步骤 3:创建数据同步脚本

// scripts/sync-user-names.ts
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()

async function syncUserNames() {
  console.log('开始同步用户名称数据...')
  
  const batchSize = 1000
  let skip = 0
  
  while (true) {
    const users = await prisma.user.findMany({
      skip,
      take: batchSize,
      where: {
        username: { not: null },
        user_name: null
      }
    })
    
    if (users.length === 0) break
    
    // 批量更新
    await Promise.all(
      users.map(user =>
        prisma.user.update({
          where: { id: user.id },
          data: { user_name: user.username }
        })
      )
    )
    
    skip += batchSize
    console.log(`已处理 ${skip} 条记录`)
  }
  
  console.log('用户名称数据同步完成')
}

// 添加后台任务或定时执行
syncUserNames().catch(console.error)

Contract 阶段

步骤 1:更新应用代码

// 应用代码逐步迁移
class UserService {
  // 暂时同时支持新旧字段
  async createUser(data: { email: string; username: string }) {
    return await prisma.user.create({
      data: {
        email: data.email,
        user_name: data.username // 写入新字段
      }
    })
  }
  
  // 读取时优先使用新字段
  async getUserByName(name: string) {
    return await prisma.user.findFirst({
      where: {
        OR: [
          { user_name: name },
          { username: name } // 向后兼容
        ]
      }
    })
  }
}

步骤 2:清理旧列

// schema.prisma
model User {
  id        String   @id @default(cuid())
  user_name String?  // 现在这是主要字段
  email     String   @unique
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}

步骤 3:生成最终的迁移

npx prisma migrate dev --name remove_old_username_column

场景 2:表结构重构

对于更复杂的情况,比如将orders表的customer_info字段拆分为独立表:

Expand 阶段

步骤 1:创建新表

// schema.prisma
model Order {
  id            String     @id @default(cuid())
  customer_info Json?      // 暂时保留旧字段
  customerId    String?    // 新外键
  totalAmount   Decimal    @db.Decimal(10, 2)
  status        String
  createdAt     DateTime   @default(now())
  updatedAt     DateTime   @updatedAt
  
  // 关系
  customer      Customer?  @relation(fields: [customerId], references: [id])
}

model Customer {
  id         String   @id @default(cuid())
  email      String   @unique
  firstName  String
  lastName   String
  phone      String?
  createdAt  DateTime @default(now())
  updatedAt  DateTime @updatedAt
  
  // 关系
  orders     Order[]
}

步骤 2:数据迁移脚本

// scripts/migrate-customer-data.ts
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()

async function migrateCustomerData() {
  console.log('开始迁移客户数据...')
  
  const orders = await prisma.order.findMany({
    where: {
      customer_info: { not: null },
      customerId: null
    },
    select: { id: true, customer_info: true }
  })
  
  console.log(`找到 ${orders.length} 条需要迁移的订单`)
  
  for (const order of orders) {
    try {
      const customerInfo = order.customer_info as any
      
      // 创建或查找客户
      let customer = await prisma.customer.findUnique({
        where: { email: customerInfo.email }
      })
      
      if (!customer) {
        customer = await prisma.customer.create({
          data: {
            email: customerInfo.email,
            firstName: customerInfo.firstName || '',
            lastName: customerInfo.lastName || '',
            phone: customerInfo.phone || null
          }
        })
      }
      
      // 更新订单
      await prisma.order.update({
        where: { id: order.id },
        data: {
          customerId: customer.id,
          customer_info: null // 清空旧字段
        }
      })
      
      console.log(`已迁移订单 ${order.id}`)
      
    } catch (error) {
      console.error(`迁移订单 ${order.id} 失败:`, error)
    }
  }
  
  console.log('客户数据迁移完成')
}

migrateCustomerData().catch(console.error)

向前兼容性设计

1. 数据库层兼容性

使用默认值和可空字段

model User {
  id           String   @id @default(cuid())
  // 新字段提供默认值,确保现有记录正常
  profileStatus String  @default("pending")
  // 重要字段逐步迁移
  legacyField   String? // 标记为可空,逐步移除
}

保持外键关系

model Post {
  id      String @id @default(cuid())
  userId  String
  // 临时保持外键可选,逐步变为必需
  user    User?  @relation(fields: [userId], references: [id])
}

2. 应用层兼容性

渐进式字段替换

class UserValidator {
  validateEmail(email: string): boolean {
    // 新版本验证逻辑
    return this.isValidEmailFormat(email)
  }
  
  // 保持旧方法以确保兼容性
  static validateUserInput(data: any): { valid: boolean; errors: string[] } {
    const errors: string[] = []
    
    // 逐步迁移验证规则
    if (data.email && !this.isValidEmailFormat(data.email)) {
      errors.push('Invalid email format')
    }
    
    return { valid: errors.length === 0, errors }
  }
  
  private static isValidEmailFormat(email: string): boolean {
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)
  }
}

API 版本控制

// API路由支持多版本
app.use('/api/v1', v1Router)    // 旧版本
app.use('/api/v2', v2Router)    // 新版本

// v2路由中的数据转换
const v2Router = Router()

v2Router.post('/users', async (req, res) => {
  // 转换旧格式到新格式
  const userData = {
    email: req.body.emailAddress, // 从旧字段名转换
    firstName: req.body.first_name,
    lastName: req.body.last_name
  }
  
  const user = await userService.createUser(userData)
  res.json(user)
})

版本过渡策略

1. 蓝绿部署模式

数据库层

-- 扩展阶段:创建新结构的副本
CREATE TABLE users_v2 (
  id UUID PRIMARY KEY,
  email VARCHAR(255) UNIQUE,
  full_name VARCHAR(255), -- 新字段
  created_at TIMESTAMP DEFAULT NOW()
);

-- 同步数据
INSERT INTO users_v2 (id, email, full_name, created_at)
SELECT id, email, CONCAT(first_name, ' ', last_name), created_at 
FROM users;

-- 应用层部署:绿环境使用新表

应用层

// 环境配置
const config = {
  database: {
    primary: process.env.PRIMARY_DB_URL,
    secondary: process.env.SECONDARY_DB_URL
  },
  version: process.env.APP_VERSION
}

class DatabaseRouter {
  route(query: any) {
    if (query.requiresNewSchema()) {
      return config.database.secondary // 新版本
    }
    return config.database.primary     // 兼容旧版本
  }
}

2. 渐进式字段切换

class SmartFieldManager {
  private newFieldThreshold = 0.1 // 10%新数据时切换
  
  async readUserField(userId: string, fieldName: string) {
    // 检查新字段数据比例
    const userCount = await prisma.user.count()
    const newFieldCount = await prisma.user.count({
      where: { [fieldName]: { not: null } }
    })
    
    const switchThreshold = newFieldCount / userCount
    
    if (switchThreshold >= this.newFieldThreshold) {
      // 切换到新字段
      return await this.readNewField(userId, fieldName)
    } else {
      // 使用旧字段
      return await this.readOldField(userId, fieldName)
    }
  }
  
  private async readNewField(userId: string, fieldName: string) {
    // 读取新结构的实现
  }
  
  private async readOldField(userId: string, fieldName: string) {
    // 读取旧结构的实现
  }
}

回滚机制设计

1. 渐进式回滚

class MigrationRollback {
  private migrationHistory: MigrationRecord[] = []
  
  async performRollback(targetVersion: string) {
    console.log(`开始回滚到版本: ${targetVersion}`)
    
    // 找到目标版本
    const targetMigration = this.migrationHistory.find(
      m => m.version === targetVersion
    )
    
    if (!targetMigration) {
      throw new Error(`未找到目标版本: ${targetVersion}`)
    }
    
    // 执行回滚步骤
    for (let i = this.migrationHistory.length - 1; i >= 0; i--) {
      const migration = this.migrationHistory[i]
      
      if (migration.version === targetVersion) break
      
      await this.rollbackSingleMigration(migration)
      console.log(`已回滚版本: ${migration.version}`)
    }
  }
  
  private async rollbackSingleMigration(migration: MigrationRecord) {
    switch (migration.type) {
      case 'add_column':
        await this.rollbackAddColumn(migration)
        break
      case 'create_table':
        await this.rollbackCreateTable(migration)
        break
      case 'data_migration':
        await this.rollbackDataMigration(migration)
        break
    }
  }
  
  private async rollbackAddColumn(migration: MigrationRecord) {
    const { table, column } = migration.details
    await prisma.$executeRaw`
      ALTER TABLE ${Prisma.raw(table)} 
      DROP COLUMN ${Prisma.raw(column)}
    `
  }
}

2. 数据备份策略

class MigrationBackup {
  async createBackup(migrationName: string) {
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
    const backupName = `${migrationName}_${timestamp}`
    
    console.log(`创建备份: ${backupName}`)
    
    // 创建备份表
    await prisma.$executeRaw`
      CREATE TABLE users_backup_${backupName} AS 
      SELECT * FROM users
    `
    
    // 记录备份信息
    await this.recordBackup({
      name: backupName,
      originalTable: 'users',
      timestamp: new Date(),
      migrationName
    })
  }
  
  async restoreFromBackup(backupName: string) {
    console.log(`从备份恢复: ${backupName}`)
    
    // 验证备份存在
    const backupTable = `users_backup_${backupName}`
    const tableExists = await this.checkTableExists(backupTable)
    
    if (!tableExists) {
      throw new Error(`备份表不存在: ${backupTable}`)
    }
    
    // 执行恢复
    await prisma.$transaction([
      // 清空当前表
      prisma.$executeRaw`TRUNCATE TABLE users`,
      // 恢复数据
      prisma.$executeRaw`
        INSERT INTO users 
        SELECT * FROM ${Prisma.raw(backupTable)}
      `
    ])
    
    console.log(`恢复完成`)
  }
  
  private async checkTableExists(tableName: string): Promise<boolean> {
    const result = await prisma.$queryRaw`
      SELECT EXISTS (
        SELECT FROM information_schema.tables 
        WHERE table_name = ${tableName}
      ) as exists
    `
    return result[0].exists
  }
}

性能优化和监控

1. 迁移性能优化

class OptimizedMigration {
  async batchProcess<T>(
    items: T[],
    processor: (batch: T[]) => Promise<void>,
    batchSize: number = 1000
  ) {
    const batches = this.chunkArray(items, batchSize)
    
    for (const [index, batch] of batches.entries()) {
      const startTime = Date.now()
      
      try {
        await processor(batch)
        const duration = Date.now() - startTime
        
        console.log(`批次 ${index + 1}/${batches.length} 完成,耗时: ${duration}ms`)
        
        // 添加延迟以避免数据库压力
        if (duration < 1000) {
          await this.sleep(100)
        }
        
      } catch (error) {
        console.error(`批次 ${index + 1} 处理失败:`, error)
        throw error
      }
    }
  }
  
  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = []
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size))
    }
    return chunks
  }
  
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}

2. 迁移监控

class MigrationMonitor {
  private metrics: MigrationMetrics = {
    startTime: null,
    endTime: null,
    recordsProcessed: 0,
    errors: [],
    checkpoints: []
  }
  
  async startMonitoring(migrationName: string) {
    this.metrics.startTime = new Date()
    console.log(`开始监控迁移: ${migrationName}`)
    
    // 设置定期检查点
    setInterval(() => {
      this.recordCheckpoint()
    }, 30000) // 每30秒记录一次
  }
  
  recordCheckpoint() {
    this.metrics.checkpoints.push({
      timestamp: new Date(),
      memoryUsage: process.memoryUsage(),
      connections: this.getActiveConnections(),
      recordsProcessed: this.metrics.recordsProcessed
    })
  }
  
  getMigrationStatus(): MigrationStatus {
    const duration = this.metrics.endTime 
      ? this.metrics.endTime.getTime() - this.metrics.startTime!.getTime()
      : Date.now() - this.metrics.startTime!.getTime()
    
    return {
      duration,
      recordsProcessed: this.metrics.recordsProcessed,
      errorCount: this.metrics.errors.length,
      checkpoints: this.metrics.checkpoints.length,
      isRunning: !this.metrics.endTime
    }
  }
}

最佳实践总结

1. 迁移前准备

  • 完整备份:确保数据可恢复
  • 环境隔离:在生产环境执行前充分测试
  • 回滚计划:制定详细的回滚策略
  • 监控准备:设置性能监控和告警

2. 迁移执行原则

  • 渐进式变更:避免大规模同时修改
  • 向前兼容:确保旧版本应用可继续运行
  • 性能监控:实时监控数据库性能
  • 数据验证:定期验证数据一致性

3. 迁移后处理

  • 清理工作:移除临时结构和代码
  • 性能优化:重新分析查询计划
  • 文档更新:更新相关文档
  • 团队沟通:通知相关团队变更完成

结论

通过合理运用 expand-contract 模式和 Prisma 强大的迁移能力,我们可以实现几乎零停机的数据库 schema 演进。关键在于:

  1. 分阶段执行:严格遵循 expand-contract 的阶段性原则
  2. 向前兼容:确保新旧版本应用可以共存
  3. 数据安全:建立完善的备份和回滚机制
  4. 持续监控:实时监控迁移过程和系统性能

这种模式虽然增加了迁移的复杂性,但大大降低了生产环境的风险,为现代微服务架构提供了更安全、更可靠的数据库演进方案。

参考资料

查看归档