Rust P2P 开发实战:从 Ping 到 Gossipsub

Rust 的所有权模型和零成本抽象使其成为实现 P2P 网络协议的理想语言。libp2p 的 Rust 实现(rust-libp2p)提供了完整的协议栈,从传输层到应用层一应俱全。

环境准备

Cargo.toml 中配置依赖:

toml
1
2
3
4
5
6
7
8
9
[dependencies]
libp2p = { version = "0.53", features = [
    "tokio", "tcp", "quic", "noise", "yamux", "mplex",
    "ping", "identify", "kad", "gossipsub", "relay", "dcutr", "macros",
] }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3"
anyhow = "1.0"

创建项目:

bash
1
2
3
4
cargo new p2p-tutorial
cd p2p-tutorial
# 将上面的依赖粘贴到 Cargo.toml
cargo build  # 首次编译需要几分钟,下载和编译大量依赖

构建 P2P 节点

最简 Ping 节点

Ping 是 libp2p 的"Hello World",通过它理解节点的基本生命周期:

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
use libp2p::{
    identity, ping::{Behaviour, Config as PingConfig, Event},
    swarm::{SwarmBuilder, SwarmEvent}, Multiaddr, PeerId,
};
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. 生成节点身份
    let id_keys = identity::Keypair::generate_ed25519();
    let peer_id = PeerId::from(id_keys.public());
    println!("Local Peer ID: {peer_id}");

    // 2. 构建传输层 + 安全层 + 多路复用
    let transport = libp2p::tokio_development_transport(id_keys)?;

    // 3. 定义网络行为
    let behaviour = Behaviour::new(PingConfig::default());

    // 4. 创建 Swarm(网络事件循环核心)
    let mut swarm = SwarmBuilder::with_tokio_executor(
        transport, behaviour, peer_id,
    ).build();

    // 5. 开始监听
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    // 6. 命令行交互:输入对方地址来建立连接
    let mut stdin = BufReader::new(tokio::io::stdin()).lines();
    println!("Enter peer multiaddr to dial (or 'quit' to exit):");

    loop {
        tokio::select! {
            // 处理网络事件
            event = swarm.select_next_some() => match event {
                SwarmEvent::NewListenAddr { address, .. } => {
                    println!("Listening on {address}");
                }
                SwarmEvent::Behaviour(Event::Ping { peer, result }) => {
                    println!("Ping to {peer}: {result:?}");
                }
                _ => {}
            },
            // 处理用户输入
            line = stdin.next_line() => {
                let line = line?.expect("stdin closed");
                if line == "quit" { break; }
                if let Ok(addr) = line.parse::<Multiaddr>() {
                    swarm.dial(addr)?;
                    println!("Dialing {addr}");
                }
            }
        }
    }

    Ok(())
}

运行和测试:

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 终端 1:启动第一个节点
cargo run
# 输出:Local Peer ID: 12D3Koo...
# 输出:Listening on /ip4/127.0.0.1/tcp/12345

# 终端 2:启动第二个节点
cargo run
# 输出:Local Peer ID: 12D3Koo...
# 输出:Listening on /ip4/127.0.0.1/tcp/54321

# 在终端 2 中,输入终端 1 的监听地址:
# /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...(替换为实际 Peer ID)
# 两个节点开始互相 Ping

这个例子展示了 libp2p 节点的四个核心组件:身份生成传输层构建行为定义事件循环tokio::select! 同时监听网络事件和用户输入,实现了交互式的节点管理。

组合多种协议

实际 P2P 应用通常需要同时运行多种协议。libp2p 的 NetworkBehaviour 派生宏允许将多个协议组合为一个:

rust
1
2
3
4
5
6
7
8
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
    ping: ping::Behaviour,
    identify: identify::Behaviour,
    kademlia: kad::Behaviour<kad::store::MemoryStore>,
    gossipsub: gossipsub::Behaviour,
}

每个子协议的事件通过 From trait 转换为统一的事件类型,使得事件处理集中在一个 match 中。

Kademlia DHT 完整实现

下面展示一个包含 Put/Get 操作的完整 DHT 节点:

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use libp2p::{identity, kad, swarm::{SwarmBuilder, SwarmEvent}, PeerId};
use std::time::Duration;

async fn run_dht_node() -> anyhow::Result<()> {
    let id_keys = identity::Keypair::generate_ed25519();
    let peer_id = PeerId::from(id_keys.public());
    let transport = libp2p::tokio_development_transport(id_keys.clone())?;

    let store = kad::store::MemoryStore::new(peer_id);
    let mut kademlia = kad::Behaviour::with_config(
        peer_id, store,
        kad::Config::default()
            .set_query_timeout(Duration::from_secs(30))
            .set_replication_interval(Some(Duration::from_secs(3600))),
    );

    // 添加 IPFS 引导节点
    let bootstrap: Vec<(PeerId, libp2p::Multiaddr)> = vec![
        ("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse()?,
         "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse()?),
    ];
    for (peer, addr) in bootstrap {
        kademlia.add_address(&peer, addr);
    }
    kademlia.bootstrap()?;

    let mut swarm = SwarmBuilder::with_tokio_executor(
        transport, kademlia, peer_id,
    ).build();
    swarm.listen_on("/ip4/0.0.0.0/tcp/4001".parse()?)?;

    // 等待引导完成
    tokio::time::sleep(Duration::from_secs(5)).await;

    // ===== Put 操作:存储数据 =====
    let key = "my-app/greeting".as_bytes().to_vec();
    let value = b"Hello from Rust DHT!".to_vec();
    swarm.behaviour_mut().put_record(
        kad::Record::new(key.clone(), value),
        kad::Quorum::Majority,  // 需要多数节点确认
    )?;
    println!("Put: stored key 'my-app/greeting'");

    // 等待存储完成
    tokio::time::sleep(Duration::from_secs(2)).await;

    // ===== Get 操作:读取数据 =====
    swarm.behaviour_mut().get_record(&key.into());
    println!("Get: requesting key 'my-app/greeting'");

    // 事件循环:处理 DHT 响应
    loop {
        match swarm.select_next_some().await {
            SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
                result: kad::QueryResult::GetRecord(Ok(ok)),
                ..
            }) => {
                for record in ok.records {
                    println!("Got value: {}",
                        String::from_utf8_lossy(&record.record.value));
                }
                break;  // 查到后退出
            }
            SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
                result: kad::QueryResult::PutRecord(_),
                ..
            }) => {
                println!("Put succeeded");
            }
            _ => {}
        }
    }
    Ok(())
}

关键 API 说明:

  • put_record():将键值对存储到 DHT 中距离 key 最近的 K 个节点
  • get_record():从 DHT 查找 key 对应的值
  • Quorum::Majority:要求多数节点确认写入才算成功
  • OutboundQueryProgressed:DHT 查询进度事件,包含成功或失败的结果

Gossipsub 发布订阅

Gossipsub 适用于需要广播消息的场景。以下是完整的发布和订阅示例

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use libp2p::gossipsub::{self, MessageAuthenticity, MessageId, Sha256Topic};
use sha2::{Digest, Sha256};

fn create_gossipsub(id_keys: &identity::Keypair) -> anyhow::Result<gossipsub::Behaviour> {
    let message_id_fn = |message: &gossipsub::Message| {
        let mut hasher = Sha256::new();
        hasher.update(&message.data);
        MessageId::from(hasher.finalize().as_slice())
    };

    let config = gossipsub::ConfigBuilder::default()
        .heartbeat_interval(Duration::from_secs(1))
        .validation_mode(gossipsub::ValidationMode::Strict)
        .message_id_fn(message_id_fn)
        .build()?;

    Ok(gossipsub::Behaviour::new(
        MessageAuthenticity::Signed(id_keys.clone()),
        config,
    )?)
}

// 完整的发布 + 订阅使用示例
async fn pubsub_example(
    swarm: &mut Swarm<MyBehaviour>,
) -> anyhow::Result<()> {
    let topic = Sha256Topic::new("p2p-tutorial/chat");

    // 订阅主题
    swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
    println!("Subscribed to topic: p2p-tutorial/chat");

    // 发布消息
    swarm.behaviour_mut().gossipsub.publish(
        topic.clone(),
        b"Hello P2P World!".to_vec(),
    )?;
    println!("Published: Hello P2P World!");

    // 接收消息(在主事件循环中处理)
    // match event {
    //     SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(
    //         gossipsub::Event::Message {
    //             propagation_source,
    //             message_id,
    //             message,
    //         }
    //     )) => {
    //         println!("Received: {} from {}",
    //             String::from_utf8_lossy(&message.data),
    //             propagation_source);
    //     }
    // }
    Ok(())
}

NAT 穿透(DCUtR)

当节点位于 NAT 后面时,需要使用中继和打洞技术:

rust
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
#[derive(NetworkBehaviour)]
struct HolePunchBehaviour {
    relay_client: relay::client::Behaviour,
    dcutr: dcutr::Behaviour,
}

async fn setup_hole_punching(
    swarm: &mut Swarm<HolePunchBehaviour>,
    relay_peer_id: PeerId,
    relay_addr: Multiaddr,
) -> anyhow::Result<()> {
    // 通过中继节点建立监听
    let relay_addr = relay_addr
        .with(libp2p::multiaddr::Protocol::P2p(relay_peer_id))
        .with(libp2p::multiaddr::Protocol::P2pCircuit);
    swarm.listen_on(relay_addr)?;
    // 此时其他节点可以通过中继连接到本节点
    // DCUtR 会自动尝试直接打洞,成功后升级为直连
    Ok(())
}

节点生命周期

mermaid
flowchart TD
    A["生成 Keypair<br/>identity::Keypair::generate_ed25519()"] --> B["创建 Transport<br/>传输+安全+多路复用"]
    B --> C["定义 Behaviour<br/>组合协议行为"]
    C --> D["构建 Swarm<br/>事件循环核心"]
    D --> E["开始监听<br/>listen_on()"]
    E --> F{"事件循环<br/>tokio::select!"}
    F -->|"新连接"| G["处理 SwarmEvent<br/>协议分发"]
    F -->|"用户输入"| H["dial / publish<br/>发起连接"]
    G --> F
    H --> F

常见编译问题

问题解决方案
feature not found检查 Cargo.toml 的 features 列表,确保所需协议已启用
unresolved import确认 libp2p 版本 ≥ 0.53,API 在不同版本间有变化
trait bound not satisfied确保 NetworkBehaviourout_event 和各 From 实现完整

参考资料