Rust 的所有权模型和零成本抽象使其成为实现 P2P 网络协议的理想语言。libp2p 的 Rust 实现(rust-libp2p)提供了完整的协议栈,从传输层到应用层一应俱全。
环境准备
在 Cargo.toml 中配置依赖:
1
2
3
4
5
6
7
8
9
| [dependencies]
libp2p = { version = "0.53", features = [
"tokio", "tcp", "quic", "noise", "yamux", "mplex",
"ping", "identify", "kad", "gossipsub", "relay", "dcutr", "macros",
] }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3"
anyhow = "1.0"
|
创建项目:
1
2
3
4
| cargo new p2p-tutorial
cd p2p-tutorial
# 将上面的依赖粘贴到 Cargo.toml
cargo build # 首次编译需要几分钟,下载和编译大量依赖
|
构建 P2P 节点
最简 Ping 节点
Ping 是 libp2p 的"Hello World",通过它理解节点的基本生命周期:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| use libp2p::{
identity, ping::{Behaviour, Config as PingConfig, Event},
swarm::{SwarmBuilder, SwarmEvent}, Multiaddr, PeerId,
};
use tokio::io::{AsyncBufReadExt, BufReader};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 1. 生成节点身份
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
println!("Local Peer ID: {peer_id}");
// 2. 构建传输层 + 安全层 + 多路复用
let transport = libp2p::tokio_development_transport(id_keys)?;
// 3. 定义网络行为
let behaviour = Behaviour::new(PingConfig::default());
// 4. 创建 Swarm(网络事件循环核心)
let mut swarm = SwarmBuilder::with_tokio_executor(
transport, behaviour, peer_id,
).build();
// 5. 开始监听
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// 6. 命令行交互:输入对方地址来建立连接
let mut stdin = BufReader::new(tokio::io::stdin()).lines();
println!("Enter peer multiaddr to dial (or 'quit' to exit):");
loop {
tokio::select! {
// 处理网络事件
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {address}");
}
SwarmEvent::Behaviour(Event::Ping { peer, result }) => {
println!("Ping to {peer}: {result:?}");
}
_ => {}
},
// 处理用户输入
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
if line == "quit" { break; }
if let Ok(addr) = line.parse::<Multiaddr>() {
swarm.dial(addr)?;
println!("Dialing {addr}");
}
}
}
}
Ok(())
}
|
运行和测试:
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 终端 1:启动第一个节点
cargo run
# 输出:Local Peer ID: 12D3Koo...
# 输出:Listening on /ip4/127.0.0.1/tcp/12345
# 终端 2:启动第二个节点
cargo run
# 输出:Local Peer ID: 12D3Koo...
# 输出:Listening on /ip4/127.0.0.1/tcp/54321
# 在终端 2 中,输入终端 1 的监听地址:
# /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...(替换为实际 Peer ID)
# 两个节点开始互相 Ping
|
这个例子展示了 libp2p 节点的四个核心组件:身份生成、传输层构建、行为定义和事件循环。tokio::select! 同时监听网络事件和用户输入,实现了交互式的节点管理。
组合多种协议
实际 P2P 应用通常需要同时运行多种协议。libp2p 的 NetworkBehaviour 派生宏允许将多个协议组合为一个:
1
2
3
4
5
6
7
8
| #[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
ping: ping::Behaviour,
identify: identify::Behaviour,
kademlia: kad::Behaviour<kad::store::MemoryStore>,
gossipsub: gossipsub::Behaviour,
}
|
每个子协议的事件通过 From trait 转换为统一的事件类型,使得事件处理集中在一个 match 中。
Kademlia DHT 完整实现
下面展示一个包含 Put/Get 操作的完整 DHT 节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
| use libp2p::{identity, kad, swarm::{SwarmBuilder, SwarmEvent}, PeerId};
use std::time::Duration;
async fn run_dht_node() -> anyhow::Result<()> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p::tokio_development_transport(id_keys.clone())?;
let store = kad::store::MemoryStore::new(peer_id);
let mut kademlia = kad::Behaviour::with_config(
peer_id, store,
kad::Config::default()
.set_query_timeout(Duration::from_secs(30))
.set_replication_interval(Some(Duration::from_secs(3600))),
);
// 添加 IPFS 引导节点
let bootstrap: Vec<(PeerId, libp2p::Multiaddr)> = vec![
("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse()?,
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse()?),
];
for (peer, addr) in bootstrap {
kademlia.add_address(&peer, addr);
}
kademlia.bootstrap()?;
let mut swarm = SwarmBuilder::with_tokio_executor(
transport, kademlia, peer_id,
).build();
swarm.listen_on("/ip4/0.0.0.0/tcp/4001".parse()?)?;
// 等待引导完成
tokio::time::sleep(Duration::from_secs(5)).await;
// ===== Put 操作:存储数据 =====
let key = "my-app/greeting".as_bytes().to_vec();
let value = b"Hello from Rust DHT!".to_vec();
swarm.behaviour_mut().put_record(
kad::Record::new(key.clone(), value),
kad::Quorum::Majority, // 需要多数节点确认
)?;
println!("Put: stored key 'my-app/greeting'");
// 等待存储完成
tokio::time::sleep(Duration::from_secs(2)).await;
// ===== Get 操作:读取数据 =====
swarm.behaviour_mut().get_record(&key.into());
println!("Get: requesting key 'my-app/greeting'");
// 事件循环:处理 DHT 响应
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
result: kad::QueryResult::GetRecord(Ok(ok)),
..
}) => {
for record in ok.records {
println!("Got value: {}",
String::from_utf8_lossy(&record.record.value));
}
break; // 查到后退出
}
SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
result: kad::QueryResult::PutRecord(_),
..
}) => {
println!("Put succeeded");
}
_ => {}
}
}
Ok(())
}
|
关键 API 说明:
put_record():将键值对存储到 DHT 中距离 key 最近的 K 个节点get_record():从 DHT 查找 key 对应的值Quorum::Majority:要求多数节点确认写入才算成功OutboundQueryProgressed:DHT 查询进度事件,包含成功或失败的结果
Gossipsub 发布订阅
Gossipsub 适用于需要广播消息的场景。以下是完整的发布和订阅示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| use libp2p::gossipsub::{self, MessageAuthenticity, MessageId, Sha256Topic};
use sha2::{Digest, Sha256};
fn create_gossipsub(id_keys: &identity::Keypair) -> anyhow::Result<gossipsub::Behaviour> {
let message_id_fn = |message: &gossipsub::Message| {
let mut hasher = Sha256::new();
hasher.update(&message.data);
MessageId::from(hasher.finalize().as_slice())
};
let config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(1))
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(message_id_fn)
.build()?;
Ok(gossipsub::Behaviour::new(
MessageAuthenticity::Signed(id_keys.clone()),
config,
)?)
}
// 完整的发布 + 订阅使用示例
async fn pubsub_example(
swarm: &mut Swarm<MyBehaviour>,
) -> anyhow::Result<()> {
let topic = Sha256Topic::new("p2p-tutorial/chat");
// 订阅主题
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
println!("Subscribed to topic: p2p-tutorial/chat");
// 发布消息
swarm.behaviour_mut().gossipsub.publish(
topic.clone(),
b"Hello P2P World!".to_vec(),
)?;
println!("Published: Hello P2P World!");
// 接收消息(在主事件循环中处理)
// match event {
// SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(
// gossipsub::Event::Message {
// propagation_source,
// message_id,
// message,
// }
// )) => {
// println!("Received: {} from {}",
// String::from_utf8_lossy(&message.data),
// propagation_source);
// }
// }
Ok(())
}
|
NAT 穿透(DCUtR)
当节点位于 NAT 后面时,需要使用中继和打洞技术:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| #[derive(NetworkBehaviour)]
struct HolePunchBehaviour {
relay_client: relay::client::Behaviour,
dcutr: dcutr::Behaviour,
}
async fn setup_hole_punching(
swarm: &mut Swarm<HolePunchBehaviour>,
relay_peer_id: PeerId,
relay_addr: Multiaddr,
) -> anyhow::Result<()> {
// 通过中继节点建立监听
let relay_addr = relay_addr
.with(libp2p::multiaddr::Protocol::P2p(relay_peer_id))
.with(libp2p::multiaddr::Protocol::P2pCircuit);
swarm.listen_on(relay_addr)?;
// 此时其他节点可以通过中继连接到本节点
// DCUtR 会自动尝试直接打洞,成功后升级为直连
Ok(())
}
|
节点生命周期
flowchart TD
A["生成 Keypair<br/>identity::Keypair::generate_ed25519()"] --> B["创建 Transport<br/>传输+安全+多路复用"]
B --> C["定义 Behaviour<br/>组合协议行为"]
C --> D["构建 Swarm<br/>事件循环核心"]
D --> E["开始监听<br/>listen_on()"]
E --> F{"事件循环<br/>tokio::select!"}
F -->|"新连接"| G["处理 SwarmEvent<br/>协议分发"]
F -->|"用户输入"| H["dial / publish<br/>发起连接"]
G --> F
H --> F
常见编译问题
| 问题 | 解决方案 |
|---|
feature not found | 检查 Cargo.toml 的 features 列表,确保所需协议已启用 |
unresolved import | 确认 libp2p 版本 ≥ 0.53,API 在不同版本间有变化 |
trait bound not satisfied | 确保 NetworkBehaviour 的 out_event 和各 From 实现完整 |
参考资料