在构建企业级 CRM 系统时,多租户架构设计与实时数据同步是两个核心挑战。Twenty 作为开源的 Salesforce 替代品,采用 TypeScript、NestJS、PostgreSQL 和 Redis 等技术栈,为开发者提供了一个可参考的工程实现范例。本文将从架构设计原则、数据隔离模式、实时同步机制三个维度,深入分析企业级 CRM 系统的工程实现方案。
多租户架构的设计挑战与核心原则
企业级 CRM 系统需要同时服务多个客户(租户),每个租户的数据必须严格隔离,同时又要保证系统的可扩展性和运维效率。Twenty 的设计目标明确:构建一个现代化的开源 CRM,既要满足企业级的数据安全要求,又要保持社区驱动的灵活性。
多租户架构的核心设计原则包括:
- 数据隔离性:确保不同租户的数据在存储、访问、处理层面完全隔离
- 性能可预测性:避免一个租户的高负载影响其他租户的系统性能
- 运维可管理性:支持租户级别的备份、恢复、监控和迁移
- 成本可优化性:在保证隔离性的前提下,最大化资源共享以降低成本
根据 Microsoft 的多租户 SaaS 数据库模式文档,多租户架构主要有三种实现模式:单租户数据库模式、多租户数据库模式和混合模式。每种模式都有其适用场景和权衡点。
三种多租户数据隔离模式的工程实现对比
1. 单租户数据库模式(Database-per-Tenant)
在这种模式下,每个租户拥有独立的数据库实例。Twenty 如果采用这种模式,技术实现相对简单:
// 伪代码示例:基于租户ID动态选择数据库连接
class TenantDatabaseManager {
private connections: Map<string, Connection> = new Map();
async getConnection(tenantId: string): Promise<Connection> {
if (!this.connections.has(tenantId)) {
const config = await this.getTenantDatabaseConfig(tenantId);
const connection = await createConnection(config);
this.connections.set(tenantId, connection);
}
return this.connections.get(tenantId)!;
}
// 执行租户特定的查询
async executeQuery(tenantId: string, query: string, params: any[]) {
const connection = await this.getConnection(tenantId);
return connection.query(query, params);
}
}
优势:
- 数据隔离性最强,物理层面完全分离
- 支持租户级别的备份和恢复
- 性能隔离性好,一个租户的查询不会影响其他租户
劣势:
- 数据库连接数随租户数量线性增长
- 运维复杂度高,需要管理大量数据库实例
- 成本较高,每个数据库都有基础开销
2. 多租户数据库模式(Shared Database, Shared Schema)
这是最常用的多租户模式,所有租户共享同一个数据库和表结构,通过tenant_id字段进行逻辑隔离:
-- 所有表都包含tenant_id字段
CREATE TABLE companies (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);
-- 创建索引确保查询性能
CREATE INDEX idx_companies_tenant_id ON companies(tenant_id);
CREATE INDEX idx_companies_tenant_created ON companies(tenant_id, created_at);
在应用层,需要确保所有查询都包含租户过滤条件:
// NestJS服务层实现租户数据隔离
@Injectable()
export class CompanyService {
constructor(
@InjectRepository(Company)
private companyRepository: Repository<Company>,
private tenantContext: TenantContextService
) {}
async findAll(): Promise<Company[]> {
const tenantId = this.tenantContext.getCurrentTenantId();
return this.companyRepository.find({
where: { tenantId },
order: { createdAt: 'DESC' }
});
}
// 自动注入租户ID到新建记录
async create(createCompanyDto: CreateCompanyDto): Promise<Company> {
const tenantId = this.tenantContext.getCurrentTenantId();
const company = this.companyRepository.create({
...createCompanyDto,
tenantId
});
return this.companyRepository.save(company);
}
}
优势:
- 资源利用率高,共享数据库连接池
- 运维简单,只需管理少量数据库
- 成本效益好,适合中小型租户
劣势:
- 数据隔离依赖应用层逻辑,存在误操作风险
- 性能隔离较差,大租户可能影响小租户
- 租户级别的备份恢复复杂
3. 混合模式(Hybrid Approach)
混合模式结合了前两种模式的优点,根据租户规模和需求采用不同的策略:
// 根据租户规模选择不同的存储策略
enum TenantStorageStrategy {
SHARED = 'shared', // 小租户,共享数据库
DEDICATED = 'dedicated', // 中租户,专用schema
ISOLATED = 'isolated' // 大企业,独立数据库
}
class HybridTenantManager {
async getStorageStrategy(tenantId: string): Promise<TenantStorageStrategy> {
const tenant = await this.getTenantInfo(tenantId);
// 根据租户规模、数据敏感性、性能要求决定策略
if (tenant.userCount > 1000 || tenant.isEnterprise) {
return TenantStorageStrategy.ISOLATED;
} else if (tenant.userCount > 100) {
return TenantStorageStrategy.DEDICATED;
} else {
return TenantStorageStrategy.SHARED;
}
}
async executeQuery(tenantId: string, query: string): Promise<any> {
const strategy = await this.getStorageStrategy(tenantId);
switch (strategy) {
case TenantStorageStrategy.SHARED:
return this.executeSharedQuery(tenantId, query);
case TenantStorageStrategy.DEDICATED:
return this.executeDedicatedSchemaQuery(tenantId, query);
case TenantStorageStrategy.ISOLATED:
return this.executeIsolatedDatabaseQuery(tenantId, query);
}
}
}
工程实现要点:
- 需要统一的租户路由层,透明处理不同存储策略
- 实现跨策略的数据迁移工具
- 建立租户元数据管理系统,跟踪存储策略和配置
Twenty 的实时同步架构与 WebSocket 实现
企业级 CRM 系统需要实时反映数据变化,Twenty 通过 WebSocket 实现前端实时更新。根据 GitHub issue #14671 的要求,实时更新需要满足以下关键指标:
实时同步的核心需求
- 低延迟要求:数据变更到前端显示的延迟 ≤ 1 秒
- 上下文保持:刷新时保持用户的筛选、分页、滚动位置
- 权限感知:只推送用户有权限查看的数据变更
- 优雅降级:WebSocket 连接断开时自动重连,不影响核心功能
- 变更审计:每条实时消息包含变更元数据(谁、何时、什么操作)
WebSocket 服务架构设计
// WebSocket网关实现(基于NestJS)
@WebSocketGateway({
cors: true,
transports: ['websocket'],
pingInterval: 25000, // 25秒心跳间隔
pingTimeout: 5000, // 5秒超时
maxPayload: 1048576 // 1MB最大消息大小
})
export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
private clients: Map<string, WebSocket> = new Map();
private tenantRooms: Map<string, Set<string>> = new Map();
@WebSocketServer()
server: Server;
// 客户端连接处理
async handleConnection(client: WebSocket, request: Request) {
const token = this.extractToken(request);
const payload = await this.authService.verifyToken(token);
const clientId = payload.userId;
const tenantId = payload.tenantId;
// 存储客户端连接
this.clients.set(clientId, client);
// 加入租户房间
if (!this.tenantRooms.has(tenantId)) {
this.tenantRooms.set(tenantId, new Set());
}
this.tenantRooms.get(tenantId)!.add(clientId);
// 发送连接确认
client.send(JSON.stringify({
type: 'CONNECTED',
timestamp: Date.now(),
clientId,
tenantId
}));
// 记录连接日志
this.logger.log(`Client ${clientId} connected for tenant ${tenantId}`);
}
// 广播租户级别的数据变更
async broadcastToTenant(tenantId: string, event: RealtimeEvent) {
const clientIds = this.tenantRooms.get(tenantId) || new Set();
for (const clientId of clientIds) {
const client = this.clients.get(clientId);
if (client && client.readyState === WebSocket.OPEN) {
try {
client.send(JSON.stringify(event));
} catch (error) {
this.logger.error(`Failed to send to client ${clientId}:`, error);
}
}
}
}
// 处理数据库变更事件
@OnEvent('record.created')
async handleRecordCreated(payload: RecordCreatedPayload) {
const { tenantId, record, userId, recordType } = payload;
// 构建实时事件
const event: RealtimeEvent = {
type: 'RECORD_CREATED',
timestamp: Date.now(),
tenantId,
userId,
data: {
recordType,
record,
operation: 'CREATE'
},
metadata: {
source: 'database',
version: '1.0'
}
};
// 广播给租户内所有客户端
await this.broadcastToTenant(tenantId, event);
}
}
Redis Pub/Sub 实现事件分发
为了支持水平扩展,Twenty 可以使用 Redis 作为消息中间件:
// Redis事件发布者
@Injectable()
export class RedisEventPublisher {
constructor(private redisService: RedisService) {}
async publishRecordEvent(
tenantId: string,
eventType: string,
data: any
): Promise<void> {
const channel = `tenant:${tenantId}:events`;
const message = JSON.stringify({
type: eventType,
timestamp: Date.now(),
data,
tenantId
});
await this.redisService.publish(channel, message);
// 同时发布到全局监控通道
await this.redisService.publish(
'monitoring:realtime_events',
JSON.stringify({
tenantId,
eventType,
size: message.length,
timestamp: Date.now()
})
);
}
}
// Redis事件订阅者(每个WebSocket实例)
@Injectable()
export class RedisEventSubscriber implements OnModuleInit {
private subscribers: Map<string, RedisClient> = new Map();
async onModuleInit() {
// 为每个租户创建独立的订阅连接
// 避免一个租户的高频事件阻塞其他租户
}
async subscribeToTenant(tenantId: string, callback: (message: any) => void) {
const client = createRedisClient();
const channel = `tenant:${tenantId}:events`;
await client.subscribe(channel);
client.on('message', (ch, msg) => {
if (ch === channel) {
try {
const parsed = JSON.parse(msg);
callback(parsed);
} catch (error) {
this.logger.error(`Failed to parse message for tenant ${tenantId}:`, error);
}
}
});
this.subscribers.set(tenantId, client);
}
}
可落地的监控指标与性能优化清单
关键监控指标
-
WebSocket 连接健康度
- 活跃连接数(按租户统计)
- 连接建立成功率(目标:>99.9%)
- 平均连接时长
- 心跳丢失率(阈值:<1%)
-
实时消息性能
- 端到端延迟 P95(目标:≤1 秒)
- 消息投递成功率(目标:>99.95%)
- 消息积压队列长度(阈值:<1000)
- 每秒消息处理量(按租户统计)
-
多租户隔离效果
- 租户间性能影响系数(目标:<5%)
- 租户资源使用分布(识别热点租户)
- 跨租户查询比例(安全审计)
性能优化参数配置
# 实时同步服务配置示例
realtime:
websocket:
max_connections_per_instance: 10000
heartbeat_interval_ms: 25000
heartbeat_timeout_ms: 5000
max_message_size_bytes: 1048576
compression_threshold_bytes: 1024
redis:
connection_pool_size: 50
max_retries: 3
retry_delay_ms: 100
pubsub_channels_per_tenant: 5
batching:
enabled: true
max_batch_size: 50
max_batch_delay_ms: 100
flush_on_idle: true
backpressure:
max_pending_messages: 1000
slow_client_timeout_ms: 30000
tenant_rate_limit_per_second: 1000
运维检查清单
-
容量规划
- 预估每个租户的活跃用户数
- 计算峰值消息吞吐量需求
- 预留 30% 的容量缓冲
-
故障恢复
- 实现 WebSocket 连接自动重连(退避策略:1s, 2s, 4s, 8s...)
- 建立消息重发机制(幂等性保证)
- 配置租户级别的熔断器
-
安全加固
- WebSocket 连接认证与授权
- 消息内容加密(敏感字段)
- 租户数据访问审计日志
- DDoS 防护与速率限制
-
监控告警
- 延迟超过 1 秒的告警
- 消息投递失败率 > 0.1% 的告警
- 租户资源使用异常的告警
- 连接数突增 / 突降的告警
工程实践建议
基于 Twenty 的架构分析,对于构建类似的企业级多租户 CRM 系统,建议:
-
渐进式架构演进:从小规模的多租户共享数据库开始,随着业务增长逐步引入混合模式。过早优化会增加系统复杂度。
-
实时同步的分层设计:将实时消息分为关键业务事件(如订单创建)和非关键事件(如状态更新),采用不同的可靠性和延迟要求。
-
租户感知的限流策略:根据租户的付费等级和服务等级协议(SLA)实施差异化的速率限制和性能保障。
-
自动化测试覆盖:建立多租户场景的自动化测试套件,包括数据隔离测试、并发冲突测试、实时同步一致性测试。
-
可观测性优先:在架构设计阶段就考虑监控需求,确保能够追踪每个租户的端到端性能指标。
Twenty 作为开源 CRM 项目,其架构设计体现了现代 SaaS 应用的工程最佳实践。通过合理的多租户隔离策略和高效的实时同步机制,它为企业级 CRM 系统提供了一个可参考的实现范例。在实际工程落地时,需要根据具体的业务规模、安全要求和性能目标,对这些架构模式进行适当的调整和优化。
资料来源:
- GitHub - twentyhq/twenty: Building a modern alternative to Salesforce, powered by the community.
- Microsoft Learn - Multitenant SaaS database tenancy patterns