Go P2P Development: From Basics to PubSub

Go’s simple concurrency model and fast compilation make it a popular choice for P2P network development. go-libp2p is one of the most complete libp2p implementations and is widely used in large projects like IPFS (Kubo).

Environment Setup

bash
1
2
3
go get github.com/libp2p/go-libp2p
go get github.com/libp2p/go-libp2p-kad-dht
go get github.com/libp2p/go-libp2p-pubsub

go.mod configuration:

1
2
3
4
5
6
7
8
9
module p2p-tutorial

go 1.21

require (
    github.com/libp2p/go-libp2p v0.35.0
    github.com/libp2p/go-libp2p-kad-dht v0.25.0
    github.com/libp2p/go-libp2p-pubsub v0.10.0
)

Building Host and Node Interconnection

Go’s libp2p host creation uses the options pattern. Here’s a complete Echo node with host creation, stream handler, and node interconnection:

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "os/signal"
    "syscall"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    h, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
        libp2p.NATPortMap(),
    )
    if err != nil {
        panic(err)
    }
    defer h.Close()

    fmt.Printf("Host ID: %s\n", h.ID())
    fmt.Println("Listening addresses:")
    for _, addr := range h.Addrs() {
        fmt.Printf("  %s/p2p/%s\n", addr, h.ID())
    }

    // Set stream handler (Echo service)
    h.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
        fmt.Printf("New stream from %s\n", s.Conn().RemotePeer())
        go func() {
            io.Copy(s, s)  // Echo: return received data as-is
            s.Close()
        }()
    })

    // If a peer address is provided as CLI arg, connect to it
    if len(os.Args) > 1 {
        peerAddr := os.Args[1]
        fmt.Printf("Dialing %s\n", peerAddr)

        peerInfo, err := peer.AddrInfoFromString(peerAddr)
        if err != nil {
            panic(err)
        }

        // Connect to peer
        if err := h.Connect(ctx, *peerInfo); err != nil {
            panic(err)
        }
        fmt.Printf("Connected to %s\n", peerInfo.ID)

        // Open a stream and test Echo
        s, err := h.NewStream(ctx, peerInfo.ID, "/echo/1.0.0")
        if err != nil {
            panic(err)
        }
        s.Write([]byte("Hello from Go!\n"))

        buf := make([]byte, 1024)
        n, _ := s.Read(buf)
        fmt.Printf("Echo response: %s", string(buf[:n]))
        s.Close()
    }

    // Wait for interrupt signal
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    <-ch
    fmt.Println("Shutting down...")
}

Running and testing:

bash
1
2
3
4
5
6
7
8
9
# Terminal 1: Start first node
go run main.go
# Output: Host ID: 12D3Koo...
# Output: Listening addresses: /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...

# Terminal 2: Start second node and connect to first
go run main.go /ip4/127.0.0.1/tcp/12345/p2p/12D3Koo...
# Output: Connected to 12D3Koo...
# Output: Echo response: Hello from Go!

Context Management

Go’s context.Context is the core mechanism for lifecycle control in P2P development:

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Create cancellable context (for entire app lifecycle)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create timeout context (for single operations)
dialCtx, dialCancel := context.WithTimeout(ctx, 10*time.Second)
defer dialCancel()

// All operations using this context abort on timeout/cancel
h.Connect(dialCtx, peerInfo)

All go-libp2p network operations (Connect, NewStream, PutValue, etc.) accept context.Context as the first parameter. Proper context usage prevents goroutine leaks — when the node needs to shut down gracefully, canceling the top-level context aborts all in-flight operations.

Kademlia DHT Implementation

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/host"
    dht "github.com/libp2p/go-libp2p-kad-dht"
)

func NewDHTHost(ctx context.Context) (host.Host, *dht.IpfsDHT, error) {
    h, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/4001"),
    )
    if err != nil {
        return nil, nil, err
    }

    // Create DHT in server mode (participates in routing)
    kadDHT, err := dht.New(ctx, h,
        dht.Mode(dht.ModeServer),
        dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
    )

    if err = kadDHT.Bootstrap(ctx); err != nil {
        return nil, nil, err
    }
    return h, kadDHT, nil
}

func dhtOperations(ctx context.Context, kadDHT *dht.IpfsDHT) error {
    // ===== Store key-value pair =====
    key := "/p2p-tutorial/example-key"
    err := kadDHT.PutValue(ctx, key, []byte("Hello DHT!"),
        dht.Quorum(2),
    )
    if err != nil {
        return err
    }
    fmt.Println("Put: stored value for key")

    // ===== Retrieve value =====
    retrieved, err := kadDHT.GetValue(ctx, key,
        dht.Quorum(1),
        dht.MaxRecordAge(1*time.Hour),
    )
    if err != nil {
        return err
    }
    fmt.Printf("Get: retrieved value = %s\n", string(retrieved))

    // ===== Find content providers =====
    contentID := "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"
    providers := kadDHT.FindProvidersAsync(ctx, contentID, 10)
    for pi := range providers {
        fmt.Printf("Provider found: %s\n", pi.ID)
    }
    return nil
}

PubSub Publish/Subscribe

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func runPubSub(ctx context.Context) error {
    h, _ := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
    defer h.Close()

    // Create GossipSub instance
    ps, err := pubsub.NewGossipSub(ctx, h,
        pubsub.WithMessageSignaturePolicy(pubsub.StrictSign),
    )
    if err != nil {
        return err
    }

    // Join topic
    topic, err := ps.Join("p2p-tutorial/chat")
    if err != nil {
        return err
    }

    // Subscribe to topic
    sub, err := topic.Subscribe()
    if err != nil {
        return err
    }

    // Background publisher
    go func() {
        for i := 0; i < 5; i++ {
            msg := fmt.Sprintf("Message %d from %s", i, h.ID())
            topic.Publish(ctx, []byte(msg))
            time.Sleep(1 * time.Second)
        }
    }()

    // Receive loop
    for {
        msg, err := sub.Next(ctx)
        if err != nil {
            return err  // Exits when context is cancelled
        }
        // Skip own messages
        if msg.ReceivedFrom == h.ID() {
            continue
        }
        fmt.Printf("[%s]: %s\n", msg.GetFrom(), string(msg.Data))
    }
}

Note: sub.Next(ctx) blocks until a message is received or context is cancelled. When ctx is cancelled (e.g., app shutdown), Next returns an error and the receive loop exits automatically — this is key to graceful shutdown.

Custom File Sharing Protocol

Here’s a complete custom protocol with both request and response sides:

go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
    "context"
    "encoding/binary"
    "fmt"
    "io"
    "os"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/protocol"
)

const FileShareProtocolID = protocol.ID("/p2p-tutorial/file-share/1.0.0")

type FileShareProtocol struct {
    host host.Host
}

func NewFileShareProtocol(h host.Host) *FileShareProtocol {
    fs := &FileShareProtocol{host: h}
    // Register stream handler (server-side logic)
    h.SetStreamHandler(FileShareProtocolID, fs.handleStream)
    return fs
}

// handleStream handles inbound connections (server: responds to file requests)
func (fs *FileShareProtocol) handleStream(s network.Stream) {
    defer s.Close()

    var msgType uint8
    if err := binary.Read(s, binary.BigEndian, &msgType); err != nil {
        return
    }

    if msgType == 1 { // File request
        var nameLen uint32
        binary.Read(s, binary.BigEndian, &nameLen)
        nameBuf := make([]byte, nameLen)
        io.ReadFull(s, nameBuf)
        filename := string(nameBuf)

        fmt.Printf("File requested: %s\n", filename)

        data, err := os.ReadFile(filename)
        if err != nil {
            binary.Write(s, binary.BigEndian, uint8(3)) // Error response
            return
        }
        binary.Write(s, binary.BigEndian, uint8(2)) // Success response
        binary.Write(s, binary.BigEndian, uint32(len(data)))
        s.Write(data)
    }
}

// RequestFile requests a file (client-side logic)
func (fs *FileShareProtocol) RequestFile(
    ctx context.Context,
    peerID peer.ID,
    filename string,
) ([]byte, error) {
    s, err := fs.host.NewStream(ctx, peerID, FileShareProtocolID)
    if err != nil {
        return nil, err
    }
    defer s.Close()

    // Send request
    binary.Write(s, binary.BigEndian, uint8(1))              // Request type
    binary.Write(s, binary.BigEndian, uint32(len(filename))) // Filename length
    s.Write([]byte(filename))                                 // Filename

    // Read response
    var respType uint8
    binary.Read(s, binary.BigEndian, &respType)
    if respType == 3 {
        return nil, fmt.Errorf("peer could not read file")
    }
    var dataLen uint32
    binary.Read(s, binary.BigEndian, &dataLen)
    data := make([]byte, dataLen)
    io.ReadFull(s, data)
    return data, nil
}

Architecture Comparison

mermaid
flowchart LR
    subgraph Rust
        RR["Rust libp2p<br/>Derive macro composition<br/>tokio event loop<br/>Compile-time safety"]
    end
    subgraph Go
        GG["Go libp2p<br/>Options pattern config<br/>goroutine concurrency<br/>Runtime safety"]
    end

    RR -->|"NetworkBehaviour Trait"| RBeh["Protocol composition<br/>Type safe"]
    GG -->|"Option Pattern"| GBeh["Flexible config<br/>Fast iteration"]

    style RR fill:#FFD43B
    style GG fill:#00ADD8,color:#fff

References