理论结合实践,我们来构建一个真正的分布式文件共享系统。这个系统将使用前面各篇文章介绍的技术——Kademlia DHT 用于节点发现和元数据分发,Gossipsub 用于广播,以及一个自定义的文件传输协议。
系统架构设计
flowchart TD
subgraph 应用层
CLI["CLI 界面"]
API["REST API"]
end
subgraph 业务逻辑层
Index["文件索引管理"]
Scheduler["分块下载调度"]
Verify["校验与重组"]
end
subgraph P2P 网络层
Discovery["Kad-DHT<br/>节点发现 + 元数据分发"]
Broadcast["GossipSub<br/>消息广播"]
Transfer["自定义协议<br/>文件传输"]
end
CLI --> Index
API --> Index
Index --> Scheduler
Scheduler --> Verify
Index --> Discovery
Scheduler --> Transfer
Broadcast -->|"新节点通知"| Index
核心设计原则:
- 分层抽象:业务逻辑与 P2P 网络层严格分离
- 模块化:每个组件独立可测试
- 容错设计:节点故障不影响整体系统可用性
文件元数据分发
下载者如何知道文件被分成了哪些 Piece、每个 Piece 的哈希是什么?这通过 DHT 分发 FileMetadata 实现:
flowchart LR
S["Seeder 节点"] -->|"1. 分块并计算哈希"| FM["FileMetadata<br/>{filename, piece_size,<br/> piece_hashes[]}"]
FM -->|"2. hash(fileID) 作为 DHT key<br/>存入 K 个最近节点"| DHT["Kademlia DHT"]
D["Downloader 节点"] -->|"3. 用 fileID 查询 DHT"| DHT
DHT -->|"4. 返回 FileMetadata"| D
D -->|"5. 开始按 Piece 下载"| S
Seeder 将文件的元数据(包含每个 Piece 的 SHA-256 哈希)以 fileID 为键存储到 DHT。Downloader 只要知道 fileID,就能从 DHT 获取完整的元数据,然后开始下载。fileID 通常取文件内容的哈希,这样相同内容的文件总是映射到相同的 ID。
Rust 核心模块实现
Piece 状态管理
首先定义 Piece 的状态枚举和线程安全的共享状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Piece 下载状态
#[derive(Debug, Clone, PartialEq)]
enum PieceStatus {
Missing, // 尚未开始下载
Downloading, // 正在从某个节点下载
Complete, // 下载完成且校验通过
}
/// 线程安全的下载调度器
/// Arc<RwLock<>> 允许多个异步任务安全地并发读写
struct DownloadScheduler {
metadata: FileMetadata,
piece_status: Arc<RwLock<Vec<PieceStatus>>>,
peer_pieces: Arc<RwLock<HashMap<PeerId, HashSet<usize>>>>,
output_path: String,
}
|
为什么用 Arc<RwLock<>>? 在异步 P2P 程序中,多个 Piece 可能同时从不同节点下载,每个下载任务是一个独立的异步任务。这些任务需要并发地更新 piece_status(标记某个 Piece 为 Downloading/Complete)。RwLock 允许多个任务同时读取状态(不阻塞),但写入时独占访问,避免数据竞争。Arc 提供跨任务的共享所有权。
文件分块与校验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| use sha2::{Sha256, Digest};
use std::fs::File;
use std::io::{Read, Write, Seek, SeekFrom};
const PIECE_SIZE: usize = 256 * 1024; // 256KB
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FileMetadata {
filename: String,
total_size: u64,
piece_size: usize,
piece_hashes: Vec<String>,
}
impl FileMetadata {
fn from_file(path: &str) -> anyhow::Result<Self> {
let mut file = File::open(path)?;
let total_size = file.metadata()?.len();
let mut piece_hashes = Vec::new();
let mut buffer = vec![0u8; PIECE_SIZE];
loop {
let n = file.read(&mut buffer)?;
if n == 0 { break; }
let mut hasher = Sha256::new();
hasher.update(&buffer[..n]);
piece_hashes.push(hex::encode(hasher.finalize()));
}
Ok(Self {
filename: path.to_string(),
total_size,
piece_size: PIECE_SIZE,
piece_hashes,
})
}
fn verify_piece(&self, index: usize, data: &[u8]) -> bool {
if index >= self.piece_hashes.len() { return false; }
let mut hasher = Sha256::new();
hasher.update(data);
hex::encode(hasher.finalize()) == self.piece_hashes[index]
}
}
|
稀有优先下载调度
BitTorrent 中的稀有优先(Rarest First)算法——优先下载整个网络中副本最少的 Piece,最大化数据可用性:
flowchart TD
A["某节点拥有 Piece 集合"] --> B["统计每个 Piece 的<br/>全网副本数量"]
B --> C{"选择副本最少<br/>且本节点缺失的 Piece"}
C --> D["向拥有该 Piece 的<br/>节点发起下载请求"]
D --> E{"下载成功?"}
E -->|"是"| F["校验 SHA-256"]
E -->|"否"| G["标记节点不可用<br/>选择其他节点"]
F -->|"通过"| H["标记 Complete<br/>通知其他节点"]
F -->|"失败"| I["重新请求该 Piece"]
H --> J{"所有 Piece<br/>Complete?"}
J -->|"否"| A
J -->|"是"| K["重组文件"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| impl DownloadScheduler {
fn new(metadata: FileMetadata, output_path: String) -> Self {
let piece_count = metadata.piece_hashes.len();
Self {
metadata,
piece_status: Arc::new(RwLock::new(vec![PieceStatus::Missing; piece_count])),
peer_pieces: Arc::new(RwLock::new(HashMap::new())),
output_path,
}
}
/// 稀有优先选择算法
async fn select_next_piece(&self, peer: PeerId) -> Option<usize> {
let status = self.piece_status.read().await;
let peer_map = self.peer_pieces.read().await;
let peer_has = peer_map.get(&peer)?;
let mut rarest: Vec<(usize, usize)> = status
.iter().enumerate()
.filter(|(i, s)| *s == PieceStatus::Missing && peer_has.contains(i))
.map(|(i, _)| {
let count = peer_map.values()
.filter(|pieces| pieces.contains(&i)).count();
(i, count)
}).collect();
rarest.sort_by_key(|&(_, count)| count);
rarest.first().map(|&(i, _)| i)
}
/// 安全地写入 Piece(使用写锁保护文件操作)
async fn write_piece(&self, index: usize, data: &[u8]) -> anyhow::Result<()> {
// 先校验哈希
if !self.metadata.verify_piece(index, data) {
anyhow::bail!("Piece {} hash verification failed", index);
}
// 写入文件(文件操作本身是同步的,但放在 async 上下文中)
let mut file = OpenOptions::new()
.write(true).create(true).open(&self.output_path)?;
let offset = index as u64 * self.metadata.piece_size as u64;
file.seek(SeekFrom::Start(offset))?;
file.write_all(data)?;
// 更新状态为 Complete
let mut status = self.piece_status.write().await;
status[index] = PieceStatus::Complete;
Ok(())
}
async fn is_complete(&self) -> bool {
let status = self.piece_status.read().await;
status.iter().all(|s| *s == PieceStatus::Complete)
}
}
|
Seeder 端:响应 Piece 请求
Seeder 节点需要监听下载请求,按 Piece 索引返回数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| /// Seeder 端的 Piece 服务处理器
async fn handle_piece_request(
metadata: &FileMetadata,
file_path: &str,
piece_index: usize,
) -> anyhow::Result<Vec<u8>> {
if piece_index >= metadata.piece_hashes.len() {
anyhow::bail!("Invalid piece index");
}
let mut file = File::open(file_path)?;
let offset = piece_index as u64 * metadata.piece_size as u64;
file.seek(SeekFrom::Start(offset))?;
// 最后一个 Piece 可能不满 piece_size
let remaining = metadata.total_size - offset;
let read_size = std::cmp::min(metadata.piece_size as u64, remaining) as usize;
let mut data = vec![0u8; read_size];
file.read_exact(&mut data)?;
// 校验后返回
if !metadata.verify_piece(piece_index, &data) {
anyhow::bail!("Local file corrupted at piece {}", piece_index);
}
Ok(data)
}
|
运行与测试
启动提供者节点(种子节点):
1
2
3
4
5
| # Seeder:分块文件,计算哈希,将元数据存入 DHT
cargo run -- serve --file ./ubuntu-22.04.iso
# 输出:File ID: QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
# 输出:Metadata published to DHT
# 输出:Listening for piece requests...
|
启动下载者节点:
1
2
3
4
5
6
7
8
| cargo run -- download \
--peer /ip4/127.0.0.1/tcp/4001/p2p/Qm... \
--file-id QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
# 输出:Fetching metadata from DHT...
# 输出:Got metadata: 1024 pieces, 256KB each
# 输出:Downloading piece 0/1024 (rarest first)...
# 输出:Verifying piece 0... OK
# 输出:All pieces downloaded. Reassembling...
|
下载流程
sequenceDiagram
participant D as 下载者
participant T as DHT
participant S as 种子节点
D->>T: 查找 fileID 对应的元数据
T-->>D: 返回 FileMetadata (piece_hashes[])
D->>S: 根据稀有优先算法请求 Piece 0
S-->>D: 发送 Piece 0 数据
Note over D: 校验 SHA-256 哈希 → 标记 Complete
D->>S: 请求 Piece 1
S-->>D: 发送 Piece 1 数据
Note over D: 所有 Piece Complete
Note over D: 按序重组为完整文件
参考资料