Go 语言 P2P 开发实战

Go 语言以其简洁的并发模型和快速的编译速度,成为 P2P 网络开发的热门选择。go-libp2p 是目前功能最完整的 libp2p 实现之一,被广泛应用于 IPFS(Kubo)等大型项目。

环境准备

bash
1
2
3
go get github.com/libp2p/go-libp2p
go get github.com/libp2p/go-libp2p-kad-dht
go get github.com/libp2p/go-libp2p-pubsub

go.mod 配置:

1
2
3
4
5
6
7
8
9
module p2p-tutorial

go 1.21

require (
    github.com/libp2p/go-libp2p v0.35.0
    github.com/libp2p/go-libp2p-kad-dht v0.25.0
    github.com/libp2p/go-libp2p-pubsub v0.10.0
)

构建基础主机与节点互联

Go 的 libp2p 主机创建通过选项模式配置。以下是一个完整的 Echo 节点,包含主机创建、流处理器和节点互联

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "os/signal"
    "syscall"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 创建 libp2p 主机
    h, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
        libp2p.NATPortMap(),  // 启用 UPnP 端口映射
    )
    if err != nil {
        panic(err)
    }
    defer h.Close()

    // 打印本节点信息
    fmt.Printf("Host ID: %s\n", h.ID())
    fmt.Println("Listening addresses:")
    for _, addr := range h.Addrs() {
        fmt.Printf("  %s/p2p/%s\n", addr, h.ID())
    }

    // 设置流处理器(Echo 服务)
    h.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
        fmt.Printf("New stream from %s\n", s.Conn().RemotePeer())
        go func() {
            io.Copy(s, s)  // Echo: 把收到的数据原样返回
            s.Close()
        }()
    })

    // 如果命令行参数提供了对方地址,则建立连接
    if len(os.Args) > 1 {
        peerAddr := os.Args[1]
        fmt.Printf("Dialing %s\n", peerAddr)

        // 解析对方地址为 peer.AddrInfo
        peerInfo, err := peer.AddrInfoFromString(peerAddr)
        if err != nil {
            panic(err)
        }

        // 连接对方节点
        if err := h.Connect(ctx, *peerInfo); err != nil {
            panic(err)
        }
        fmt.Printf("Connected to %s\n", peerInfo.ID)

        // 打开一个流并测试 Echo
        s, err := h.NewStream(ctx, peerInfo.ID, "/echo/1.0.0")
        if err != nil {
            panic(err)
        }
        s.Write([]byte("Hello from Go!\n"))

        buf := make([]byte, 1024)
        n, _ := s.Read(buf)
        fmt.Printf("Echo response: %s", string(buf[:n]))
        s.Close()
    }

    // 等待中断信号
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    <-ch
    fmt.Println("Shutting down...")
}

运行和测试:

bash
1
2
3
4
5
6
7
8
9
# 终端 1:启动第一个节点
go run main.go
# 输出:Host ID: 12D3Koo...
# 输出:Listening addresses: /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...

# 终端 2:启动第二个节点并连接第一个节点
go run main.go /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...
# 输出:Connected to 12D3Koo...
# 输出:Echo response: Hello from Go!

Context 管理

Go 的 context.Context 是控制生命周期的核心机制。在 P2P 开发中:

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 创建可取消的 context(用于整个应用生命周期)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 创建带超时的 context(用于单次操作)
dialCtx, dialCancel := context.WithTimeout(ctx, 10*time.Second)
defer dialCancel()

// 在超时或取消时,所有使用该 context 的操作自动中止
h.Connect(dialCtx, peerInfo)

所有 go-libp2p 的网络操作(Connect、NewStream、PutValue 等)都接受 context.Context 作为第一个参数。正确使用 context 可以避免 goroutine 泄漏——当节点需要优雅关闭时,取消顶层 context 即可中止所有进行中的操作。

Kademlia DHT 实现

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/host"
    dht "github.com/libp2p/go-libp2p-kad-dht"
)

func NewDHTHost(ctx context.Context) (host.Host, *dht.IpfsDHT, error) {
    h, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/4001"),
    )
    if err != nil {
        return nil, nil, err
    }

    // 创建 DHT(服务器模式,参与路由)
    kadDHT, err := dht.New(ctx, h,
        dht.Mode(dht.ModeServer),
        dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
    )
    if err != nil {
        return nil, nil, err
    }

    // 引导节点连接
    if err = kadDHT.Bootstrap(ctx); err != nil {
        return nil, nil, err
    }
    return h, kadDHT, nil
}

func dhtOperations(ctx context.Context, kadDHT *dht.IpfsDHT) error {
    // ===== 存储键值对 =====
    key := "/p2p-tutorial/example-key"
    err := kadDHT.PutValue(ctx, key, []byte("Hello DHT!"),
        dht.Quorum(2),  // 需要 2 个节点确认写入
    )
    if err != nil {
        return err
    }
    fmt.Println("Put: stored value for key")

    // ===== 读取键值对 =====
    retrieved, err := kadDHT.GetValue(ctx, key,
        dht.Quorum(1),
        dht.MaxRecordAge(1*time.Hour),
    )
    if err != nil {
        return err
    }
    fmt.Printf("Get: retrieved value = %s\n", string(retrieved))

    // ===== 查找内容提供者 =====
    contentID := "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"
    providers := kadDHT.FindProvidersAsync(ctx, contentID, 10)
    for pi := range providers {
        fmt.Printf("Provider found: %s\n", pi.ID)
    }
    return nil
}

PubSub 发布订阅

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func runPubSub(ctx context.Context) error {
    h, _ := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
    defer h.Close()

    // 创建 GossipSub 实例
    ps, err := pubsub.NewGossipSub(ctx, h,
        pubsub.WithMessageSignaturePolicy(pubsub.StrictSign),
    )
    if err != nil {
        return err
    }

    // 加入主题
    topic, err := ps.Join("p2p-tutorial/chat")
    if err != nil {
        return err
    }

    // 订阅主题
    sub, err := topic.Subscribe()
    if err != nil {
        return err
    }

    // 后台发布消息
    go func() {
        for i := 0; i < 5; i++ {
            msg := fmt.Sprintf("Message %d from %s", i, h.ID())
            topic.Publish(ctx, []byte(msg))
            time.Sleep(1 * time.Second)
        }
    }()

    // 接收消息循环
    for {
        msg, err := sub.Next(ctx)
        if err != nil {
            return err  // context 取消时退出
        }
        // 跳过自己发送的消息
        if msg.ReceivedFrom == h.ID() {
            continue
        }
        fmt.Printf("[%s]: %s\n", msg.GetFrom(), string(msg.Data))
    }
}

注意:sub.Next(ctx) 会阻塞直到收到消息或 context 被取消。当 ctx 被取消时(如应用关闭),Next 返回 error,接收循环自动退出——这是优雅关闭的关键。

自定义文件共享协议

以下是一个完整的自定义文件共享协议,包含请求端和响应端

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
    "context"
    "encoding/binary"
    "fmt"
    "io"
    "os"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/protocol"
)

const FileShareProtocolID = protocol.ID("/p2p-tutorial/file-share/1.0.0")

type FileShareProtocol struct {
    host host.Host
}

func NewFileShareProtocol(h host.Host) *FileShareProtocol {
    fs := &FileShareProtocol{host: h}
    // 注册流处理器(服务端逻辑)
    h.SetStreamHandler(FileShareProtocolID, fs.handleStream)
    return fs
}

// handleStream 处理入站连接(服务端:响应文件请求)
func (fs *FileShareProtocol) handleStream(s network.Stream) {
    defer s.Close()

    // 读取请求类型
    var msgType uint8
    if err := binary.Read(s, binary.BigEndian, &msgType); err != nil {
        return
    }

    if msgType == 1 { // 文件请求
        // 读取文件名
        var nameLen uint32
        binary.Read(s, binary.BigEndian, &nameLen)
        nameBuf := make([]byte, nameLen)
        io.ReadFull(s, nameBuf)
        filename := string(nameBuf)

        fmt.Printf("File requested: %s\n", filename)

        // 读取文件并发送
        data, err := os.ReadFile(filename)
        if err != nil {
            binary.Write(s, binary.BigEndian, uint8(3)) // 错误响应
            return
        }
        binary.Write(s, binary.BigEndian, uint8(2)) // 成功响应
        binary.Write(s, binary.BigEndian, uint32(len(data)))
        s.Write(data)
    }
}

// RequestFile 请求文件(客户端逻辑)
func (fs *FileShareProtocol) RequestFile(
    ctx context.Context,
    peerID peer.ID,
    filename string,
) ([]byte, error) {
    // 打开到对方节点的流
    s, err := fs.host.NewStream(ctx, peerID, FileShareProtocolID)
    if err != nil {
        return nil, err
    }
    defer s.Close()

    // 发送请求
    binary.Write(s, binary.BigEndian, uint8(1))              // 请求类型
    binary.Write(s, binary.BigEndian, uint32(len(filename))) // 文件名长度
    s.Write([]byte(filename))                                 // 文件名

    // 读取响应
    var respType uint8
    binary.Read(s, binary.BigEndian, &respType)
    if respType == 3 {
        return nil, fmt.Errorf("peer could not read file")
    }
    var dataLen uint32
    binary.Read(s, binary.BigEndian, &dataLen)
    data := make([]byte, dataLen)
    io.ReadFull(s, data)
    return data, nil
}

架构对比

mermaid
flowchart LR
    subgraph Rust
        RR["Rust libp2p<br/>派生宏组合行为<br/>tokio 事件循环<br/>编译时安全"]
    end
    subgraph Go
        GG["Go libp2p<br/>选项模式配置<br/>goroutine 并发<br/>运行时安全"]
    end

    RR -->|"NetworkBehaviour Trait"| RBeh["组合协议<br/>类型安全"]
    GG -->|"Option Pattern"| GBeh["灵活配置<br/>快速迭代"]

    style RR fill:#FFD43B
    style GG fill:#00ADD8,color:#fff

参考资料