Hotdry.
application-security

Phoenix LiveView大文件流式上传:分块传输与断点续传工程实践

深入探讨在Phoenix LiveView中实现大文件流式上传的完整方案,包括分块传输、断点续传机制、实时进度反馈,以及关键配置参数与性能优化策略。

在现代 Web 应用中,大文件上传是一个常见但复杂的需求。传统的一次性上传方式在面对网络不稳定、服务器重启或用户页面刷新时显得力不从心。Phoenix LiveView 作为 Elixir 生态中的实时 Web 框架,提供了一套完整的流式上传解决方案,支持分块传输、断点续传和实时进度反馈。本文将深入探讨这一技术的工程实现细节。

LiveView 上传基础机制

Phoenix LiveView 通过allow_upload/3函数提供原生的文件上传支持。这个看似简单的 API 背后隐藏着复杂的分块传输逻辑。正如 Fly.io 的技术文章所指出的,"LiveView 自动将大文件分割成小块进行传输,开发者无需手动管理这一过程"。

基础配置示例如下:

def mount(_params, _session, socket) do
  {:ok, 
    socket
    |> assign(:uploaded_files, [])
    |> allow_upload(:documents,
      accept: ~w(.pdf .doc .docx),
      max_entries: 10,
      max_file_size: 100_000_000,  # 100MB
      chunk_size: 64_000,          # 64KB分块
      auto_upload: true
    )
  }
end

关键参数说明:

  • max_entries: 最大文件数量,控制并发上传
  • max_file_size: 单个文件大小限制,需根据业务需求调整
  • chunk_size: 分块大小,默认 64KB,对于大文件可适当增大
  • auto_upload: 自动开始上传,避免用户额外操作

分块流式上传的 UploadWriter 实现

对于需要自定义处理逻辑的场景,LiveView 提供了UploadWriter接口。这个接口允许开发者完全控制文件块的接收和处理过程,实现真正的流式上传。

自定义 UploadWriter 示例

defmodule CustomStreamWriter do
  @behaviour Phoenix.LiveView.UploadWriter
  
  @impl true
  def init(_opts) do
    # 初始化状态,如创建临时文件、连接云存储等
    file_name = generate_unique_filename()
    {:ok, %{
      file_name: file_name,
      chunks_received: 0,
      total_size: 0,
      storage_backend: init_storage_backend(file_name)
    }}
  end
  
  @impl true
  def write_chunk(data, state) do
    # 处理每个数据块
    case process_chunk(data, state.storage_backend) do
      {:ok, updated_backend} ->
        new_state = %{
          state | 
          chunks_received: state.chunks_received + 1,
          total_size: state.total_size + byte_size(data),
          storage_backend: updated_backend
        }
        {:ok, new_state}
      {:error, reason} ->
        {:error, reason, state}
    end
  end
  
  @impl true
  def close(state, reason) do
    # 上传完成或出错时的清理工作
    case reason do
      :done -> finalize_upload(state.storage_backend)
      _ -> cleanup(state.storage_backend)
    end
    {:ok, state}
  end
  
  @impl true
  def meta(state) do
    # 返回前端可用的元数据
    %{
      file_name: state.file_name,
      progress: calculate_progress(state),
      chunks_received: state.chunks_received
    }
  end
end

多目的地上传策略

一个实用的场景是同时将文件保存到本地和云存储。通过自定义 UploadWriter,可以实现 "一次上传,多处存储":

defmodule MultiDestinationWriter do
  @behaviour Phoenix.LiveView.UploadWriter
  
  def init(_opts) do
    # 初始化本地文件和S3多部分上传
    local_path = Plug.Upload.random_file("upload")
    s3_upload_id = ExAws.S3.initiate_multipart_upload("bucket", "key")
    
    {:ok, %{
      local_file: File.open!(local_path, [:write, :binary]),
      s3_upload_id: s3_upload_id,
      s3_parts: [],
      current_part: 1
    }}
  end
  
  def write_chunk(data, state) do
    # 写入本地文件
    :ok = IO.binwrite(state.local_file, data)
    
    # 上传到S3
    part_result = ExAws.S3.upload_part(
      "bucket", 
      "key", 
      state.current_part, 
      data, 
      upload_id: state.s3_upload_id
    )
    
    {:ok, %{
      state | 
      s3_parts: [part_result | state.s3_parts],
      current_part: state.current_part + 1
    }}
  end
end

断点续传的客户端与服务端协同

断点续传是大文件上传的核心需求之一。实现这一功能需要客户端和服务端的紧密配合。

客户端存储策略

对于断点续传,客户端需要持久化上传状态。IndexDB 是理想的选择:

// 前端JavaScript实现
class ResumableUploadManager {
  constructor() {
    this.db = null;
    this.initIndexDB();
  }
  
  async initIndexDB() {
    const request = indexedDB.open('UploadCache', 1);
    
    request.onupgradeneeded = (event) => {
      const db = event.target.result;
      if (!db.objectStoreNames.contains('uploads')) {
        const store = db.createObjectStore('uploads', { keyPath: 'fileId' });
        store.createIndex('status', 'status', { unique: false });
      }
    };
    
    this.db = await new Promise((resolve, reject) => {
      request.onsuccess = () => resolve(request.result);
      request.onerror = () => reject(request.error);
    });
  }
  
  async saveUploadState(fileId, fileData, uploadedChunks) {
    const transaction = this.db.transaction(['uploads'], 'readwrite');
    const store = transaction.objectStore('uploads');
    
    await store.put({
      fileId,
      fileData,
      uploadedChunks,
      status: 'in_progress',
      timestamp: Date.now()
    });
  }
  
  async getPendingUploads() {
    const transaction = this.db.transaction(['uploads'], 'readonly');
    const store = transaction.objectStore('uploads');
    const index = store.index('status');
    
    return new Promise((resolve) => {
      const request = index.getAll('in_progress');
      request.onsuccess = () => resolve(request.result);
    });
  }
}

服务端状态管理

服务端需要跟踪每个文件的上传进度,以便在中断后能够恢复:

defmodule UploadTracker do
  use GenServer
  
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end
  
  def init(state) do
    {:ok, state}
  end
  
  def track_progress(file_id, chunk_index, total_chunks) do
    GenServer.cast(__MODULE__, {:track, file_id, chunk_index, total_chunks})
  end
  
  def get_progress(file_id) do
    GenServer.call(__MODULE__, {:get_progress, file_id})
  end
  
  def handle_cast({:track, file_id, chunk_index, total_chunks}, state) do
    progress = Float.round(chunk_index / total_chunks * 100, 1)
    new_state = Map.put(state, file_id, %{
      progress: progress,
      last_updated: DateTime.utc_now(),
      chunk_index: chunk_index,
      total_chunks: total_chunks
    })
    {:noreply, new_state}
  end
  
  def handle_call({:get_progress, file_id}, _from, state) do
    progress = Map.get(state, file_id, %{progress: 0})
    {:reply, progress, state}
  end
end

断点续传流程

  1. 初始化上传:客户端生成唯一文件 ID,服务端记录初始状态
  2. 分块上传:客户端按顺序上传文件块,每块成功后更新状态
  3. 中断处理:网络断开或页面刷新时,状态保存到 IndexDB
  4. 恢复上传:重新加载页面后,从 IndexDB 读取状态,从断点继续

实时进度反馈与性能优化

进度反馈实现

LiveView 自动提供上传进度反馈,通过@uploadsassign 可以轻松访问:

<!-- 在模板中显示上传进度 -->
<%= for entry <- @uploads.documents.entries do %>
  <div class="upload-item">
    <span><%= entry.client_name %></span>
    <div class="progress-bar">
      <div class="progress-fill" style="width: <%= entry.progress %>%"></div>
    </div>
    <span><%= entry.progress %>%</span>
    
    <%= if entry.done? do %>
      <span class="status-done">✓ 完成</span>
    <%= else if entry.error do %>
      <span class="status-error">✗ 错误: <%= entry.error %></span>
    <% end %>
  </div>
<% end %>

性能优化参数

针对不同场景,需要调整上传参数以获得最佳性能:

# 小文件场景(<10MB)
allow_upload(socket, :small_files,
  max_file_size: 10_000_000,
  chunk_size: 32_000,      # 较小分块
  max_concurrency: 5       # 较低并发
)

# 大文件场景(>100MB)  
allow_upload(socket, :large_files,
  max_file_size: 1_000_000_000,
  chunk_size: 256_000,     # 较大分块减少请求数
  max_concurrency: 2,      # 控制并发避免内存压力
  chunk_timeout: 30_000    # 延长超时时间
)

# 批量上传场景
allow_upload(socket, :batch_uploads,
  max_entries: 50,
  max_file_size: 50_000_000,
  chunk_size: 64_000,
  max_concurrency: 3,      # 限制总并发数
  chunk_timeout: 15_000
)

内存管理策略

大文件上传需要特别注意内存使用:

  1. 流式处理:使用 UploadWriter 确保数据不会全部加载到内存
  2. 分块大小:根据可用内存调整 chunk_size
  3. 并发控制:通过 max_concurrency 限制同时处理的文件数
  4. 超时设置:合理设置 chunk_timeout 避免长时间占用资源
defmodule MemoryAwareWriter do
  @behaviour Phoenix.LiveView.UploadWriter
  
  @max_memory_usage 100_000_000  # 100MB内存限制
  
  def write_chunk(data, state) do
    # 检查内存使用
    current_memory = :erlang.memory(:total)
    
    if current_memory > @max_memory_usage do
      # 内存过高,暂停处理
      Process.sleep(100)
      write_chunk(data, state)
    else
      # 正常处理
      {:ok, process_data(data, state)}
    end
  end
end

错误处理与监控

错误恢复机制

defmodule ResilientUploadHandler do
  def handle_upload_error(:network_error, file_id, chunk_index) do
    # 网络错误,重试机制
    case retry_upload(file_id, chunk_index, max_retries: 3) do
      {:ok, _} -> :continue
      {:error, _} -> :abort
    end
  end
  
  def handle_upload_error(:server_error, file_id, _chunk_index) do
    # 服务器错误,记录日志并通知管理员
    Logger.error("Upload server error for file #{file_id}")
    notify_admin(:upload_server_error, file_id)
    :pause  # 暂停上传等待修复
  end
  
  defp retry_upload(file_id, chunk_index, opts) do
    # 实现带指数退避的重试逻辑
    max_retries = Keyword.get(opts, :max_retries, 3)
    
    Enum.reduce_while(1..max_retries, {:error, :initial}, fn attempt, _acc ->
      case attempt_upload(file_id, chunk_index) do
        {:ok, result} -> 
          {:halt, {:ok, result}}
        {:error, reason} when attempt < max_retries ->
          delay = :math.pow(2, attempt) * 1000  # 指数退避
          Process.sleep(round(delay))
          {:cont, {:error, reason}}
        {:error, reason} ->
          {:halt, {:error, reason}}
      end
    end)
  end
end

监控指标

建立完整的监控体系对于生产环境至关重要:

  1. 上传成功率:跟踪成功 / 失败比例
  2. 平均上传时间:按文件大小分段统计
  3. 内存使用:监控 UploadWriter 的内存消耗
  4. 网络质量:记录分块传输的延迟和丢包率
  5. 用户行为:分析中断频率和恢复成功率

最佳实践总结

  1. 分块大小选择:根据网络条件和文件类型动态调整,一般 64KB-256KB 为宜
  2. 并发控制:避免同时处理过多大文件,根据服务器资源设置合理限制
  3. 状态持久化:客户端使用 IndexDB,服务端使用 ETS 或数据库
  4. 进度反馈:提供详细的上传状态,包括预估剩余时间
  5. 错误处理:实现智能重试机制,区分可恢复和不可恢复错误
  6. 安全考虑:验证文件类型和大小,防止恶意上传

结语

Phoenix LiveView 的大文件流式上传方案提供了从基础到高级的完整工具链。通过合理利用 UploadWriter 接口、客户端存储和服务端状态管理,可以构建出既高效又可靠的文件上传系统。正如开发者 Uzair Aslam 在实践中所发现的,"结合 IndexDB 和 gcs-browser-upload 库,可以实现无缝的断点续传体验"。

在实际应用中,需要根据具体业务需求调整参数和策略。对于需要处理超大文件或高并发上传的场景,可能还需要结合 CDN、对象存储等外部服务。但无论如何,LiveView 提供的这套基础设施都是一个优秀的起点。


资料来源

  1. Streaming Uploads with LiveView - Fly.io
  2. Resumable Uploading of Files in Live View - DEV Community
查看归档