实战:构建分布式文件共享系统

理论结合实践,我们来构建一个真正的分布式文件共享系统。这个系统将使用前面各篇文章介绍的技术——Kademlia DHT 用于节点发现和元数据分发,Gossipsub 用于广播,以及一个自定义的文件传输协议。

系统架构设计

mermaid
flowchart TD
    subgraph 应用层
        CLI["CLI 界面"]
        API["REST API"]
    end

    subgraph 业务逻辑层
        Index["文件索引管理"]
        Scheduler["分块下载调度"]
        Verify["校验与重组"]
    end

    subgraph P2P 网络层
        Discovery["Kad-DHT<br/>节点发现 + 元数据分发"]
        Broadcast["GossipSub<br/>消息广播"]
        Transfer["自定义协议<br/>文件传输"]
    end

    CLI --> Index
    API --> Index
    Index --> Scheduler
    Scheduler --> Verify
    Index --> Discovery
    Scheduler --> Transfer
    Broadcast -->|"新节点通知"| Index

核心设计原则:

  • 分层抽象:业务逻辑与 P2P 网络层严格分离
  • 模块化:每个组件独立可测试
  • 容错设计:节点故障不影响整体系统可用性

文件元数据分发

下载者如何知道文件被分成了哪些 Piece、每个 Piece 的哈希是什么?这通过 DHT 分发 FileMetadata 实现:

mermaid
flowchart LR
    S["Seeder 节点"] -->|"1. 分块并计算哈希"| FM["FileMetadata<br/>{filename, piece_size,<br/> piece_hashes[]}"]
    FM -->|"2. hash(fileID) 作为 DHT key<br/>存入 K 个最近节点"| DHT["Kademlia DHT"]
    D["Downloader 节点"] -->|"3. 用 fileID 查询 DHT"| DHT
    DHT -->|"4. 返回 FileMetadata"| D
    D -->|"5. 开始按 Piece 下载"| S

Seeder 将文件的元数据(包含每个 Piece 的 SHA-256 哈希)以 fileID 为键存储到 DHT。Downloader 只要知道 fileID,就能从 DHT 获取完整的元数据,然后开始下载。fileID 通常取文件内容的哈希,这样相同内容的文件总是映射到相同的 ID。

Rust 核心模块实现

Piece 状态管理

首先定义 Piece 的状态枚举和线程安全的共享状态:

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Piece 下载状态
#[derive(Debug, Clone, PartialEq)]
enum PieceStatus {
    Missing,     // 尚未开始下载
    Downloading, // 正在从某个节点下载
    Complete,    // 下载完成且校验通过
}

/// 线程安全的下载调度器
/// Arc<RwLock<>> 允许多个异步任务安全地并发读写
struct DownloadScheduler {
    metadata: FileMetadata,
    piece_status: Arc<RwLock<Vec<PieceStatus>>>,
    peer_pieces: Arc<RwLock<HashMap<PeerId, HashSet<usize>>>>,
    output_path: String,
}

为什么用 Arc<RwLock<>> 在异步 P2P 程序中,多个 Piece 可能同时从不同节点下载,每个下载任务是一个独立的异步任务。这些任务需要并发地更新 piece_status(标记某个 Piece 为 Downloading/Complete)。RwLock 允许多个任务同时读取状态(不阻塞),但写入时独占访问,避免数据竞争。Arc 提供跨任务的共享所有权。

文件分块与校验

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use sha2::{Sha256, Digest};
use std::fs::File;
use std::io::{Read, Write, Seek, SeekFrom};

const PIECE_SIZE: usize = 256 * 1024; // 256KB

#[derive(Debug, Clone, Serialize, Deserialize)]
struct FileMetadata {
    filename: String,
    total_size: u64,
    piece_size: usize,
    piece_hashes: Vec<String>,
}

impl FileMetadata {
    fn from_file(path: &str) -> anyhow::Result<Self> {
        let mut file = File::open(path)?;
        let total_size = file.metadata()?.len();
        let mut piece_hashes = Vec::new();
        let mut buffer = vec![0u8; PIECE_SIZE];

        loop {
            let n = file.read(&mut buffer)?;
            if n == 0 { break; }
            let mut hasher = Sha256::new();
            hasher.update(&buffer[..n]);
            piece_hashes.push(hex::encode(hasher.finalize()));
        }

        Ok(Self {
            filename: path.to_string(),
            total_size,
            piece_size: PIECE_SIZE,
            piece_hashes,
        })
    }

    fn verify_piece(&self, index: usize, data: &[u8]) -> bool {
        if index >= self.piece_hashes.len() { return false; }
        let mut hasher = Sha256::new();
        hasher.update(data);
        hex::encode(hasher.finalize()) == self.piece_hashes[index]
    }
}

稀有优先下载调度

BitTorrent 中的稀有优先(Rarest First)算法——优先下载整个网络中副本最少的 Piece,最大化数据可用性:

mermaid
flowchart TD
    A["某节点拥有 Piece 集合"] --> B["统计每个 Piece 的<br/>全网副本数量"]
    B --> C{"选择副本最少<br/>且本节点缺失的 Piece"}
    C --> D["向拥有该 Piece 的<br/>节点发起下载请求"]
    D --> E{"下载成功?"}
    E -->|"是"| F["校验 SHA-256"]
    E -->|"否"| G["标记节点不可用<br/>选择其他节点"]
    F -->|"通过"| H["标记 Complete<br/>通知其他节点"]
    F -->|"失败"| I["重新请求该 Piece"]
    H --> J{"所有 Piece<br/>Complete?"}
    J -->|"否"| A
    J -->|"是"| K["重组文件"]
rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
impl DownloadScheduler {
    fn new(metadata: FileMetadata, output_path: String) -> Self {
        let piece_count = metadata.piece_hashes.len();
        Self {
            metadata,
            piece_status: Arc::new(RwLock::new(vec![PieceStatus::Missing; piece_count])),
            peer_pieces: Arc::new(RwLock::new(HashMap::new())),
            output_path,
        }
    }

    /// 稀有优先选择算法
    async fn select_next_piece(&self, peer: PeerId) -> Option<usize> {
        let status = self.piece_status.read().await;
        let peer_map = self.peer_pieces.read().await;
        let peer_has = peer_map.get(&peer)?;

        let mut rarest: Vec<(usize, usize)> = status
            .iter().enumerate()
            .filter(|(i, s)| *s == PieceStatus::Missing && peer_has.contains(i))
            .map(|(i, _)| {
                let count = peer_map.values()
                    .filter(|pieces| pieces.contains(&i)).count();
                (i, count)
            }).collect();
        rarest.sort_by_key(|&(_, count)| count);
        rarest.first().map(|&(i, _)| i)
    }

    /// 安全地写入 Piece(使用写锁保护文件操作)
    async fn write_piece(&self, index: usize, data: &[u8]) -> anyhow::Result<()> {
        // 先校验哈希
        if !self.metadata.verify_piece(index, data) {
            anyhow::bail!("Piece {} hash verification failed", index);
        }

        // 写入文件(文件操作本身是同步的,但放在 async 上下文中)
        let mut file = OpenOptions::new()
            .write(true).create(true).open(&self.output_path)?;
        let offset = index as u64 * self.metadata.piece_size as u64;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;

        // 更新状态为 Complete
        let mut status = self.piece_status.write().await;
        status[index] = PieceStatus::Complete;
        Ok(())
    }

    async fn is_complete(&self) -> bool {
        let status = self.piece_status.read().await;
        status.iter().all(|s| *s == PieceStatus::Complete)
    }
}

Seeder 端:响应 Piece 请求

Seeder 节点需要监听下载请求,按 Piece 索引返回数据:

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/// Seeder 端的 Piece 服务处理器
async fn handle_piece_request(
    metadata: &FileMetadata,
    file_path: &str,
    piece_index: usize,
) -> anyhow::Result<Vec<u8>> {
    if piece_index >= metadata.piece_hashes.len() {
        anyhow::bail!("Invalid piece index");
    }

    let mut file = File::open(file_path)?;
    let offset = piece_index as u64 * metadata.piece_size as u64;
    file.seek(SeekFrom::Start(offset))?;

    // 最后一个 Piece 可能不满 piece_size
    let remaining = metadata.total_size - offset;
    let read_size = std::cmp::min(metadata.piece_size as u64, remaining) as usize;
    let mut data = vec![0u8; read_size];
    file.read_exact(&mut data)?;

    // 校验后返回
    if !metadata.verify_piece(piece_index, &data) {
        anyhow::bail!("Local file corrupted at piece {}", piece_index);
    }
    Ok(data)
}

运行与测试

启动提供者节点(种子节点):

bash
1
2
3
4
5
# Seeder:分块文件,计算哈希,将元数据存入 DHT
cargo run -- serve --file ./ubuntu-22.04.iso
# 输出:File ID: QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
# 输出:Metadata published to DHT
# 输出:Listening for piece requests...

启动下载者节点:

bash
1
2
3
4
5
6
7
8
cargo run -- download \
    --peer /ip4/127.0.0.1/tcp/4001/p2p/Qm... \
    --file-id QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
# 输出:Fetching metadata from DHT...
# 输出:Got metadata: 1024 pieces, 256KB each
# 输出:Downloading piece 0/1024 (rarest first)...
# 输出:Verifying piece 0... OK
# 输出:All pieces downloaded. Reassembling...

下载流程

mermaid
sequenceDiagram
    participant D as 下载者
    participant T as DHT
    participant S as 种子节点

    D->>T: 查找 fileID 对应的元数据
    T-->>D: 返回 FileMetadata (piece_hashes[])
    D->>S: 根据稀有优先算法请求 Piece 0
    S-->>D: 发送 Piece 0 数据
    Note over D: 校验 SHA-256 哈希 → 标记 Complete
    D->>S: 请求 Piece 1
    S-->>D: 发送 Piece 1 数据
    Note over D: 所有 Piece Complete
    Note over D: 按序重组为完整文件

参考资料