在现代 Web 应用中,大文件上传是一个常见但复杂的需求。传统的一次性上传方式在面对网络不稳定、服务器重启或用户页面刷新时显得力不从心。Phoenix LiveView 作为 Elixir 生态中的实时 Web 框架,提供了一套完整的流式上传解决方案,支持分块传输、断点续传和实时进度反馈。本文将深入探讨这一技术的工程实现细节。
LiveView 上传基础机制
Phoenix LiveView 通过allow_upload/3函数提供原生的文件上传支持。这个看似简单的 API 背后隐藏着复杂的分块传输逻辑。正如 Fly.io 的技术文章所指出的,"LiveView 自动将大文件分割成小块进行传输,开发者无需手动管理这一过程"。
基础配置示例如下:
def mount(_params, _session, socket) do
{:ok,
socket
|> assign(:uploaded_files, [])
|> allow_upload(:documents,
accept: ~w(.pdf .doc .docx),
max_entries: 10,
max_file_size: 100_000_000, # 100MB
chunk_size: 64_000, # 64KB分块
auto_upload: true
)
}
end
关键参数说明:
max_entries: 最大文件数量,控制并发上传max_file_size: 单个文件大小限制,需根据业务需求调整chunk_size: 分块大小,默认 64KB,对于大文件可适当增大auto_upload: 自动开始上传,避免用户额外操作
分块流式上传的 UploadWriter 实现
对于需要自定义处理逻辑的场景,LiveView 提供了UploadWriter接口。这个接口允许开发者完全控制文件块的接收和处理过程,实现真正的流式上传。
自定义 UploadWriter 示例
defmodule CustomStreamWriter do
@behaviour Phoenix.LiveView.UploadWriter
@impl true
def init(_opts) do
# 初始化状态,如创建临时文件、连接云存储等
file_name = generate_unique_filename()
{:ok, %{
file_name: file_name,
chunks_received: 0,
total_size: 0,
storage_backend: init_storage_backend(file_name)
}}
end
@impl true
def write_chunk(data, state) do
# 处理每个数据块
case process_chunk(data, state.storage_backend) do
{:ok, updated_backend} ->
new_state = %{
state |
chunks_received: state.chunks_received + 1,
total_size: state.total_size + byte_size(data),
storage_backend: updated_backend
}
{:ok, new_state}
{:error, reason} ->
{:error, reason, state}
end
end
@impl true
def close(state, reason) do
# 上传完成或出错时的清理工作
case reason do
:done -> finalize_upload(state.storage_backend)
_ -> cleanup(state.storage_backend)
end
{:ok, state}
end
@impl true
def meta(state) do
# 返回前端可用的元数据
%{
file_name: state.file_name,
progress: calculate_progress(state),
chunks_received: state.chunks_received
}
end
end
多目的地上传策略
一个实用的场景是同时将文件保存到本地和云存储。通过自定义 UploadWriter,可以实现 "一次上传,多处存储":
defmodule MultiDestinationWriter do
@behaviour Phoenix.LiveView.UploadWriter
def init(_opts) do
# 初始化本地文件和S3多部分上传
local_path = Plug.Upload.random_file("upload")
s3_upload_id = ExAws.S3.initiate_multipart_upload("bucket", "key")
{:ok, %{
local_file: File.open!(local_path, [:write, :binary]),
s3_upload_id: s3_upload_id,
s3_parts: [],
current_part: 1
}}
end
def write_chunk(data, state) do
# 写入本地文件
:ok = IO.binwrite(state.local_file, data)
# 上传到S3
part_result = ExAws.S3.upload_part(
"bucket",
"key",
state.current_part,
data,
upload_id: state.s3_upload_id
)
{:ok, %{
state |
s3_parts: [part_result | state.s3_parts],
current_part: state.current_part + 1
}}
end
end
断点续传的客户端与服务端协同
断点续传是大文件上传的核心需求之一。实现这一功能需要客户端和服务端的紧密配合。
客户端存储策略
对于断点续传,客户端需要持久化上传状态。IndexDB 是理想的选择:
// 前端JavaScript实现
class ResumableUploadManager {
constructor() {
this.db = null;
this.initIndexDB();
}
async initIndexDB() {
const request = indexedDB.open('UploadCache', 1);
request.onupgradeneeded = (event) => {
const db = event.target.result;
if (!db.objectStoreNames.contains('uploads')) {
const store = db.createObjectStore('uploads', { keyPath: 'fileId' });
store.createIndex('status', 'status', { unique: false });
}
};
this.db = await new Promise((resolve, reject) => {
request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}
async saveUploadState(fileId, fileData, uploadedChunks) {
const transaction = this.db.transaction(['uploads'], 'readwrite');
const store = transaction.objectStore('uploads');
await store.put({
fileId,
fileData,
uploadedChunks,
status: 'in_progress',
timestamp: Date.now()
});
}
async getPendingUploads() {
const transaction = this.db.transaction(['uploads'], 'readonly');
const store = transaction.objectStore('uploads');
const index = store.index('status');
return new Promise((resolve) => {
const request = index.getAll('in_progress');
request.onsuccess = () => resolve(request.result);
});
}
}
服务端状态管理
服务端需要跟踪每个文件的上传进度,以便在中断后能够恢复:
defmodule UploadTracker do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(state) do
{:ok, state}
end
def track_progress(file_id, chunk_index, total_chunks) do
GenServer.cast(__MODULE__, {:track, file_id, chunk_index, total_chunks})
end
def get_progress(file_id) do
GenServer.call(__MODULE__, {:get_progress, file_id})
end
def handle_cast({:track, file_id, chunk_index, total_chunks}, state) do
progress = Float.round(chunk_index / total_chunks * 100, 1)
new_state = Map.put(state, file_id, %{
progress: progress,
last_updated: DateTime.utc_now(),
chunk_index: chunk_index,
total_chunks: total_chunks
})
{:noreply, new_state}
end
def handle_call({:get_progress, file_id}, _from, state) do
progress = Map.get(state, file_id, %{progress: 0})
{:reply, progress, state}
end
end
断点续传流程
- 初始化上传:客户端生成唯一文件 ID,服务端记录初始状态
- 分块上传:客户端按顺序上传文件块,每块成功后更新状态
- 中断处理:网络断开或页面刷新时,状态保存到 IndexDB
- 恢复上传:重新加载页面后,从 IndexDB 读取状态,从断点继续
实时进度反馈与性能优化
进度反馈实现
LiveView 自动提供上传进度反馈,通过@uploadsassign 可以轻松访问:
<!-- 在模板中显示上传进度 -->
<%= for entry <- @uploads.documents.entries do %>
<div class="upload-item">
<span><%= entry.client_name %></span>
<div class="progress-bar">
<div class="progress-fill" style="width: <%= entry.progress %>%"></div>
</div>
<span><%= entry.progress %>%</span>
<%= if entry.done? do %>
<span class="status-done">✓ 完成</span>
<%= else if entry.error do %>
<span class="status-error">✗ 错误: <%= entry.error %></span>
<% end %>
</div>
<% end %>
性能优化参数
针对不同场景,需要调整上传参数以获得最佳性能:
# 小文件场景(<10MB)
allow_upload(socket, :small_files,
max_file_size: 10_000_000,
chunk_size: 32_000, # 较小分块
max_concurrency: 5 # 较低并发
)
# 大文件场景(>100MB)
allow_upload(socket, :large_files,
max_file_size: 1_000_000_000,
chunk_size: 256_000, # 较大分块减少请求数
max_concurrency: 2, # 控制并发避免内存压力
chunk_timeout: 30_000 # 延长超时时间
)
# 批量上传场景
allow_upload(socket, :batch_uploads,
max_entries: 50,
max_file_size: 50_000_000,
chunk_size: 64_000,
max_concurrency: 3, # 限制总并发数
chunk_timeout: 15_000
)
内存管理策略
大文件上传需要特别注意内存使用:
- 流式处理:使用 UploadWriter 确保数据不会全部加载到内存
- 分块大小:根据可用内存调整 chunk_size
- 并发控制:通过 max_concurrency 限制同时处理的文件数
- 超时设置:合理设置 chunk_timeout 避免长时间占用资源
defmodule MemoryAwareWriter do
@behaviour Phoenix.LiveView.UploadWriter
@max_memory_usage 100_000_000 # 100MB内存限制
def write_chunk(data, state) do
# 检查内存使用
current_memory = :erlang.memory(:total)
if current_memory > @max_memory_usage do
# 内存过高,暂停处理
Process.sleep(100)
write_chunk(data, state)
else
# 正常处理
{:ok, process_data(data, state)}
end
end
end
错误处理与监控
错误恢复机制
defmodule ResilientUploadHandler do
def handle_upload_error(:network_error, file_id, chunk_index) do
# 网络错误,重试机制
case retry_upload(file_id, chunk_index, max_retries: 3) do
{:ok, _} -> :continue
{:error, _} -> :abort
end
end
def handle_upload_error(:server_error, file_id, _chunk_index) do
# 服务器错误,记录日志并通知管理员
Logger.error("Upload server error for file #{file_id}")
notify_admin(:upload_server_error, file_id)
:pause # 暂停上传等待修复
end
defp retry_upload(file_id, chunk_index, opts) do
# 实现带指数退避的重试逻辑
max_retries = Keyword.get(opts, :max_retries, 3)
Enum.reduce_while(1..max_retries, {:error, :initial}, fn attempt, _acc ->
case attempt_upload(file_id, chunk_index) do
{:ok, result} ->
{:halt, {:ok, result}}
{:error, reason} when attempt < max_retries ->
delay = :math.pow(2, attempt) * 1000 # 指数退避
Process.sleep(round(delay))
{:cont, {:error, reason}}
{:error, reason} ->
{:halt, {:error, reason}}
end
end)
end
end
监控指标
建立完整的监控体系对于生产环境至关重要:
- 上传成功率:跟踪成功 / 失败比例
- 平均上传时间:按文件大小分段统计
- 内存使用:监控 UploadWriter 的内存消耗
- 网络质量:记录分块传输的延迟和丢包率
- 用户行为:分析中断频率和恢复成功率
最佳实践总结
- 分块大小选择:根据网络条件和文件类型动态调整,一般 64KB-256KB 为宜
- 并发控制:避免同时处理过多大文件,根据服务器资源设置合理限制
- 状态持久化:客户端使用 IndexDB,服务端使用 ETS 或数据库
- 进度反馈:提供详细的上传状态,包括预估剩余时间
- 错误处理:实现智能重试机制,区分可恢复和不可恢复错误
- 安全考虑:验证文件类型和大小,防止恶意上传
结语
Phoenix LiveView 的大文件流式上传方案提供了从基础到高级的完整工具链。通过合理利用 UploadWriter 接口、客户端存储和服务端状态管理,可以构建出既高效又可靠的文件上传系统。正如开发者 Uzair Aslam 在实践中所发现的,"结合 IndexDB 和 gcs-browser-upload 库,可以实现无缝的断点续传体验"。
在实际应用中,需要根据具体业务需求调整参数和策略。对于需要处理超大文件或高并发上传的场景,可能还需要结合 CDN、对象存储等外部服务。但无论如何,LiveView 提供的这套基础设施都是一个优秀的起点。
资料来源: