Hotdry.
systems

Twenty开源CRM的多租户架构与实时同步工程实现

深入分析Twenty开源CRM的多租户数据隔离架构设计,探讨实时同步的WebSocket实现方案与工程化参数配置。

在构建企业级 CRM 系统时,多租户架构设计与实时数据同步是两个核心挑战。Twenty 作为开源的 Salesforce 替代品,采用 TypeScript、NestJS、PostgreSQL 和 Redis 等技术栈,为开发者提供了一个可参考的工程实现范例。本文将从架构设计原则、数据隔离模式、实时同步机制三个维度,深入分析企业级 CRM 系统的工程实现方案。

多租户架构的设计挑战与核心原则

企业级 CRM 系统需要同时服务多个客户(租户),每个租户的数据必须严格隔离,同时又要保证系统的可扩展性和运维效率。Twenty 的设计目标明确:构建一个现代化的开源 CRM,既要满足企业级的数据安全要求,又要保持社区驱动的灵活性。

多租户架构的核心设计原则包括:

  1. 数据隔离性:确保不同租户的数据在存储、访问、处理层面完全隔离
  2. 性能可预测性:避免一个租户的高负载影响其他租户的系统性能
  3. 运维可管理性:支持租户级别的备份、恢复、监控和迁移
  4. 成本可优化性:在保证隔离性的前提下,最大化资源共享以降低成本

根据 Microsoft 的多租户 SaaS 数据库模式文档,多租户架构主要有三种实现模式:单租户数据库模式、多租户数据库模式和混合模式。每种模式都有其适用场景和权衡点。

三种多租户数据隔离模式的工程实现对比

1. 单租户数据库模式(Database-per-Tenant)

在这种模式下,每个租户拥有独立的数据库实例。Twenty 如果采用这种模式,技术实现相对简单:

// 伪代码示例:基于租户ID动态选择数据库连接
class TenantDatabaseManager {
  private connections: Map<string, Connection> = new Map();
  
  async getConnection(tenantId: string): Promise<Connection> {
    if (!this.connections.has(tenantId)) {
      const config = await this.getTenantDatabaseConfig(tenantId);
      const connection = await createConnection(config);
      this.connections.set(tenantId, connection);
    }
    return this.connections.get(tenantId)!;
  }
  
  // 执行租户特定的查询
  async executeQuery(tenantId: string, query: string, params: any[]) {
    const connection = await this.getConnection(tenantId);
    return connection.query(query, params);
  }
}

优势

  • 数据隔离性最强,物理层面完全分离
  • 支持租户级别的备份和恢复
  • 性能隔离性好,一个租户的查询不会影响其他租户

劣势

  • 数据库连接数随租户数量线性增长
  • 运维复杂度高,需要管理大量数据库实例
  • 成本较高,每个数据库都有基础开销

2. 多租户数据库模式(Shared Database, Shared Schema)

这是最常用的多租户模式,所有租户共享同一个数据库和表结构,通过tenant_id字段进行逻辑隔离:

-- 所有表都包含tenant_id字段
CREATE TABLE companies (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  name VARCHAR(255) NOT NULL,
  created_at TIMESTAMP DEFAULT NOW(),
  FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);

-- 创建索引确保查询性能
CREATE INDEX idx_companies_tenant_id ON companies(tenant_id);
CREATE INDEX idx_companies_tenant_created ON companies(tenant_id, created_at);

在应用层,需要确保所有查询都包含租户过滤条件:

// NestJS服务层实现租户数据隔离
@Injectable()
export class CompanyService {
  constructor(
    @InjectRepository(Company)
    private companyRepository: Repository<Company>,
    private tenantContext: TenantContextService
  ) {}
  
  async findAll(): Promise<Company[]> {
    const tenantId = this.tenantContext.getCurrentTenantId();
    return this.companyRepository.find({
      where: { tenantId },
      order: { createdAt: 'DESC' }
    });
  }
  
  // 自动注入租户ID到新建记录
  async create(createCompanyDto: CreateCompanyDto): Promise<Company> {
    const tenantId = this.tenantContext.getCurrentTenantId();
    const company = this.companyRepository.create({
      ...createCompanyDto,
      tenantId
    });
    return this.companyRepository.save(company);
  }
}

优势

  • 资源利用率高,共享数据库连接池
  • 运维简单,只需管理少量数据库
  • 成本效益好,适合中小型租户

劣势

  • 数据隔离依赖应用层逻辑,存在误操作风险
  • 性能隔离较差,大租户可能影响小租户
  • 租户级别的备份恢复复杂

3. 混合模式(Hybrid Approach)

混合模式结合了前两种模式的优点,根据租户规模和需求采用不同的策略:

// 根据租户规模选择不同的存储策略
enum TenantStorageStrategy {
  SHARED = 'shared',      // 小租户,共享数据库
  DEDICATED = 'dedicated', // 中租户,专用schema
  ISOLATED = 'isolated'   // 大企业,独立数据库
}

class HybridTenantManager {
  async getStorageStrategy(tenantId: string): Promise<TenantStorageStrategy> {
    const tenant = await this.getTenantInfo(tenantId);
    
    // 根据租户规模、数据敏感性、性能要求决定策略
    if (tenant.userCount > 1000 || tenant.isEnterprise) {
      return TenantStorageStrategy.ISOLATED;
    } else if (tenant.userCount > 100) {
      return TenantStorageStrategy.DEDICATED;
    } else {
      return TenantStorageStrategy.SHARED;
    }
  }
  
  async executeQuery(tenantId: string, query: string): Promise<any> {
    const strategy = await this.getStorageStrategy(tenantId);
    
    switch (strategy) {
      case TenantStorageStrategy.SHARED:
        return this.executeSharedQuery(tenantId, query);
      case TenantStorageStrategy.DEDICATED:
        return this.executeDedicatedSchemaQuery(tenantId, query);
      case TenantStorageStrategy.ISOLATED:
        return this.executeIsolatedDatabaseQuery(tenantId, query);
    }
  }
}

工程实现要点

  • 需要统一的租户路由层,透明处理不同存储策略
  • 实现跨策略的数据迁移工具
  • 建立租户元数据管理系统,跟踪存储策略和配置

Twenty 的实时同步架构与 WebSocket 实现

企业级 CRM 系统需要实时反映数据变化,Twenty 通过 WebSocket 实现前端实时更新。根据 GitHub issue #14671 的要求,实时更新需要满足以下关键指标:

实时同步的核心需求

  1. 低延迟要求:数据变更到前端显示的延迟 ≤ 1 秒
  2. 上下文保持:刷新时保持用户的筛选、分页、滚动位置
  3. 权限感知:只推送用户有权限查看的数据变更
  4. 优雅降级:WebSocket 连接断开时自动重连,不影响核心功能
  5. 变更审计:每条实时消息包含变更元数据(谁、何时、什么操作)

WebSocket 服务架构设计

// WebSocket网关实现(基于NestJS)
@WebSocketGateway({
  cors: true,
  transports: ['websocket'],
  pingInterval: 25000,      // 25秒心跳间隔
  pingTimeout: 5000,        // 5秒超时
  maxPayload: 1048576       // 1MB最大消息大小
})
export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
  private clients: Map<string, WebSocket> = new Map();
  private tenantRooms: Map<string, Set<string>> = new Map();
  
  @WebSocketServer()
  server: Server;
  
  // 客户端连接处理
  async handleConnection(client: WebSocket, request: Request) {
    const token = this.extractToken(request);
    const payload = await this.authService.verifyToken(token);
    
    const clientId = payload.userId;
    const tenantId = payload.tenantId;
    
    // 存储客户端连接
    this.clients.set(clientId, client);
    
    // 加入租户房间
    if (!this.tenantRooms.has(tenantId)) {
      this.tenantRooms.set(tenantId, new Set());
    }
    this.tenantRooms.get(tenantId)!.add(clientId);
    
    // 发送连接确认
    client.send(JSON.stringify({
      type: 'CONNECTED',
      timestamp: Date.now(),
      clientId,
      tenantId
    }));
    
    // 记录连接日志
    this.logger.log(`Client ${clientId} connected for tenant ${tenantId}`);
  }
  
  // 广播租户级别的数据变更
  async broadcastToTenant(tenantId: string, event: RealtimeEvent) {
    const clientIds = this.tenantRooms.get(tenantId) || new Set();
    
    for (const clientId of clientIds) {
      const client = this.clients.get(clientId);
      if (client && client.readyState === WebSocket.OPEN) {
        try {
          client.send(JSON.stringify(event));
        } catch (error) {
          this.logger.error(`Failed to send to client ${clientId}:`, error);
        }
      }
    }
  }
  
  // 处理数据库变更事件
  @OnEvent('record.created')
  async handleRecordCreated(payload: RecordCreatedPayload) {
    const { tenantId, record, userId, recordType } = payload;
    
    // 构建实时事件
    const event: RealtimeEvent = {
      type: 'RECORD_CREATED',
      timestamp: Date.now(),
      tenantId,
      userId,
      data: {
        recordType,
        record,
        operation: 'CREATE'
      },
      metadata: {
        source: 'database',
        version: '1.0'
      }
    };
    
    // 广播给租户内所有客户端
    await this.broadcastToTenant(tenantId, event);
  }
}

Redis Pub/Sub 实现事件分发

为了支持水平扩展,Twenty 可以使用 Redis 作为消息中间件:

// Redis事件发布者
@Injectable()
export class RedisEventPublisher {
  constructor(private redisService: RedisService) {}
  
  async publishRecordEvent(
    tenantId: string,
    eventType: string,
    data: any
  ): Promise<void> {
    const channel = `tenant:${tenantId}:events`;
    const message = JSON.stringify({
      type: eventType,
      timestamp: Date.now(),
      data,
      tenantId
    });
    
    await this.redisService.publish(channel, message);
    
    // 同时发布到全局监控通道
    await this.redisService.publish(
      'monitoring:realtime_events',
      JSON.stringify({
        tenantId,
        eventType,
        size: message.length,
        timestamp: Date.now()
      })
    );
  }
}

// Redis事件订阅者(每个WebSocket实例)
@Injectable()
export class RedisEventSubscriber implements OnModuleInit {
  private subscribers: Map<string, RedisClient> = new Map();
  
  async onModuleInit() {
    // 为每个租户创建独立的订阅连接
    // 避免一个租户的高频事件阻塞其他租户
  }
  
  async subscribeToTenant(tenantId: string, callback: (message: any) => void) {
    const client = createRedisClient();
    const channel = `tenant:${tenantId}:events`;
    
    await client.subscribe(channel);
    client.on('message', (ch, msg) => {
      if (ch === channel) {
        try {
          const parsed = JSON.parse(msg);
          callback(parsed);
        } catch (error) {
          this.logger.error(`Failed to parse message for tenant ${tenantId}:`, error);
        }
      }
    });
    
    this.subscribers.set(tenantId, client);
  }
}

可落地的监控指标与性能优化清单

关键监控指标

  1. WebSocket 连接健康度

    • 活跃连接数(按租户统计)
    • 连接建立成功率(目标:>99.9%)
    • 平均连接时长
    • 心跳丢失率(阈值:<1%)
  2. 实时消息性能

    • 端到端延迟 P95(目标:≤1 秒)
    • 消息投递成功率(目标:>99.95%)
    • 消息积压队列长度(阈值:<1000)
    • 每秒消息处理量(按租户统计)
  3. 多租户隔离效果

    • 租户间性能影响系数(目标:<5%)
    • 租户资源使用分布(识别热点租户)
    • 跨租户查询比例(安全审计)

性能优化参数配置

# 实时同步服务配置示例
realtime:
  websocket:
    max_connections_per_instance: 10000
    heartbeat_interval_ms: 25000
    heartbeat_timeout_ms: 5000
    max_message_size_bytes: 1048576
    compression_threshold_bytes: 1024
    
  redis:
    connection_pool_size: 50
    max_retries: 3
    retry_delay_ms: 100
    pubsub_channels_per_tenant: 5
    
  batching:
    enabled: true
    max_batch_size: 50
    max_batch_delay_ms: 100
    flush_on_idle: true
    
  backpressure:
    max_pending_messages: 1000
    slow_client_timeout_ms: 30000
    tenant_rate_limit_per_second: 1000

运维检查清单

  1. 容量规划

    • 预估每个租户的活跃用户数
    • 计算峰值消息吞吐量需求
    • 预留 30% 的容量缓冲
  2. 故障恢复

    • 实现 WebSocket 连接自动重连(退避策略:1s, 2s, 4s, 8s...)
    • 建立消息重发机制(幂等性保证)
    • 配置租户级别的熔断器
  3. 安全加固

    • WebSocket 连接认证与授权
    • 消息内容加密(敏感字段)
    • 租户数据访问审计日志
    • DDoS 防护与速率限制
  4. 监控告警

    • 延迟超过 1 秒的告警
    • 消息投递失败率 > 0.1% 的告警
    • 租户资源使用异常的告警
    • 连接数突增 / 突降的告警

工程实践建议

基于 Twenty 的架构分析,对于构建类似的企业级多租户 CRM 系统,建议:

  1. 渐进式架构演进:从小规模的多租户共享数据库开始,随着业务增长逐步引入混合模式。过早优化会增加系统复杂度。

  2. 实时同步的分层设计:将实时消息分为关键业务事件(如订单创建)和非关键事件(如状态更新),采用不同的可靠性和延迟要求。

  3. 租户感知的限流策略:根据租户的付费等级和服务等级协议(SLA)实施差异化的速率限制和性能保障。

  4. 自动化测试覆盖:建立多租户场景的自动化测试套件,包括数据隔离测试、并发冲突测试、实时同步一致性测试。

  5. 可观测性优先:在架构设计阶段就考虑监控需求,确保能够追踪每个租户的端到端性能指标。

Twenty 作为开源 CRM 项目,其架构设计体现了现代 SaaS 应用的工程最佳实践。通过合理的多租户隔离策略和高效的实时同步机制,它为企业级 CRM 系统提供了一个可参考的实现范例。在实际工程落地时,需要根据具体的业务规模、安全要求和性能目标,对这些架构模式进行适当的调整和优化。

资料来源

  1. GitHub - twentyhq/twenty: Building a modern alternative to Salesforce, powered by the community.
  2. Microsoft Learn - Multitenant SaaS database tenancy patterns
查看归档