Hands-On: Building a Distributed File Sharing System

Combining theory with practice, we’ll build a working distributed file sharing system. It uses the technologies introduced in previous articles: Kademlia DHT for node discovery and metadata distribution, Gossipsub for broadcasting, and a custom file transfer protocol.

System Architecture

mermaid
flowchart TD
    subgraph Application
        CLI["CLI Interface"]
        API["REST API"]
    end

    subgraph Business Logic
        Index["File Index Manager"]
        Scheduler["Piece Download Scheduler"]
        Verify["Verify & Reassemble"]
    end

    subgraph P2P Network
        Discovery["Kad-DHT<br/>Peer Discovery + Metadata"]
        Broadcast["GossipSub<br/>Message Broadcast"]
        Transfer["Custom Protocol<br/>File Transfer"]
    end

    CLI --> Index
    API --> Index
    Index --> Scheduler
    Scheduler --> Verify
    Index --> Discovery
    Scheduler --> Transfer
    Broadcast -->|"New Peer Notification"| Index

Core design principles:

  • Layered abstraction: Business logic is strictly separated from the P2P network layer
  • Modularity: Each component is independently testable
  • Fault tolerance: The failure of an individual node should not make a file unavailable while other peers still hold its Pieces
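
One way to get this layering is to put a trait between the business logic and the network. The sketch below is a minimal illustration, not the project's actual code: `PieceSource`, `InMemorySource`, and `fetch_all` are hypothetical names, and the in-memory source stands in for the real transfer protocol so the logic can be tested without a network.

```rust
use std::collections::HashMap;

/// Hypothetical boundary between business logic and the P2P layer.
trait PieceSource {
    /// Returns the bytes of piece `index`, or None if the source cannot supply it.
    fn fetch(&self, index: usize) -> Option<Vec<u8>>;
}

/// Test double that serves pieces from memory instead of the network.
struct InMemorySource {
    pieces: HashMap<usize, Vec<u8>>,
}

impl PieceSource for InMemorySource {
    fn fetch(&self, index: usize) -> Option<Vec<u8>> {
        self.pieces.get(&index).cloned()
    }
}

/// Business logic that only sees the trait: fetch pieces 0..count in order
/// and concatenate them. Returns None if any piece is unavailable.
fn fetch_all(source: &dyn PieceSource, count: usize) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    for index in 0..count {
        out.extend(source.fetch(index)?);
    }
    Some(out)
}
```

A network-backed implementation of the same trait could later replace `InMemorySource` without touching `fetch_all`.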

Metadata Distribution

How does a downloader learn how many Pieces a file has and what their hashes are? The seeder publishes a FileMetadata record to the DHT:

mermaid
flowchart LR
    S["Seeder Node"] -->|"1. Chunk and hash file"| FM["FileMetadata<br/>{filename, piece_size,<br/> piece_hashes[]}"]
    FM -->|"2. hash(fileID) as DHT key<br/>store on K closest nodes"| DHT["Kademlia DHT"]
    D["Downloader Node"] -->|"3. Query DHT with fileID"| DHT
    DHT -->|"4. Return FileMetadata"| D
    D -->|"5. Start per-Piece download"| S

The Seeder stores the file metadata (including each Piece’s SHA-256 hash) in the DHT, keyed by fileID. The Downloader only needs to know the fileID to retrieve the complete metadata and start downloading. The fileID is typically the hash of the file content, so identical files always map to the same ID.
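
Step 2 in the diagram relies on Kademlia's XOR distance metric to decide which nodes hold the record. The sketch below shows that selection on toy 64-bit IDs. The ID width and the `k_closest` helper are assumptions made for illustration; real deployments use much wider IDs, and a DHT library performs this lookup iteratively over the network rather than over a local list.

```rust
/// Kademlia distance between two IDs: bitwise XOR, compared as an integer.
fn xor_distance(a: u64, b: u64) -> u64 {
    a ^ b
}

/// Returns the `k` node IDs closest to `key`, nearest first.
/// If fewer than `k` nodes are known, all of them are returned.
fn k_closest(key: u64, nodes: &[u64], k: usize) -> Vec<u64> {
    let mut sorted = nodes.to_vec();
    sorted.sort_by_key(|&id| xor_distance(key, id));
    sorted.truncate(k);
    sorted
}
```

For key `0b1010` and nodes `0b1000`, `0b0010`, `0b1011`, `0b1111`, the distances are 2, 8, 1, and 5, so with k = 2 the record would be stored on `0b1011` and `0b1000`.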

Rust Core Module Implementation

Piece State Management

First, define the Piece status enum and thread-safe shared state:

rust
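use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
// Assumption: PeerId comes from the libp2p crate behind the Kademlia and Gossipsub layers
use libp2p::PeerId;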

#[derive(Debug, Clone, PartialEq)]
enum PieceStatus {
    Missing,     // Not yet downloaded
    Downloading, // Currently downloading from a peer
    Complete,    // Downloaded and verified
}

/// Thread-safe download scheduler
/// Arc<RwLock<>> allows multiple async tasks to safely read/write concurrently
struct DownloadScheduler {
    metadata: FileMetadata,
    piece_status: Arc<RwLock<Vec<PieceStatus>>>,
    peer_pieces: Arc<RwLock<HashMap<PeerId, HashSet<usize>>>>,
    output_path: String,
}

Why Arc<RwLock<>>? In an async P2P program, several Pieces may be downloading from different peers at once, each in its own async task. These tasks all need to update piece_status (marking Pieces as Downloading or Complete). RwLock lets any number of tasks read the state at the same time; readers only wait while a writer holds the lock, and a writer gets exclusive access, which prevents data races. Arc provides shared ownership of the lock across tasks.
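
The exclusive write lock matters most when a task claims a Piece: checking for Missing and flipping it to Downloading has to happen under one write lock, or two tasks can pick the same Piece. The sketch below illustrates this with `std::sync::RwLock` and OS threads instead of Tokio so that it runs without extra crates. `Status`, `claim_piece`, and `download_with_workers` are hypothetical names, not part of the scheduler above.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Missing,
    Downloading,
    Complete,
}

/// Atomically claims the first Missing piece by flipping it to Downloading.
/// Check and update share one write lock, so no two workers get the same index.
fn claim_piece(status: &RwLock<Vec<Status>>) -> Option<usize> {
    let mut guard = status.write().unwrap();
    let index = guard.iter().position(|s| *s == Status::Missing)?;
    guard[index] = Status::Downloading;
    Some(index)
}

/// Spawns `workers` threads that claim pieces until none are Missing.
/// Returns every claimed index, in no particular order.
fn download_with_workers(piece_count: usize, workers: usize) -> Vec<usize> {
    let status = Arc::new(RwLock::new(vec![Status::Missing; piece_count]));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let status = Arc::clone(&status);
            thread::spawn(move || {
                let mut claimed = Vec::new();
                while let Some(index) = claim_piece(&status) {
                    // A real worker would fetch and verify the piece here.
                    status.write().unwrap()[index] = Status::Complete;
                    claimed.push(index);
                }
                claimed
            })
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

Splitting the check and the update across a read lock and a later write lock would reintroduce the race, because another worker could claim the Piece in between.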

File Chunking and Verification

rust
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};

// 256 KiB per Piece
const PIECE_SIZE: usize = 256 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct FileMetadata {
    filename: String,
    total_size: u64,
    piece_size: usize,
    piece_hashes: Vec<String>,
}

impl FileMetadata {
    fn from_file(path: &str) -> anyhow::Result<Self> {
        let mut file = File::open(path)?;
        let total_size = file.metadata()?.len();
        let mut piece_hashes = Vec::new();
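        // Reused buffer; cleared before each Piece
        let mut buffer = Vec::with_capacity(PIECE_SIZE);
        loop {
            buffer.clear();
            // read() may return fewer bytes than requested, so fill a whole Piece
            // (or stop at EOF) with take() + read_to_end()
            let n = Read::by_ref(&mut file)
                .take(PIECE_SIZE as u64)
                .read_to_end(&mut buffer)?;
            if n == 0 { break; }
            let mut hasher = Sha256::new();
            hasher.update(&buffer);
            piece_hashes.push(hex::encode(hasher.finalize()));
        }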
        Ok(Self { filename: path.to_string(), total_size, piece_size: PIECE_SIZE, piece_hashes })
    }

    fn verify_piece(&self, index: usize, data: &[u8]) -> bool {
        if index >= self.piece_hashes.len() { return false; }
        let mut hasher = Sha256::new();
        hasher.update(data);
        hex::encode(hasher.finalize()) == self.piece_hashes[index]
    }
}

Rarest First Download Scheduling

mermaid
flowchart TD
    A["Peer has Piece set"] --> B["Count replicas of<br/>each Piece globally"]
    B --> C{"Select rarest<br/>missing Piece"}
    C --> D["Request Piece from<br/>a peer that has it"]
    D --> E{"Download OK?"}
    E -->|"Yes"| F["Verify SHA-256"]
    E -->|"No"| G["Mark peer unavailable<br/>Pick another"]
    F -->|"Passed"| H["Mark Complete<br/>Notify peers"]
    F -->|"Failed"| I["Re-request Piece"]
    H --> J{"All Pieces<br/>Complete?"}
    J -->|"No"| A
    J -->|"Yes"| K["Reassemble file"]

rust
impl DownloadScheduler {
    fn new(metadata: FileMetadata, output_path: String) -> Self {
        let piece_count = metadata.piece_hashes.len();
        Self {
            metadata,
            piece_status: Arc::new(RwLock::new(vec![PieceStatus::Missing; piece_count])),
            peer_pieces: Arc::new(RwLock::new(HashMap::new())),
            output_path,
        }
    }

    async fn select_next_piece(&self, peer: PeerId) -> Option<usize> {
        let status = self.piece_status.read().await;
        let peer_map = self.peer_pieces.read().await;
        let peer_has = peer_map.get(&peer)?;

        // Among the Pieces we still need and this peer holds, pick the one
        // with the fewest replicas across all known peers
        status
            .iter().enumerate()
            // `s` is `&&PieceStatus` in this closure, hence the double dereference
            .filter(|(i, s)| **s == PieceStatus::Missing && peer_has.contains(i))
            .map(|(i, _)| {
                let count = peer_map.values()
                    .filter(|pieces| pieces.contains(&i)).count();
                (i, count)
            })
            .min_by_key(|&(_, count)| count)
            .map(|(i, _)| i)
    }

    async fn write_piece(&self, index: usize, data: &[u8]) -> anyhow::Result<()> {
        if !self.metadata.verify_piece(index, data) {
            anyhow::bail!("Piece {} hash verification failed", index);
        }

        let mut file = OpenOptions::new()
            .write(true).create(true).open(&self.output_path)?;
        let offset = index as u64 * self.metadata.piece_size as u64;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;

        let mut status = self.piece_status.write().await;
        status[index] = PieceStatus::Complete;
        Ok(())
    }

    async fn is_complete(&self) -> bool {
        let status = self.piece_status.read().await;
        status.iter().all(|s| *s == PieceStatus::Complete)
    }
}
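
To make the selection rule concrete, the sketch below runs rarest-first on a small synchronous example. It follows the same idea as `select_next_piece` but uses plain `u32` peer IDs and no locks; `rarest_missing` is a hypothetical helper written for illustration only.

```rust
use std::collections::{HashMap, HashSet};

/// Picks the missing piece that `peer` holds and that the fewest peers hold overall.
/// Ties go to the lowest piece index.
fn rarest_missing(
    missing: &[usize],
    peer: u32,
    peer_pieces: &HashMap<u32, HashSet<usize>>,
) -> Option<usize> {
    let peer_has = peer_pieces.get(&peer)?;
    missing
        .iter()
        .copied()
        .filter(|i| peer_has.contains(i))
        .map(|i| {
            let replicas = peer_pieces.values().filter(|p| p.contains(&i)).count();
            (replicas, i)
        })
        .min() // compares (replicas, index) tuples lexicographically
        .map(|(_, i)| i)
}
```

Suppose peer 1 holds Pieces {0, 1, 2}, peer 2 holds {0, 1}, and peer 3 holds {0}. Piece 2 then has a single replica, so a request to peer 1 picks Piece 2 first, while a request to peer 2 picks Piece 1, the rarest Piece that peer can serve.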

Seeder Side: Serving Piece Requests

The seeder node must listen for download requests and return data by Piece index:

rust
async fn handle_piece_request(
    metadata: &FileMetadata,
    file_path: &str,
    piece_index: usize,
) -> anyhow::Result<Vec<u8>> {
    if piece_index >= metadata.piece_hashes.len() {
        anyhow::bail!("Invalid piece index");
    }

    let mut file = File::open(file_path)?;
    let offset = piece_index as u64 * metadata.piece_size as u64;
    file.seek(SeekFrom::Start(offset))?;

    let remaining = metadata.total_size - offset;
    let read_size = std::cmp::min(metadata.piece_size as u64, remaining) as usize;
    let mut data = vec![0u8; read_size];
    file.read_exact(&mut data)?;

    if !metadata.verify_piece(piece_index, &data) {
        anyhow::bail!("Local file corrupted at piece {}", piece_index);
    }
    Ok(data)
}
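
The offset and length arithmetic is where the final, shorter Piece is easy to get wrong. The sketch below isolates that calculation; `piece_count` and `piece_range` are hypothetical helpers, and the file size in the usage note is made up for illustration.

```rust
/// Number of pieces needed to cover `total_size` bytes (ceiling division).
/// `piece_size` must be non-zero.
fn piece_count(total_size: u64, piece_size: u64) -> u64 {
    (total_size + piece_size - 1) / piece_size
}

/// Byte offset and length of piece `index`, or None if the index is out of range.
/// Every piece is `piece_size` long except possibly the last.
fn piece_range(index: u64, total_size: u64, piece_size: u64) -> Option<(u64, u64)> {
    if piece_size == 0 || index >= piece_count(total_size, piece_size) {
        return None;
    }
    let offset = index * piece_size;
    let len = piece_size.min(total_size - offset);
    Some((offset, len))
}
```

For a 600 KiB file with 256 KiB Pieces, this yields three Pieces: two full ones and a last Piece of 88 KiB starting at offset 512 KiB.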

Running and Testing

Start a seeder node:

bash
cargo run -- serve --file ./ubuntu-22.04.iso
# Output: File ID: QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
# Output: Metadata published to DHT
# Output: Listening for piece requests...

Start a downloader node:

bash
cargo run -- download \
    --peer /ip4/127.0.0.1/tcp/4001/p2p/Qm... \
    --file-id QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N

Download Flow

mermaid
sequenceDiagram
    participant D as Downloader
    participant T as DHT
    participant S as Seeder

    D->>T: Query fileID metadata
    T-->>D: Return FileMetadata (piece_hashes[])
    D->>S: Request Piece 0 (rarest first)
    S-->>D: Send Piece 0 data
    Note over D: Verify SHA-256 → Mark Complete
    D->>S: Request Piece 1
    S-->>D: Send Piece 1 data
    Note over D: All Pieces Complete
    Note over D: Reassemble in order
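
The diagram shows Pieces arriving in index order, but with several peers they can arrive in any order. Because each Piece has a fixed slot at `index * piece_size`, reassembly can happen as Pieces arrive. The sketch below shows this on an in-memory buffer; `place_piece` is a hypothetical helper, and it assumes the data has already passed hash verification.

```rust
/// Copies an already verified piece into its slot in the output buffer.
/// Fails if the piece is oversized or would extend past the end of the file.
fn place_piece(
    output: &mut [u8],
    index: usize,
    piece_size: usize,
    data: &[u8],
) -> Result<(), String> {
    let start = index.checked_mul(piece_size).ok_or("offset overflow")?;
    let end = start.checked_add(data.len()).ok_or("offset overflow")?;
    if data.len() > piece_size || end > output.len() {
        return Err(format!("piece {} does not fit in the output", index));
    }
    output[start..end].copy_from_slice(data);
    Ok(())
}
```

Placing Pieces 2, 0, and 1 of a 5-byte file with 2-byte Pieces produces the same bytes as placing them in order.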

References