eBPF Series: DeepFlow Extended Protocol Parsing Practice (MongoDB Protocol & Kafka Protocol)

Overview

MongoDB is widely used today, but it lacks effective observability tooling. DeepFlow is an excellent observability solution, but it does not support the MongoDB protocol. This article extends DeepFlow with MongoDB protocol parsing to improve observability in the MongoDB ecosystem, briefly walking through the process from analyzing the protocol documentation to implementing the parser in DeepFlow. It then shows a lighter-weight alternative: extending Kafka parsing with a Wasm plugin.

How to Analyze a Protocol (MongoDB)

Protocol Document Analysis Approach

First, find the protocol specification on the official website. The section mongodb-wire-protocol#standard-message-header describes the MongoDB message header struct as follows:

```c
struct MsgHeader {
    int32   messageLength;     // total message size, including this
    int32   requestID;         // identifier for this message
    int32   responseTo;        // requestID from the original request
                               //   (used in responses from the database)
    int32   opCode;            // message type
}
```

The above struct can be visualized in the following diagram:

Note: The mongodb-wire-protocol document states that the MongoDB protocol uses little-endian byte order:

Byte Ordering: All integers in the MongoDB wire protocol use little-endian byte order: that is, least-significant byte first.

Now let’s look at actual packet capture data to see what the raw data looks like:

```
0000   a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00
0010   00 00 00 00 00 8e 00 00 00 01 6f 6b 00 00 00 00
0020   00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030   69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040   75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050   6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060   1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070   00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080   7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090   de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0   00 00 00
```

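The packet capture data above breaks down as follows (each field is a little-endian int32):

  • Field messageLength is a3 00 00 00: the message is 0xa3 = 163 bytes long, which matches the dump (0xa0 plus 3 bytes).
  • Field requestID is 0a 50 88 48: the request ID is 0x4888500a.
  • Field responseTo is 23 00 00 00: this message responds to request 0x23 (35). A non-zero responseTo means this packet is a database reply.
  • Field opCode is dd 07 00 00: the opcode is 0x7dd, decimal 2013, which corresponds to OP_MSG in the protocol document.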
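The same breakdown can be written as a small Rust sketch. MsgHeader and decode_msg_header are illustrative names, not DeepFlow's; the code simply reads four little-endian int32 fields from the first 16 bytes of the payload.

```rust
/// Decoded form of the 16-byte MongoDB MsgHeader (all fields are little-endian int32).
#[derive(Debug, PartialEq)]
pub struct MsgHeader {
    pub message_length: i32,
    pub request_id: i32,
    pub response_to: i32,
    pub op_code: i32,
}

/// Reads a little-endian i32 at `off`; returns None if fewer than 4 bytes remain.
fn read_i32_le(buf: &[u8], off: usize) -> Option<i32> {
    let b = buf.get(off..off + 4)?;
    Some(i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Decodes the standard message header from the start of a payload.
pub fn decode_msg_header(payload: &[u8]) -> Option<MsgHeader> {
    Some(MsgHeader {
        message_length: read_i32_le(payload, 0)?,
        request_id: read_i32_le(payload, 4)?,
        response_to: read_i32_le(payload, 8)?,
        op_code: read_i32_le(payload, 12)?,
    })
}
```

Feeding it the first 16 bytes of the capture yields messageLength 163, requestID 0x4888500a, responseTo 0x23 and opCode 2013; a payload shorter than 16 bytes yields None.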

MongoDB Protocol OpCode Reference Table

| OpCode Name | OpCode | Description | Notes |
|---|---|---|---|
| OP_COMPRESSED | 2012 | Compressed message (wraps another opcode) | |
| OP_MSG | 2013 | Send a message using the standard format. Used for both client requests and database replies. | |
| OP_REPLY | 1 | Reply to a client request; responseTo identifies the request. | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| OP_UPDATE | 2001 | Update document | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| OP_INSERT | 2002 | Insert document | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| RESERVED | 2003 | (skipped) | |
| OP_QUERY | 2004 | Query document | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| OP_GET_MORE | 2005 | (skipped) | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| OP_DELETE | 2006 | Delete document | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
| OP_KILL_CURSORS | 2007 | (skipped) | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
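A lookup from opCode to name, following the table above, might look like this minimal sketch. The function names are hypothetical; it serves the same purpose as the get_op_str() method that appears later in the agent code, whose body this article does not show.

```rust
/// Maps a MongoDB opCode to its name, following the reference table.
pub fn op_code_name(op_code: i32) -> &'static str {
    match op_code {
        1 => "OP_REPLY",
        2001 => "OP_UPDATE",
        2002 => "OP_INSERT",
        2003 => "RESERVED",
        2004 => "OP_QUERY",
        2005 => "OP_GET_MORE",
        2006 => "OP_DELETE",
        2007 => "OP_KILL_CURSORS",
        2012 => "OP_COMPRESSED",
        2013 => "OP_MSG",
        _ => "UNKNOWN",
    }
}

/// True for the opcodes the table marks as deprecated in 5.0 and removed in 5.1.
pub fn is_legacy_op_code(op_code: i32) -> bool {
    matches!(op_code, 1 | 2001 | 2002 | 2004..=2007)
}
```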

Analyzing the Most Common OpCode OP_MSG

From the protocol document mongodb-wire-protocol#op_msg, check the OP_MSG struct:

```c
OP_MSG {
    MsgHeader header;              // standard message header
    uint32 flagBits;               // message flags
    Sections[] sections;           // data sections
    optional<uint32> checksum;     // optional CRC-32C checksum
}
```

For OP_MSG, the content to decode lives in the sections. Each section starts with a one-byte kind, and we only need to handle kinds 0 and 1:

  • 0 (Body): the rest of the section is a single BSON document, so BSON-decode it directly;
  • 1 (Document Sequence): first skip the int32 size and the C-string identifier, then BSON-decode the remaining content.
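The kind 1 layout can be sketched as follows, assuming the int32 size counts itself, the identifier and the documents, but not the kind byte. parse_doc_sequence_header is a hypothetical helper; decoding the BSON documents themselves is left to a BSON library.

```rust
/// Sketch: parse the header of a kind 1 (Document Sequence) section.
/// `section` starts right after the kind byte. Returns the declared size,
/// the C-string identifier, and the offset where the BSON documents begin.
pub fn parse_doc_sequence_header(section: &[u8]) -> Option<(i32, String, usize)> {
    let s = section.get(0..4)?;
    let size = i32::from_le_bytes([s[0], s[1], s[2], s[3]]);
    // The identifier is a NUL-terminated C string right after the int32 size.
    let rest = &section[4..];
    let nul = rest.iter().position(|&b| b == 0)?;
    let identifier = std::str::from_utf8(&rest[..nul]).ok()?.to_string();
    // Documents start after the size (4 bytes) and the identifier plus its NUL.
    Some((size, identifier, 4 + nul + 1))
}
```

For a section carrying the identifier "documents" and one empty BSON document, the sketch reports a size of 19 and a document offset of 14.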

Let’s look at the raw data from an actual packet capture. As shown below, the MongoDB protocol’s OP_MSG content starts from the 16th byte (counting from 0, consistent throughout this document).

```
0000   a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00
0010   00 00 00 00 00 8e 00 00 00 01 6f 6b 00 00 00 00
0020   00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030   69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040   75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050   6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060   1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070   00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080   7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090   de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0   00 00 00
```

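The flagBits field can be ignored for now: skip its 4 bytes (00 00 00 00 at offsets 16 to 19), and the next byte, at offset 4 of the OP_MSG content (absolute offset 20), is the section kind. It determines how the BSON data that follows is laid out. In this capture the kind is 00, followed by a BSON document whose length field is 8e 00 00 00 (142 bytes).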
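These offsets can be checked against the capture with a short Rust sketch (the names are hypothetical). It reads flagBits, the kind byte at absolute offset 20 and the BSON length that follows, and tests flagBits bit 0, which the wire-protocol spec defines as checksumPresent.

```rust
const HEADER_LEN: usize = 16; // standard MsgHeader
const FLAG_BITS_LEN: usize = 4; // uint32 flagBits
const CHECKSUM_PRESENT: u32 = 1; // flagBits bit 0

#[derive(Debug, PartialEq)]
pub struct FirstSection {
    pub flag_bits: u32,
    pub kind: u8,
    pub bson_len: i32,
}

/// Sketch: locate the first OP_MSG section in a complete message buffer.
pub fn first_section(msg: &[u8]) -> Option<FirstSection> {
    let f = msg.get(HEADER_LEN..HEADER_LEN + FLAG_BITS_LEN)?;
    let flag_bits = u32::from_le_bytes([f[0], f[1], f[2], f[3]]);
    let kind_at = HEADER_LEN + FLAG_BITS_LEN; // absolute offset 20
    let kind = *msg.get(kind_at)?;
    // For kind 0, the BSON document begins with its own int32 length.
    let l = msg.get(kind_at + 1..kind_at + 5)?;
    let bson_len = i32::from_le_bytes([l[0], l[1], l[2], l[3]]);
    Some(FirstSection { flag_bits, kind, bson_len })
}

/// True when a CRC-32C checksum trails the sections.
pub fn has_checksum(flag_bits: u32) -> bool {
    flag_bits & CHECKSUM_PRESENT != 0
}
```

For this capture, 16 + 4 + 1 + 142 = 163 equals messageLength, so the message holds exactly one kind 0 section and no trailing checksum.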

At this point, we’ve basically understood the MongoDB protocol’s data structure and decoding approach. Now let’s try implementing the decoding in DeepFlow Agent.

Extending Protocol Parsing in DeepFlow Agent

DeepFlow Agent Development Document Overview

Prerequisite: Native development for DeepFlow Agent requires basic Rust language skills. First, refer to the official documentation HOW_TO_SUPPORT_YOUR_PROTOCOL_EN to understand a few key points.

  • L7Protocol: the constants that identify each protocol. Source location: deepflow/agent/crates/public/src/l7_protocol.rs

  • L7ProtocolParser: identifies the protocol and parses the payload into L7ProtocolInfo (the basic structure for L7 protocol information). Source location: deepflow/agent/src/common/l7_protocol_log.rs

  • L7ProtocolInfo: produced by L7ProtocolParser and used for subsequent session aggregation. Source location: deepflow/agent/src/common/l7_protocol_info.rs

  • L7ProtocolInfoInterface: the interface that every L7 protocol structure in L7ProtocolInfo must implement to handle its protocol-specific logic. Source location: deepflow/agent/src/common/l7_protocol_info.rs

  • L7ProtocolSendLog: the unified structure sent to deepflow-server. Source location: deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs

General steps for development in deepflow-agent:

  • Add the corresponding protocol name and number in deepflow/agent/crates/public/src/l7_protocol.rs.
  • L7ProtocolParser::parse_payload() needs to return L7ProtocolInfo, so first define a struct, implement the L7ProtocolInfoInterface trait, and add it to the L7ProtocolInfo enum.
  • Implement the L7ProtocolParserInterface trait and add it to the impl_protocol_parser! macro in deepflow/agent/src/common/l7_protocol_log.rs.
  • On the deepflow-server side, just add a constant for search hints.

Code Guide

Define a Protocol with a Constant Identifier

Source location: deepflow/agent/crates/public/src/l7_protocol.rs. The Agent determines a flow's application-layer protocol by iterating through all supported protocols. Note: standard application protocols carry no field that declares which protocol they are, so when processing network packets at scale, the application-layer protocol is determined by trying each known protocol's decoding logic in turn.

```rust
pub enum L7Protocol {
    #[num_enum(default)]
    Unknown = 0,
    Other = 1,
    // HTTP
    Http1 = 20,
    Http2 = 21,
    Http1TLS = 22,
    Http2TLS = 23,
    // RPC
    Dubbo = 40,
    Grpc = 41,
    SofaRPC = 43,
    FastCGI = 44,
    // SQL
    MySQL = 60,
    PostgreSQL = 61,
    // NoSQL
    Redis = 80,
    MongoDB = 81, // added: MongoDB protocol number
    // MQ
    Kafka = 100,
    MQTT = 101,
    // INFRA
    DNS = 120,
    Custom = 127,
    Max = 255,
}
```
```rust
impl From<String> for L7Protocol {
    fn from(l7_protocol_str: String) -> Self {
        let l7_protocol_str = l7_protocol_str.to_lowercase();
        match l7_protocol_str.as_str() {
            "http" | "https" => Self::Http1,
            "dubbo" => Self::Dubbo,
            "grpc" => Self::Grpc,
            "fastcgi" => Self::FastCGI,
            "custom" => Self::Custom,
            "sofarpc" => Self::SofaRPC,
            "mysql" => Self::MySQL,
            "mongodb" => Self::MongoDB, // added: map the name "mongodb" to MongoDB
            "postgresql" => Self::PostgreSQL,
            "redis" => Self::Redis,
            "kafka" => Self::Kafka,
            "mqtt" => Self::MQTT,
            "dns" => Self::DNS,
            _ => Self::Unknown,
        }
    }
}
```
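To make "try every known protocol in turn" concrete, here is a simplified Rust sketch. The ProtocolChecker trait, the two checkers and identify() are hypothetical stand-ins, not DeepFlow's L7ProtocolParserInterface, and both checks are deliberately naive assumptions: MongoDB requires messageLength to equal the payload length and opCode to be OP_MSG, and Redis requires a RESP array starting with '*'.

```rust
/// Hypothetical, simplified stand-in for a protocol parser's identification step.
trait ProtocolChecker {
    fn name(&self) -> &'static str;
    fn check_payload(&self, payload: &[u8]) -> bool;
}

struct MongoChecker;
impl ProtocolChecker for MongoChecker {
    fn name(&self) -> &'static str {
        "MongoDB"
    }
    fn check_payload(&self, p: &[u8]) -> bool {
        // messageLength (LE) must equal the payload length and opCode must be OP_MSG.
        p.len() >= 16
            && u32::from_le_bytes([p[0], p[1], p[2], p[3]]) as usize == p.len()
            && u32::from_le_bytes([p[12], p[13], p[14], p[15]]) == 2013
    }
}

struct RedisChecker;
impl ProtocolChecker for RedisChecker {
    fn name(&self) -> &'static str {
        "Redis"
    }
    // Redis client commands are typically RESP arrays, which start with '*'.
    fn check_payload(&self, p: &[u8]) -> bool {
        p.first() == Some(&b'*')
    }
}

/// Returns the name of the first checker that accepts the payload.
fn identify(parsers: &[Box<dyn ProtocolChecker>], payload: &[u8]) -> Option<&'static str> {
    parsers.iter().find(|p| p.check_payload(payload)).map(|p| p.name())
}
```

Because the first match wins, the order of the list matters, and every protocol added to it is one more check that unmatched traffic has to pass through.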

Prepare Parsing Logic for the New Protocol

Define the Struct

Create the protocol parsing logic code file in a directory under deepflow/agent/src/flow_generator/protocol_logs/. In this case, the code file is placed at sql/mongo.rs under the above directory.

```rust
pub struct MongoDBInfo {
    msg_type: LogMessageType,
    #[serde(rename = "req_len")]
    pub req_len: u32,
    #[serde(rename = "resp_len")]
    pub resp_len: u32,
    //// Refer to "deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs"
    //   Prepare the struct to be processed.
    //   Among them, "request_id", "response_id", "op_code" and "op_code_name" are
    //   key information parsed from the mongodb header.
    #[serde(rename = "request_id")]
    pub request_id: u32,
    #[serde(rename = "response_id")]
    pub response_id: u32,
    #[serde(rename = "op_code")]
    pub op_code: u32,
    #[serde(skip)]
    pub op_code_name: String,
    //// "request", "response" and "response_code" are
    //   required information parsed from the mongodb protocol body content.
    #[serde(rename = "request_resource")]
    pub request: String,
    #[serde(skip)]
    pub response: String,
    #[serde(rename = "response_code")]
    pub response_code: i32,
    ////
    #[serde(rename = "response_status")]
    pub status: L7ResponseStatus,
}
```
Implement L7ProtocolParserInterface
  • First, look at the trait definition in the source (only the functions that need to be implemented are shown below; keep the default logic for the rest):
```rust
#[enum_dispatch]
pub trait L7ProtocolParserInterface {
    fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool;
    // Protocol parsing
    fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult>;
    // Return the protocol number and protocol name. Since the bitmap uses u128,
    // the protocol number must be less than 128.
    // crates/public/src/l7_protocol.rs, pub const L7_PROTOCOL_xxx is the implemented protocol.
    // ===========================================================================================
    fn protocol(&self) -> L7Protocol;
    // Whether the payload is parsed over TCP; used for quick protocol filtering
    // ==============================
    fn parsable_on_tcp(&self) -> bool {
        true
    }
    // Whether the payload is parsed over UDP; used for quick protocol filtering
    // ==============================
    fn parsable_on_udp(&self) -> bool {
        true
    }
    // Return perf data
    fn perf_stats(&mut self) -> Option<L7PerfStats>;
}
```
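The u128 limit in the comment above follows from how a bitmap works: protocol number N maps to bit N, and a u128 only has bits 0 to 127. A minimal sketch of such a bitmap, with a hypothetical ProtocolBitmap type rather than DeepFlow's actual one:

```rust
/// Hypothetical u128 protocol bitmap: bit N set means protocol number N is enabled.
#[derive(Default, Clone, Copy, Debug)]
struct ProtocolBitmap(u128);

impl ProtocolBitmap {
    fn set(&mut self, proto: u8) {
        // Shifting a u128 by 128 or more would overflow, hence the limit.
        assert!(proto < 128, "protocol number {} does not fit in a u128 bitmap", proto);
        self.0 |= 1u128 << proto;
    }
    fn contains(&self, proto: u8) -> bool {
        // Check the range first so the shift never overflows.
        proto < 128 && self.0 & (1u128 << proto) != 0
    }
}
```

With MongoDB = 81 and Custom = 127, both fit; a protocol number of 128 or above could not be represented.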
  • The first step in decoding a protocol is identifying it, which is handled by L7ProtocolParserInterface::check_payload().

    • Define the MongoDB protocol header and decode it
    ```rust
    // Define the MongoDB protocol header struct and decode the necessary fields
    #[derive(Clone, Debug, Default, Serialize)]
    pub struct MongoDBHeader {
        length: u32,
        request_id: u32,
        response_to: u32,
        op_code: u32,
        op_code_name: String,
    }

    impl MongoDBHeader {
        fn decode(&mut self, payload: &[u8]) -> isize {
            // Decode the first 16 bytes of the payload (little-endian) into the
            // MongoDBHeader fields, and check whether they match the MongoDB protocol.
            todo!() // (implementation omitted)
        }
        fn is_request(&self) -> bool {
            // Use the decoded op_code to determine whether this is a request
            todo!() // (implementation omitted)
        }
        pub fn get_op_str(&self) -> &'static str {
            // Map op_code to its text description
            todo!() // (implementation omitted)
        }
    }
    ```
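As one possible reading of what decode() and is_request() need to establish, here is a hypothetical heuristic, not the article's implementation. It assumes a request has responseTo == 0 and that the payload holds exactly one complete message (messageLength equals the payload length); real traffic can split or coalesce messages across TCP segments, so a production check would likely be more lenient.

```rust
const OP_MSG: u32 = 2013;
const OP_QUERY: u32 = 2004;
const OP_COMPRESSED: u32 = 2012;

/// Reads a little-endian u32; the caller guarantees at least off + 4 bytes.
fn le_u32(b: &[u8], off: usize) -> u32 {
    u32::from_le_bytes([b[off], b[off + 1], b[off + 2], b[off + 3]])
}

/// Hypothetical check_payload-style heuristic for a MongoDB request.
fn looks_like_mongo_request(payload: &[u8]) -> bool {
    if payload.len() < 16 {
        return false;
    }
    let length = le_u32(payload, 0) as usize;
    let response_to = le_u32(payload, 8);
    let op_code = le_u32(payload, 12);
    length == payload.len()
        && response_to == 0
        && matches!(op_code, OP_MSG | OP_QUERY | OP_COMPRESSED)
}
```

Under this heuristic, the captured packet (responseTo = 0x23) would be treated as a reply, not a request.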
    • Call the MongoDB protocol header decoding logic in L7ProtocolParserInterface::check_payload(). In the same impl, also handle protocol(&self) and parsable_on_udp(&self).
    ```rust
    impl L7ProtocolParserInterface for MongoDBLog {
        fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool {
            let mut header = MongoDBHeader::default();
            header.decode(payload);
            header.is_request()
        }
        fn protocol(&self) -> L7Protocol {
            L7Protocol::MongoDB
        }
        // Skip decoding for UDP
        fn parsable_on_udp(&self) -> bool {
            false
        }
    }
    ```
    • Result of the first step: at this point, decoding produces the display shown below. The next step is to further decode the specific protocol opcodes.
  • The second step in decoding the protocol is to define structs and decoding interface logic for key commands, corresponding to the L7ProtocolParserInterface::parse_payload() code implementation. Let’s use OP_MSG as an example.

    • Define the OP_MSG opcode struct and decode it
    ```rust
    #[derive(Clone, Debug, Default, Serialize)]
    pub struct MongoOpMsg {
        flag: u32,
        sections: Sections,
        checksum: Option<u32>,
    }

    impl MongoOpMsg {
        fn decode(&mut self, payload: &[u8]) -> Result<bool> {
            // (flagBits and offset handling omitted)
            let mut sections = Sections::default();
            let _ = sections.decode(payload);
            self.sections = sections;
            Ok(true)
        }
    }
    ```
    • Further decode the Sections field that needs attention in the OP_MSG opcode
    ```rust
    #[derive(Clone, Debug, Default, Serialize)]
    struct Sections {
        kind: u8,
        kind_name: String,
        // kind 0 (Body): a single BSON document
        doc: Document,
        // kind 1 (Document Sequence): int32 size followed by a C-string identifier
        size: Option<i32>,
        c_string: Option<String>,
    }

    impl Sections {
        pub fn decode(&mut self, payload: &[u8]) -> Result<bool> {
            // (reading `kind` from the payload and the per-kind decoding are omitted)
            match self.kind {
                0 => { /* Body */ }
                1 => { /* Document Sequence */ }
                2 => { /* Internal */ }
                _ => { /* Unknown */ }
            }
            Ok(true)
        }
    }
    ```
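For kind 0, the response status usually comes from the reply document's "ok" field, which in this capture is the first element (type 0x01, a double). A hand-rolled sketch that reads just the first BSON element, with hypothetical names; a real parser would use a BSON library and handle every element type.

```rust
#[derive(Debug, PartialEq)]
pub enum FirstValue {
    Double(f64),
    Other(u8), // element type byte, value not decoded
}

/// Sketch: read the first element (key and, for a double, its value) of a BSON document.
pub fn first_bson_element(doc: &[u8]) -> Option<(String, FirstValue)> {
    // Bytes 0..4 hold the document length; the first element starts at offset 4.
    let elem_type = *doc.get(4)?;
    let key_start = 5;
    // The key is a NUL-terminated C string.
    let key_len = doc.get(key_start..)?.iter().position(|&b| b == 0)?;
    let key = std::str::from_utf8(&doc[key_start..key_start + key_len]).ok()?.to_string();
    let value_at = key_start + key_len + 1;
    let value = match elem_type {
        0x01 => {
            // BSON double: 8 bytes, little-endian IEEE 754.
            let v = doc.get(value_at..value_at + 8)?;
            let mut raw = [0u8; 8];
            raw.copy_from_slice(v);
            FirstValue::Double(f64::from_le_bytes(raw))
        }
        other => FirstValue::Other(other),
    };
    Some((key, value))
}
```

Applied to the capture from absolute offset 21, it returns the key "ok" with the value 1.0.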
    • Handle L7ProtocolParserInterface::parse_payload, return L7ProtocolInfo
    ```rust
    #[derive(Clone, Debug, Default, Serialize)]
    pub struct MongoDBLog {
        info: MongoDBInfo,
        #[serde(skip)]
        perf_stats: Option<L7PerfStats>,
    }

    impl L7ProtocolParserInterface for MongoDBLog {
        fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
            let mut info = MongoDBInfo::default();
            // Decode to get L7ProtocolInfo
            self.parse(payload, param.l4_protocol, param.direction, &mut info)?;
            // ... (omitted) wrap info in L7ProtocolInfo::MongoDBInfo and return it as an L7ParseResult
        }
    }

    impl MongoDBLog {
        fn parse(&mut self, payload: &[u8], proto: IpProtocol, dir: PacketDirection, info: &mut MongoDBInfo) -> Result<bool> {
            // Decode the command to get request and response info
            match info.op_code {
                _OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => {
                    // OP_MSG
                    let mut msg_body = MongoOpMsg::default();
                    // TODO: Message Flags
                    msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?;
                }
                _ => {}
            }
            Ok(true)
        }
    }
    ```
    • Implement L7ProtocolInfoInterface for MongoDBInfo
    ```rust
    impl L7ProtocolInfoInterface for MongoDBInfo {
        fn session_id(&self) -> Option<u32> {
            // Return the stream identifier, e.g. the stream ID for HTTP/2 or the
            // transaction ID for DNS; return None if not applicable
            todo!() // (implementation omitted)
        }
        fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
            // Here self is always the request and other is always the response
            if let L7ProtocolInfo::MongoDBInfo(other) = other {
                self.merge(other);
            }
            Ok(())
        }
        fn app_proto_head(&self) -> Option<AppProtoHead> {
            // Return an AppProtoHead; returning None discards this record
            Some(AppProtoHead {
                proto: L7Protocol::MongoDB,
                // (other fields omitted)
            })
        }
        fn is_tls(&self) -> bool {
            // Assumes MongoDBInfo also carries an is_tls field (not shown in the struct above)
            self.is_tls
        }
    }
    ```
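merge_log() relies on a request and its response being paired. For MongoDB, a reply's responseTo carries the requestID of the request it answers, so one natural pairing key is that shared ID. The sketch below is hypothetical (Pairing is not a DeepFlow type and timestamps are plain microseconds); it pairs by that key and derives the response time.

```rust
use std::collections::HashMap;

/// Hypothetical request/response pairing keyed by requestID / responseTo.
#[derive(Default)]
struct Pairing {
    pending: HashMap<u32, u64>, // requestID -> request timestamp in microseconds
}

impl Pairing {
    fn on_request(&mut self, request_id: u32, ts_us: u64) {
        self.pending.insert(request_id, ts_us);
    }

    /// Returns the response time in microseconds if the matching request was seen.
    fn on_response(&mut self, response_to: u32, ts_us: u64) -> Option<u64> {
        let req_ts = self.pending.remove(&response_to)?;
        Some(ts_us.saturating_sub(req_ts))
    }
}
```

A reply to request 0x23 arriving 750 µs after the request yields Some(750); a second reply with the same responseTo, or one for an unseen request, yields None.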
    • Convert MongoDBInfo into L7ProtocolSendLog (implement From<MongoDBInfo> for L7ProtocolSendLog)
    ```rust
    impl From<MongoDBInfo> for L7ProtocolSendLog {
        fn from(f: MongoDBInfo) -> Self {
            L7ProtocolSendLog {
                // Convert info to the unified send structure L7ProtocolSendLog
                // (field mapping omitted)
            }
        }
    }

    // Reference source: deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs
    pub struct L7ProtocolSendLog {
        pub req_len: Option<u32>,
        pub resp_len: Option<u32>,
        pub row_effect: u32,
        pub req: L7Request,
        pub resp: L7Response,
        pub version: Option<String>,
        pub trace_info: Option<TraceInfo>,
        pub ext_info: Option<ExtendedInfo>,
    }
    ```
    • Register MongoDBLog, the type implementing L7ProtocolParserInterface, in the impl_protocol_parser! macro in deepflow/agent/src/common/l7_protocol_log.rs.
    ```rust
    impl_protocol_parser! {
        pub enum L7ProtocolParser {
            // http have two version but one parser, can not place in macro param.
            // custom must in first so can not place in macro
            DNS(DnsLog),
            SofaRPC(SofaRpcLog),
            MySQL(MysqlLog),
            Kafka(KafkaLog),
            Redis(RedisLog),
            MongoDB(MongoDBLog), // added: register the MongoDB parser
            PostgreSQL(PostgresqlLog),
            Dubbo(DubboLog),
            FastCGI(FastCGILog),
            MQTT(MqttLog),
            // add protocol below
        }
    }
    ```
    • Result of the second step
  • Record QPS, latency, and errors through perf_stats:

```rust
impl L7ProtocolParserInterface for MongoDBLog {
    fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
        let mut info = MongoDBInfo::default();
        self.parse(payload, param.l4_protocol, param.direction, &mut info)?; // Decode to get L7ProtocolInfo
        info.cal_rrt(param, None).map(|rrt| {
            info.rrt = rrt;
            self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); // added: latency
        });
        // ... (omitted) return info as an L7ParseResult
    }
}

impl MongoDBLog {
    fn parse(&mut self, payload: &[u8], proto: IpProtocol, dir: PacketDirection, info: &mut MongoDBInfo) -> Result<bool> {
        // (header decoding omitted; `header` is the decoded MongoDBHeader)
        if header.is_request() {
            self.perf_stats.as_mut().map(|p: &mut L7PerfStats| p.inc_req()); // added: request count
        } else {
            self.perf_stats.as_mut().map(|p| p.inc_resp()); // added: response count
        }
        match info.op_code {
            _OP_REPLY if payload.len() > _HEADER_SIZE => {
                let mut msg_body = MongoOpReply::default();
                msg_body.decode(&payload[_HEADER_SIZE..])?;
                if !msg_body.reply_ok {
                    self.perf_stats.as_mut().map(|p| p.inc_resp_err()); // added: error count
                }
            }
            _ => {}
        }
        Ok(true)
    }
}
```
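How these calls turn into QPS, latency and error figures can be illustrated with a hypothetical counter struct that mirrors the method names used above (inc_req, inc_resp, inc_resp_err, update_rrt). It is only a sketch; DeepFlow's real L7PerfStats has different fields and is aggregated by the agent.

```rust
/// Hypothetical counters mirroring the perf_stats calls above.
#[derive(Default, Debug)]
struct PerfCounters {
    request_count: u32,
    response_count: u32,
    err_count: u32,
    rrt_sum_us: u64,
    rrt_count: u32,
    rrt_max_us: u64,
}

impl PerfCounters {
    fn inc_req(&mut self) {
        self.request_count += 1;
    }
    fn inc_resp(&mut self) {
        self.response_count += 1;
    }
    fn inc_resp_err(&mut self) {
        self.err_count += 1;
    }
    fn update_rrt(&mut self, rrt_us: u64) {
        self.rrt_sum_us += rrt_us;
        self.rrt_count += 1;
        self.rrt_max_us = self.rrt_max_us.max(rrt_us);
    }
    /// Average latency in microseconds, or None before any sample.
    fn avg_rrt_us(&self) -> Option<u64> {
        if self.rrt_count == 0 {
            None
        } else {
            Some(self.rrt_sum_us / self.rrt_count as u64)
        }
    }
    /// Share of responses that were errors.
    fn error_ratio(&self) -> f64 {
        if self.response_count == 0 {
            0.0
        } else {
            self.err_count as f64 / self.response_count as f64
        }
    }
}
```

Dividing request_count by the reporting interval gives QPS; the average and maximum response times give latency; error_ratio gives the error rate.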

Result:

  • Finally, add the protocol identifier on the server side. The following two parts are in the code file server/libs/datatype/flow.go:
```go
type L7Protocol uint8

const (
    L7_PROTOCOL_UNKNOWN    L7Protocol = 0
    L7_PROTOCOL_OTHER      L7Protocol = 1
    L7_PROTOCOL_HTTP_1     L7Protocol = 20
    L7_PROTOCOL_HTTP_2     L7Protocol = 21
    L7_PROTOCOL_HTTP_1_TLS L7Protocol = 22
    L7_PROTOCOL_HTTP_2_TLS L7Protocol = 23
    L7_PROTOCOL_DUBBO      L7Protocol = 40
    L7_PROTOCOL_GRPC       L7Protocol = 41
    L7_PROTOCOL_SOFARPC    L7Protocol = 43
    L7_PROTOCOL_FASTCGI    L7Protocol = 44
    L7_PROTOCOL_MYSQL      L7Protocol = 60
    L7_PROTOCOL_POSTGRE    L7Protocol = 61
    L7_PROTOCOL_REDIS      L7Protocol = 80
    L7_PROTOCOL_MONGODB    L7Protocol = 81 // added
    L7_PROTOCOL_KAFKA      L7Protocol = 100
    L7_PROTOCOL_MQTT       L7Protocol = 101
    L7_PROTOCOL_DNS        L7Protocol = 120
    L7_PROTOCOL_CUSTOM     L7Protocol = 127
)
```
```go
func (p L7Protocol) String() string {
    formatted := ""
    switch p {
    case L7_PROTOCOL_HTTP_1:
        formatted = "HTTP"
    case L7_PROTOCOL_DNS:
        formatted = "DNS"
    case L7_PROTOCOL_MYSQL:
        formatted = "MySQL"
    case L7_PROTOCOL_POSTGRE:
        formatted = "PostgreSQL"
    case L7_PROTOCOL_REDIS:
        formatted = "Redis"
    case L7_PROTOCOL_MONGODB: // added
        formatted = "MongoDB"
    case L7_PROTOCOL_DUBBO:
        formatted = "Dubbo"
    case L7_PROTOCOL_GRPC:
        formatted = "gRPC"
    case L7_PROTOCOL_CUSTOM:
        formatted = "Custom"
    case L7_PROTOCOL_OTHER:
        formatted = "Others"
    default:
        formatted = "N/A"
    }
    return formatted
}
```

Also add the enum entry in server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol (the new line is marked with +):

```
	# Value , DisplayName     , Description
	0       , N/A             ,
	1       , Others          ,
	20      , HTTP            ,
	21      , HTTP2           ,
	22      , HTTP1_TLS       ,
	23      , HTTP2_TLS       ,
	40      , Dubbo           ,
	41      , gRPC            ,
	43      , SOFARPC         ,
	44      , FastCGI         ,
	60      , MySQL           ,
	61      , PostgreSQL      ,
	80      , Redis           ,
+	81      , MongoDB         ,
	100     , Kafka           ,
	101     , MQTT            ,
	120     , DNS             ,
	127     , Custom          ,
```

At this point, the native protocol extension of DeepFlow Agent is complete. Refer to the Complete Guide: How to Compile, Package and Deploy Custom DeepFlow to compile and publish.

If you want to quickly implement a protocol collection parser, or if you’re not familiar with Rust, there’s another option: use Wasm plugins to quickly extend protocol decoding.

Extending DeepFlow Protocol Collection Using Wasm Plugins

This case study uses a Wasm plugin to add Topic support to Kafka parsing. First, a brief analysis of the Kafka protocol based on the official Kafka documentation.

Kafka Protocol Analysis

Kafka Header and Data Overview

Kafka Fetch API

Kafka Produce API

Kafka Protocol DeepFlow Agent Native Decoding

As of v6.3.x, DeepFlow Agent's native Kafka decoding looks as shown below: it does not yet decode the Topic field, and the decoded API does not include its version number. The plugin developed below therefore decodes the Topic field and shows it as the request resource, and also adds the API version number.

DeepFlow Agent Wasm Plugin

Refer to the official plugin documentation wasm-plugin. Two things to note:

  • The Agent determines the application layer protocol of a flow by iterating through all supported protocols, in the order: HTTP -> Wasm Hook -> DNS -> …
  • Requires Go 1.21 or later and TinyGo 0.29 or later

Wasm Go SDK Framework

  • First, get an overview of the framework. As the code below shows, the whole framework is built around five interface functions.
```go
package main

import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"

// Define a struct that implements the sdk.Parser interface
type plugin struct{}

func (p plugin) HookIn() []sdk.HookBitmap { return []sdk.HookBitmap{} }

// When HookIn() contains HOOK_POINT_HTTP_REQ, this is called before HTTP request parsing completes and returns.
// HttpReqCtx contains BaseCtx and some already-parsed HTTP headers.
func (p plugin) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {
    var trace *sdk.Trace  // trace info extracted from the request (omitted)
    var attr []sdk.KeyVal // extra attributes (omitted)
    return sdk.HttpReqActionAbortWithResult(nil, trace, attr)
}
func (p plugin) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action { return sdk.ActionNext() }
func (p plugin) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) { return 0, "ownwasm" }
func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.Action {
    return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
}

// main must register the parser
func main() {
    sdk.SetParser(plugin{})
}
```
  • The Agent iterates through all plugins calling the corresponding Export functions, but the iteration behavior can be controlled by the return value.
| Return Value | Description |
|---|---|
| sdk.ActionNext() | Stop the current plugin and go straight to the next plugin |
| sdk.ActionAbort() | Stop the current plugin and stop traversal |
| sdk.ActionAbortWithErr(err) | Stop the current plugin, log the error, and stop traversal |
| sdk.HttpReqActionAbortWithResult() (and its HTTP response counterpart) | The Agent stops traversal and extracts the returned result |
| sdk.ParseActionAbortWithL7Info() | The Agent stops traversal and extracts the returned result |

⚠️Note: Since this case doesn’t involve HTTP protocol processing, OnHttpReq() and OnHttpResp() can simply return sdk.ActionNext() to skip, and the HTTP AbortWithResult actions are not used.

  • The three hook points of HookBitmap
| Hook Point | Description |
|---|---|
| HOOK_POINT_HTTP_REQ | Called before HTTP request parsing completes and returns |
| HOOK_POINT_HTTP_RESP | Called before HTTP response parsing completes and returns |
| HOOK_POINT_PAYLOAD_PARSE | Used for protocol identification and parsing |

⚠️Note: Since this case doesn’t involve HTTP protocol processing, HOOK_POINT_HTTP_REQ and HOOK_POINT_HTTP_RESP are not used.

Plugin Code Guide

  • The Kafka Wasm plugin code skeleton, following this framework:
```go
package main

import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"

// Define a struct that implements the sdk.Parser interface
type kafkaParser struct{}

func (p kafkaParser) HookIn() []sdk.HookBitmap {
    return []sdk.HookBitmap{sdk.HOOK_POINT_PAYLOAD_PARSE}
}

// Skip HTTP protocol processing
func (p kafkaParser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action   { return sdk.ActionNext() }
func (p kafkaParser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action { return sdk.ActionNext() }

// Protocol identification check
func (p kafkaParser) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) { return 100, "kafka" }

// Protocol decoding
func (p kafkaParser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.Action {
    return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
}

// main must register the parser
func main() { sdk.SetParser(kafkaParser{}) }
```
  • Protocol identification

⚠️Note: the key points are explained in the code comments below.

```go
func (p kafkaParser) OnCheckPayload(ctx *sdk.ParseCtx) (uint8, string) {
	// Skip UDP protocol data
	if ctx.L4 != sdk.TCP {
		return 0, ""
	}
	// If the environment has standard port conventions, specifying the port in the plugin reduces protocol data traversal,
	// optimizing CPU and other resource consumption during decoding.
	if ctx.DstPort < 9092 || ctx.DstPort > 9093 {
		return 0, ""
	}
	// Read packet capture data
	payload, err := ctx.GetPayload()
	if err != nil {
		sdk.Error("get payload fail: %v", err)
		return 0, ""
	}
	// Use "github.com/segmentio/kafka-go/protocol" for decoding
	bl, err := protocol.ReadAll(protocol.NewBytes(payload))
	if err != nil {
		sdk.Error("read payload fail: %v", err)
		return 0, ""
	}
	b, _ := decodeHeader(bl)
	if !b {
		return 0, ""
	}
	return WASM_KAFKA_PROTOCOL, "kafka"
}
```
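The plugin delegates the header check to decodeHeader(), whose body is not shown. For comparison with the MongoDB header, here is a Rust sketch of the same kind of check against the Kafka request header (v1, non-flexible). Note that Kafka integers are big-endian, unlike MongoDB's little-endian fields. KafkaReqHeader and decode_kafka_req_header are hypothetical names, and the api_key upper bound of 100 is an arbitrary assumption to adjust to the API keys you expect.

```rust
#[derive(Debug, PartialEq)]
pub struct KafkaReqHeader {
    pub api_key: i16,
    pub api_version: i16,
    pub correlation_id: i32,
    pub client_id: Option<String>,
    pub body_offset: usize, // where the request body starts
}

fn be_i16(b: &[u8], off: usize) -> Option<i16> {
    let s = b.get(off..off + 2)?;
    Some(i16::from_be_bytes([s[0], s[1]]))
}

fn be_i32(b: &[u8], off: usize) -> Option<i32> {
    let s = b.get(off..off + 4)?;
    Some(i32::from_be_bytes([s[0], s[1], s[2], s[3]]))
}

/// Sketch of a Kafka request-header sanity check on one complete request.
pub fn decode_kafka_req_header(payload: &[u8]) -> Option<KafkaReqHeader> {
    // The 4-byte length prefix counts every byte after itself.
    let len = be_i32(payload, 0)?;
    if len < 0 || len as usize != payload.len() - 4 {
        return None;
    }
    let api_key = be_i16(payload, 4)?;
    let api_version = be_i16(payload, 6)?;
    if !(0..=100).contains(&api_key) || api_version < 0 {
        return None;
    }
    let correlation_id = be_i32(payload, 8)?;
    // client_id is a NULLABLE_STRING: int16 length, -1 means null.
    let client_len = be_i16(payload, 12)?;
    let (client_id, body_offset) = if client_len < 0 {
        (None, 14)
    } else {
        let end = 14 + client_len as usize;
        let raw = payload.get(14..end)?;
        (Some(String::from_utf8_lossy(raw).into_owned()), end)
    };
    Some(KafkaReqHeader { api_key, api_version, correlation_id, client_id, body_offset })
}
```

The fixed 14 bytes before client_id match the header_offset = 14 + clientLen computation used in the plugin's decoding code.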
  • Protocol API decoding

    • The official code framework’s OnParsePayload() logic is as follows:
    ```go
    func (p plugin) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action {
        // ctx.L7 is the protocol number returned by OnCheckPayload; you can filter by L4 protocol or protocol number first.
        if ctx.L4 != sdk.TCP {
            return sdk.ActionNext()
        }
        payload, err := ctx.GetPayload()
        if err != nil {
            return sdk.ActionAbortWithErr(err)
        }
        _ = payload // consumed by the parse logic below
        // the parse logic here
        // ...
        /* About the L7ProtocolInfo struct:
                type L7ProtocolInfo struct {
                    ReqLen    *int       // Request length, e.g., http content-length
                    RespLen   *int       // Response length, e.g., http content-length
                    RequestID *uint32    // Sub-stream identifier id, e.g., http2 stream id, dns transaction id
                    Req       *Request
                    Resp      *Response
                    Trace     *Trace     // Trace info
                    Kv        []KeyVal   // Corresponds to attribute
                }
                type Request struct {
                    ReqType  string  // Corresponds to request type
                    Domain   string  // Corresponds to request domain
                    Resource string  // Corresponds to request resource
                    Endpoint string  // Corresponds to endpoint
                }
                type Response struct {
                    Status    RespStatus // Corresponds to response status
                    Code      *int32     // Corresponds to response code
                    Result    string     // Corresponds to response result
                    Exception string     // Corresponds to response exception
                }*/
        return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
    }
    ```
    • Topic field decoding logic
    ```go
    func (p kafkaParser) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action {
        // the parse logic here (reading payload and decoding header, id and length omitted)
        // ...

        // Decoded header base size:
        // req_len(int32) + api_key(int16) + api_ver(int16) + c_id(int32) + client_len(int16)
        // = 14
        var header_offset = 14 + header.clientLen
        var topic_size int16 = 0
        var topic_name = ""
        switch protocol.ApiKey(header.apikey) {
        case protocol.Produce:
            topic_size, topic_name = decodeProduce(header.apiversion, payload[header_offset:])
        case protocol.Fetch:
            topic_size, topic_name = decodeFetch(header.apiversion, payload[header_offset:])
        }
        if topic_size == 0 {
            return sdk.ActionNext()
        }
        req := &sdk.Request{
            // API name plus "_v" and the API version number
            ReqType:  protocol.ApiKey(header.apikey).String() + "_v" + strconv.Itoa(int(header.apiversion)),
            Resource: topic_name,
        }
        return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{
            {
                RequestID: &id,
                ReqLen:    &length,
                Req:       req,
            },
        })
    }
    ```
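decodeProduce() is not shown either. The Kafka protocol guide gives the Produce request body layout; for the non-flexible versions 0 to 8 it is: transactional_id (NULLABLE_STRING, v3+ only), acks (INT16), timeout_ms (INT32), then the topic array (INT32 count, each topic name an INT16-length STRING). A Rust sketch of extracting the first topic under those rules; first_produce_topic is a hypothetical name, and v9+ (compact varint encodings) is deliberately not handled.

```rust
/// Sketch: first topic name from a non-flexible Produce request body (v0 to v8).
/// `body` starts right after the request header.
pub fn first_produce_topic(api_version: i16, body: &[u8]) -> Option<String> {
    if !(0..=8).contains(&api_version) {
        return None; // v9+ uses compact (varint) encodings, not handled here
    }
    let rd16 = |off: usize| body.get(off..off + 2).map(|s| i16::from_be_bytes([s[0], s[1]]));
    let mut off = 0usize;
    if api_version >= 3 {
        // transactional_id: int16 length, -1 means null
        let tx_len = rd16(off)?;
        let tx_bytes = if tx_len > 0 { tx_len as usize } else { 0 };
        off += 2 + tx_bytes;
    }
    off += 2 + 4; // acks (int16) + timeout_ms (int32)
    let count = body.get(off..off + 4).map(|s| i32::from_be_bytes([s[0], s[1], s[2], s[3]]))?;
    if count <= 0 {
        return None;
    }
    off += 4;
    let name_len = rd16(off)?;
    if name_len < 0 {
        return None;
    }
    let start = off + 2;
    let name = body.get(start..start + name_len as usize)?;
    Some(String::from_utf8_lossy(name).into_owned())
}
```

Combined with a ReqType such as the API name plus "_v" and the version, this is the information the plugin surfaces as the request type and resource.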

Loading the Plugin and Result Display

Compile the plugin with the following command and load it via CTL:

```zsh
➜ deepflow-plugin git:(main) ✗ tinygo build -o build/topic.wasm -target wasi -panic=trap -scheduler=none -no-debug ./wasm/kafka/topic.go

➜ deepflow-plugin git:(main) ✗ deepflow-ctl plugin create --type wasm --image build/topic.wasm --name topic
```

Prepare the Agent configuration file with the following additions. Note that the Agent can load multiple Wasm plugins.

```yaml
  ############
  ## plugin ##
  ############
  ## Wasm plugins the agent needs to load
  wasm-plugins:
    - mongo
    - topic
```

Run the command to update the configuration:

```shell
➜ deepflow-plugin git:(main) ✗ deepflow-ctl agent-group-config update -f g-d2d06af17e.yaml
```

When the Agent log shows the yellow content as shown in the image below, it means the loading was successful.

In Grafana, you can see that the native Kafka protocol has been overridden, with several changes:

  • The Protocol field changed from Kafka to Custom
  • The Request type field now includes API version numbers
  • The Request resource field now shows Topic information

Conclusion

Finally, let’s compare the two protocol extension approaches. Note ⚠️: Both approaches share a common issue: each added protocol reduces the efficiency of protocol identification and decoding. The number of protocols to decode can be reduced through configuration.

Native Rust Extension

  • Advantages:
    • Lower runtime resource usage compared to plugins
    • Supports richer functionality than plugins, with more flexible customization
  • Disadvantages:
    • Rust is harder to develop in than the Go used for plugins
    • Adding a new protocol requires changes in more locations, including a small portion of Server code

Wasm Plugin Extension

  • Advantages:
    • Lower development difficulty with Golang compared to Rust
    • Can be loaded at runtime via CLI
    • Strong extensibility
  • Disadvantages:
    • Go’s standard library and third-party libraries have certain limitations, and debugging is difficult, making plugin anomalies hard to troubleshoot
    • Due to Wasm’s inherent limitations, functionality is weaker compared to native Rust development
    • Increased resource consumption, especially in terms of memory

Appendix