Applying VictoriaMetrics Stream Aggregation for Metrics

Community VM Stream Aggregation Capability Analysis and Issues

VictoriaMetrics Open-Source Project Native Capabilities

Stream aggregation was integrated into vmagent starting with VictoriaMetrics version 1.86. For details, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460. Based on source code analysis, the stream aggregation capability looks like this:

The core computation is implemented in the pushSample function, shown here for the total aggregation state:

go
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
	currentTime := fasttime.UnixTimestamp()
	deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)
again:
	v, ok := as.m.Load(outputKey)
	if !ok {
		v = &totalStateValue{
			lastValues: make(map[string]*lastValueState),
		}
		vNew, loaded := as.m.LoadOrStore(outputKey, v)
		if loaded {
			v = vNew
		}
	}
	sv := v.(*totalStateValue)
	sv.mu.Lock()
	deleted := sv.deleted
	if !deleted {
		lv, ok := sv.lastValues[inputKey]
		if !ok {
			lv = &lastValueState{}
			sv.lastValues[inputKey] = lv
		}
		d := value
		if ok && lv.value <= value {
			d = value - lv.value
		}
		if ok || currentTime > as.ignoreInputDeadline {
			sv.total += d
		}
		lv.value = value
		lv.deleteDeadline = deleteDeadline
		sv.deleteDeadline = deleteDeadline
	}
	sv.mu.Unlock()
	if deleted {
		goto again
	}
}

General Application Analysis of Stream Aggregation

First, let’s look at the time series chart after stream aggregation:

Now, let’s look at the theoretical form of time series data:

The ideal stream aggregation form based on the theoretical model:

The stream aggregation form in actual conditions:

Common Issues with Native Stream Aggregation

Collection Gap Problem

In practice, conditions are harsher: collection gaps are unavoidable for many reasons, such as network issues or performance problems on the scraped services. For high-precision stream aggregation over large counter values, the impact of a gap is severe:
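The effect can be reproduced with a minimal single-goroutine sketch of the delta rule from pushSample. The names sample, totalState, push and replay are hypothetical, the clock is passed in explicitly, and the sketch assumes that an entry is purged once its deleteDeadline has passed. Under those assumptions, a gap longer than 1.5x the interval makes the series look new again, so its full counter value is added to the total instead of the small delta.

```go
package main

import "fmt"

// sample is one scraped counter value with its arrival time in seconds.
type sample struct {
	ts    uint64
	value float64
}

// lastValue mirrors the per-input-series entry kept in lastValues.
type lastValue struct {
	value          float64
	deleteDeadline uint64
}

// totalState is a simplified, single-goroutine stand-in for totalStateValue.
type totalState struct {
	intervalSecs        uint64
	ignoreInputDeadline uint64
	lastValues          map[string]*lastValue
	total               float64
}

// push applies the delta rule from pushSample with an explicit clock.
// Assumption: entries whose deleteDeadline has passed are purged first.
func (s *totalState) push(inputKey string, value float64, now uint64) {
	for k, lv := range s.lastValues {
		if now > lv.deleteDeadline {
			delete(s.lastValues, k)
		}
	}
	lv, ok := s.lastValues[inputKey]
	if !ok {
		lv = &lastValue{}
		s.lastValues[inputKey] = lv
	}
	d := value
	if ok && lv.value <= value {
		d = value - lv.value
	}
	if ok || now > s.ignoreInputDeadline {
		s.total += d
	}
	lv.value = value
	lv.deleteDeadline = now + s.intervalSecs + (s.intervalSecs >> 1)
}

// replay feeds samples of a single series through a fresh state and returns the total.
func replay(intervalSecs, ignoreInputDeadline uint64, samples []sample) float64 {
	s := &totalState{
		intervalSecs:        intervalSecs,
		ignoreInputDeadline: ignoreInputDeadline,
		lastValues:          make(map[string]*lastValue),
	}
	for _, sm := range samples {
		s.push("series-a", sm.value, sm.ts)
	}
	return s.total
}

func main() {
	steady := []sample{{0, 1000000}, {30, 1000030}, {60, 1000060}, {90, 1000090}}
	gapped := []sample{{0, 1000000}, {30, 1000030}, {200, 1000200}}
	fmt.Println("steady total:", replay(60, 60, steady))
	fmt.Println("gapped total:", replay(60, 60, gapped))
}
```

With a 60-second interval, the steady series yields a total of 90, while the gapped series yields 1000230 even though the counter only grew by 200.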

Computing Architecture for Massive Data

Stream aggregation doesn’t retain historical sample details, so its performance is excellent. But even the best-performing service has single-node limits. When the data volume exceeds what a single node can handle, a distributed computing architecture introduces a series of problems:

  • Is vmagent’s built-in collection capability usable?

    In actual testing, enabling vmagent shard + replica collection combined with real-time stream aggregation significantly increased resource usage. At very large scales, continuous collection gaps occurred frequently, worsening the calculation result differences caused by gaps. Reference architecture diagram:

  • Which compute node should each sample be assigned to?
  • When series with identical dimensions are computed on multiple nodes and yield different values within the same time window, the later-arriving values are discarded because the out-of-order sample handling logic is triggered. How do we solve this?

  • How do we balance resources across compute nodes?
  • After inserting a task ID label to distinguish compute nodes that produce the same dimensions, how do we handle the new dimension this introduces?

Design and Implementation of a Distributed Stream Aggregation Gateway

The previous section analyzed the problems with current industry solutions in real-world scenarios; this section addresses each of them in turn.

The source code analysis highlights two key parts:

  • Time window range:
go
// The window is the configured interval plus half of it (interval >> 1), i.e. 1.5x the interval
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)
  • Calculation window logic:
go
// Read metrics to be aggregated from cache
v, ok := as.m.Load(outputKey)
if !ok {
	v = &totalStateValue{
		lastValues: make(map[string]*lastValueState),
	}
	vNew, loaded := as.m.LoadOrStore(outputKey, v)
	if loaded {
		v = vNew
	}
}
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
	// Look up the recorded value of the incoming time series from the aggregated metric cache
	lv, ok := sv.lastValues[inputKey]
	if !ok {
		// If no recorded value exists, use the original value as the recorded value
		lv = &lastValueState{}
		sv.lastValues[inputKey] = lv
	}
	d := value
	if ok && lv.value <= value {
		// If a recorded value exists, take the difference between the new value and the recorded value
		// as the incremental value for the aggregated metric
		d = value - lv.value
	}
	if ok || currentTime > as.ignoreInputDeadline {
		sv.total += d
	}
	lv.value = value
	lv.deleteDeadline = deleteDeadline
	sv.deleteDeadline = deleteDeadline
}

From the source code analysis, the stream aggregation logic only performs simple periodic checks on the time window. There isn’t much handling for inflated calculated values caused by delayed arrival of incoming metrics due to various reasons.

At this point, we need a front-end module placed ahead of stream aggregation to solve these problems. Since vmgateway is an enterprise component, we developed our own distributed stream aggregation gateway, vm-receive-route.

Problems Addressed by the Distributed Stream Aggregation Gateway

Asynchronous Processing

Most remote write projects forward synchronously. For example, prometheus-kafka-adapter waits for Kafka to finish producing the data before it completes the write and processes the next batch of remote write data.

However, stream computation has window constraints on data processing. If synchronous logic blocks here, performance is affected, causing subsequent samples to be delayed. If delays miss the stream computation time window, calculation deviations occur. Therefore, asynchronous forwarding is the right approach here.
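As a rough illustration of asynchronous forwarding, the sketch below accepts a batch by placing it on a bounded queue and returns immediately, while worker goroutines deliver it downstream. The types batch and asyncForwarder, the queue size and the drop-when-full policy are assumptions made for this example; the article does not describe how vm-receive-route implements it.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for one decoded remote-write request.
type batch struct {
	samples int
}

// asyncForwarder decouples accepting a batch from delivering it downstream.
type asyncForwarder struct {
	queue chan batch
	wg    sync.WaitGroup
}

// newAsyncForwarder starts `workers` goroutines that call send for each queued batch.
func newAsyncForwarder(queueSize, workers int, send func(batch)) *asyncForwarder {
	f := &asyncForwarder{queue: make(chan batch, queueSize)}
	for i := 0; i < workers; i++ {
		f.wg.Add(1)
		go func() {
			defer f.wg.Done()
			for b := range f.queue {
				send(b)
			}
		}()
	}
	return f
}

// enqueue never blocks; it reports false when the queue is full so the
// caller can count the drop instead of stalling the write path.
func (f *asyncForwarder) enqueue(b batch) bool {
	select {
	case f.queue <- b:
		return true
	default:
		return false
	}
}

// shutdown stops accepting batches and waits until queued ones are sent.
func (f *asyncForwarder) shutdown() {
	close(f.queue)
	f.wg.Wait()
}

func main() {
	var mu sync.Mutex
	forwarded := 0
	f := newAsyncForwarder(8, 2, func(b batch) {
		mu.Lock()
		forwarded += b.samples
		mu.Unlock()
	})
	accepted := 0
	for i := 0; i < 5; i++ {
		if f.enqueue(batch{samples: 100}) {
			accepted++
		}
	}
	f.shutdown()
	fmt.Println("accepted batches:", accepted, "forwarded samples:", forwarded)
}
```

Dropping on a full queue is only one possible policy; it trades completeness for keeping fresh samples inside the aggregation window.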

Time Window Filtering

Since the stream aggregation logic already compares the difference between new and old values on the time series, there’s no need to introduce an out-of-order algorithm.

The gateway only needs to cooperate with the stream aggregation logic’s time window to filter and discard samples. This mainly solves the problem of Prometheus retries sending large backlogs of old samples affecting real-time calculation results, and also resolves the resource performance issues caused by Prometheus retries with large sample backlogs.
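A minimal sketch of such a filter follows. It assumes the gateway uses the same offset as the aggregation's deleteDeadline (the interval plus half the interval) as the maximum sample age; the actual threshold used by the gateway is not stated in the article, and sample and filterByWindow are hypothetical names.

```go
package main

import "fmt"

// sample is a hypothetical decoded remote-write sample.
type sample struct {
	timestampSecs int64
	value         float64
}

// filterByWindow keeps samples that are at most maxAgeSecs old relative to
// nowSecs and reports how many older samples were dropped.
func filterByWindow(samples []sample, nowSecs, maxAgeSecs int64) (kept []sample, dropped int) {
	for _, s := range samples {
		if nowSecs-s.timestampSecs > maxAgeSecs {
			dropped++
			continue
		}
		kept = append(kept, s)
	}
	return kept, dropped
}

func main() {
	intervalSecs := int64(60)
	// Assumption: mirror the aggregation window of interval + interval/2.
	maxAge := intervalSecs + (intervalSecs >> 1)
	backlog := []sample{{990, 1}, {920, 2}, {700, 3}}
	kept, dropped := filterByWindow(backlog, 1000, maxAge)
	fmt.Println("kept:", len(kept), "dropped:", dropped)
}
```

Here the sample that is 300 seconds old, such as one replayed by a Prometheus retry, is discarded before it reaches the aggregation nodes.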

Dimension Control

The stream aggregation component inserts a node ID label into each aggregated time series to tell the nodes apart. However, as nodes scale horizontally, the number of values of this label grows too. We need a way to control this dimension while also distributing time series to different nodes by dimension ID. We later designed a dual hashmod scheduling algorithm to solve these problems.

By moving the taskid labeling capability to the gateway, we can ignore the impact of horizontal scaling node count on the task ID dimension:
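The article does not spell out the scheduling algorithm, so the following is only one plausible reading, not the author's implementation. It assumes a first hashmod that maps an aggregation key to a fixed number of task slots (which bounds the taskid label's cardinality) and a second modulo that maps a task slot to one of the running nodes. The function names, the FNV hash and the slot count are all hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// taskID maps an aggregation key onto a fixed number of task slots.
// The slot count does not change when nodes are added or removed.
func taskID(aggregationKey string, taskSlots uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(aggregationKey))
	return h.Sum32() % taskSlots
}

// nodeFor maps a task slot onto one of the currently running nodes.
func nodeFor(task, nodes uint32) uint32 {
	return task % nodes
}

func main() {
	const taskSlots = 16 // hypothetical fixed upper bound for taskid values
	keys := []string{
		`zone="bj",dis_svr="192.168.2.3",code="202"`,
		`zone="bj",dis_svr="192.168.2.3",code="500"`,
	}
	for _, nodes := range []uint32{3, 5} {
		for _, k := range keys {
			task := taskID(k, taskSlots)
			fmt.Printf("nodes=%d taskid=%d node=%d\n", nodes, task, nodeFor(task, nodes))
		}
	}
}
```

In this reading, scaling from 3 to 5 nodes changes which node handles a task, but the set of taskid label values stays within the fixed slot count.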

Dual Hashmod Scheduling

Omitted for brevity.

Backend Service Migration for Failures

Omitted for brevity.

Program Flow Logic Diagram

Production Environment Architecture Diagram

Record Rule Dimension Task Generator

Stream aggregation is a technique for real-time processing and analyzing large data streams. Its main purpose is to extract valuable information from high-speed, continuous data streams, such as: calculating averages, sums, maximums, minimums, etc. Stream aggregation is typically performed in memory to maintain low latency and high throughput.

The implementation of stream aggregation typically includes the following key components:

  1. Data source and data sink: Define and monitor data streams such as logs, network packets, sensor data, splitting the data stream into multiple mini-batches.
  2. Window operations: Data streams are divided into fixed or sliding windows. Data within each window undergoes predefined aggregation operations.
  3. Aggregation functions: Various operations on data within windows, such as count, sum, average, max/min, etc.
  4. Output: Output or store aggregation results for further processing or real-time data visualization.
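The window and aggregation steps above can be sketched with a fixed (tumbling) window as follows. This is a generic illustration with hypothetical names (point, windowStats, aggregate), not code from VictoriaMetrics, and it assumes non-negative timestamps.

```go
package main

import (
	"fmt"
	"sort"
)

// point is one value from the data stream with its timestamp in seconds.
type point struct {
	ts    int64
	value float64
}

// windowStats holds the aggregates computed for one fixed window.
type windowStats struct {
	start    int64
	count    int
	sum      float64
	min, max float64
}

// aggregate assigns each point to the window containing its timestamp and
// updates count, sum, min and max for that window.
func aggregate(points []point, windowSecs int64) []windowStats {
	byStart := make(map[int64]*windowStats)
	for _, p := range points {
		start := p.ts - p.ts%windowSecs
		w, ok := byStart[start]
		if !ok {
			byStart[start] = &windowStats{start: start, count: 1, sum: p.value, min: p.value, max: p.value}
			continue
		}
		w.count++
		w.sum += p.value
		if p.value < w.min {
			w.min = p.value
		}
		if p.value > w.max {
			w.max = p.value
		}
	}
	out := make([]windowStats, 0, len(byStart))
	for _, w := range byStart {
		out = append(out, *w)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].start < out[j].start })
	return out
}

func main() {
	stream := []point{{0, 1}, {5, 3}, {12, 10}, {19, 2}}
	for _, w := range aggregate(stream, 10) {
		avg := w.sum / float64(w.count)
		fmt.Printf("window=%d count=%d sum=%g avg=%g min=%g max=%g\n", w.start, w.count, w.sum, avg, w.min, w.max)
	}
}
```

Only the per-window aggregates are held in memory; the individual points are discarded after they are folded in, which is what keeps latency low.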

In metric stream aggregation, this capability efficiently reduces dimensions for a single metric. However, in reality, describing complex scenarios requires combining multiple metrics with functions.

Stream aggregation can efficiently solve the following aggregation problem of filtering the req_path field:

  • Before aggregation:
json
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="30021",dis_svr="192.168.2.3",dis_port="8080",code="202",req_path="/xxxx/yyyy?abc=exg"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="30023",dis_svr="192.168.2.3",dis_port="8080",code="202",req_path="/xxxx/yyyy?abc=sdf"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="10021",dis_svr="192.168.2.3",dis_port="8080",code="202",req_path="/xxxx/yyyy?abc=fasdfa"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="20210",dis_svr="192.168.2.3",dis_port="8080",code="202",req_path="/xxxx/yyyy?abc=asdfe"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="20211",dis_svr="192.168.2.3",dis_port="8080",code="202",req_path="/xxxx/yyyy?abc=sfadsf"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="21210",dis_svr="192.168.2.3",dis_port="8080",code="500",req_path="/xxxx/yyyy?abc=caresaf"}
a_http_req_total{zone="bj",src_svr="192.168.1.2",src_port="30121",dis_svr="192.168.2.3",dis_port="8080",code="400",req_path="/xxxx/yyyy?abc=werads"}
  • After aggregation:
json
agg_a_http_req_total{zone="bj",src_svr="192.168.1.2",dis_svr="192.168.2.3",dis_port="8080",code="202"}
agg_a_http_req_total{zone="bj",src_svr="192.168.1.2",dis_svr="192.168.2.3",dis_port="8080",code="500"}
agg_a_http_req_total{zone="bj",src_svr="192.168.1.2",dis_svr="192.168.2.3",dis_port="8080",code="400"}
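
The dimension reduction shown above amounts to dropping the src_port and req_path labels and summing the series that collapse onto the same remaining label set. A minimal sketch follows, assuming each input carries a per-interval increase as its value; series, outputKey and sumWithout are hypothetical names, and the "agg_" prefix simply follows the naming used in this example. Labels are rendered in sorted order, so the key layout differs slightly from the listing above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// series is one input time series with a per-interval increase as its value.
type series struct {
	name   string
	labels map[string]string
	value  float64
}

// outputKey renders the series identity with the dropped labels removed.
// Label names are sorted so that equal label sets produce equal keys.
func outputKey(s series, drop map[string]bool) string {
	names := make([]string, 0, len(s.labels))
	for k := range s.labels {
		if !drop[k] {
			names = append(names, k)
		}
	}
	sort.Strings(names)
	pairs := make([]string, 0, len(names))
	for _, k := range names {
		pairs = append(pairs, fmt.Sprintf("%s=%q", k, s.labels[k]))
	}
	return "agg_" + s.name + "{" + strings.Join(pairs, ",") + "}"
}

// sumWithout sums input values per output key after dropping the given labels.
func sumWithout(in []series, dropLabels ...string) map[string]float64 {
	drop := make(map[string]bool, len(dropLabels))
	for _, l := range dropLabels {
		drop[l] = true
	}
	out := make(map[string]float64)
	for _, s := range in {
		out[outputKey(s, drop)] += s.value
	}
	return out
}

func main() {
	in := []series{
		{"a_http_req_total", map[string]string{"zone": "bj", "code": "202", "src_port": "30021", "req_path": "/xxxx/yyyy?abc=exg"}, 3},
		{"a_http_req_total", map[string]string{"zone": "bj", "code": "202", "src_port": "30023", "req_path": "/xxxx/yyyy?abc=sdf"}, 4},
		{"a_http_req_total", map[string]string{"zone": "bj", "code": "500", "src_port": "21210", "req_path": "/xxxx/yyyy?abc=caresaf"}, 1},
	}
	for k, v := range sumWithout(in, "src_port", "req_path") {
		fmt.Println(k, v)
	}
}
```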

But what we ultimately want to display is the success rate for a specific target:

json
sum by (dis_svr)
	(
		rate(a_http_req_total{code=~"2.*"}[5m])
	)
/
sum by (dis_svr)
	(
		rate(a_http_req_total{}[5m])
	)

We can use Prometheus’s Record Rule to directly compute aggregated scenario metrics.

sql
groups:
- name: a_http_req_total:sum:rate:5m
  rules:
  - expr: |
      sum by (src_svr,dis_svr,code)
        (
          rate(a_http_req_total{}[5m])
        )
    record: a_http_req_total:sum:rate:5m

But Record Rule loads all dimension data of the metric a_http_req_total into memory. If the dimension count reaches a critical threshold, it triggers OOM. Even if not critical, the computation load increases with dimensions, delaying calculation results.

In an actual production environment, istio_requests_total QPS data could be delayed by 20 minutes. Even after using stream aggregation to reduce dimensions, from tens of millions of time series down to tens of thousands, the delay only drops from 20 minutes to 1-2 minutes.

Record Rule configuration after stream aggregation:

sql
groups:
- name: agg_a_http_req_total:sum:rate:5m
  rules:
  - expr: |
      sum by (src_svr,dis_svr,code)
        (
          rate(agg_a_http_req_total{}[5m])
        )
    record: agg_a_http_req_total:sum:rate:5m

By analyzing the source code of the Rule module, we can create more granular configurations:

sql
groups:
- name: agg_a_http_req_total:sum:rate:5m-2xx
  rules:
  - expr: |
      sum by (src_svr,dis_svr,code)
        (
          rate(agg_a_http_req_total{code=~"2.*"}[5m])
        )
    record: agg_a_http_req_total:sum:rate:5m
- name: agg_a_http_req_total:sum:rate:5m-4xx
  rules:
  - expr: |
      sum by (src_svr,dis_svr,code)
        (
          rate(agg_a_http_req_total{code=~"4.*"}[5m])
        )
    record: agg_a_http_req_total:sum:rate:5m
- name: agg_a_http_req_total:sum:rate:5m-5xx
  rules:
  - expr: |
      sum by (src_svr,dis_svr,code)
        (
          rate(agg_a_http_req_total{code=~"5.*"}[5m])
        )
    record: agg_a_http_req_total:sum:rate:5m

This way, we can split queries by specific dimension labels to achieve concurrent processing for generating the same new metrics.

The problem is that most production dimension labels are dynamically changing. How do we dynamically generate such dimension-split queries for concurrent Record Rule processing? We need a label metadata management system to track and manage label dimension metadata.

We implemented a small metadata watch module and rule build module to solve this problem. The configuration for this ruler-handle-process component is:

python
recode_rules:
- interval: 5m
  recode_to: istio_requests_total:sum:rate:5m
  metric_name: istio_requests_total
  aggr_type: sum
  vector_type: rate
  vector_range: 5m
  group_by_and_filter:
  - source_workload
  - destination_workload
  - cluster
  - namespace
  group_by:
  - response_code
  - namespace
  - source_workload_namespace
  - destination_workload_namespace
  - destination_service_name
  - cluster
  - reporter
  filter_by:
    cluster: "k8s-hw-bj-xxxxxx"
  with_out:
    source_workload: "ingressgateway-workflows"

ruler-handle-process watches the dimension combinations response_code, namespace, source_workload_namespace, destination_workload_namespace, destination_service_name, cluster, reporter under the metric name "istio_requests_total" to generate a set of Record Rule rules.

Combined with the Rule component for concurrent computation, we can reduce computation latency to seconds.

sql
groups:
- name: istio_requests_total:sum:rate:5m-7218756fe8a0bc327e818812cefb02f7
  rules:
  - expr: sum by (request_protocol, grpc_response_status, response_code, svr_response_code,
      namespace, source_workload_namespace, destination_workload_namespace, destination_service_name,
      cluster, reporter, source_workload, destination_workload, cluster, namespace)
      (rate(istio_requests_total{cluster="k8s-hw-bj-1-prod",destination_workload="skyaxe-778-flink",namespace="istio-ingress",source_workload="ingressgateway-streaming"}[5m]))
    record: istio_requests_total:sum:rate:5m
- name: istio_requests_total:sum:rate:5m-8e30244048f8d5519a6332f309578ed4
  rules:
  - expr: sum by (request_protocol, grpc_response_status, response_code, svr_response_code,
      namespace, source_workload_namespace, destination_workload_namespace, destination_service_name,
      cluster, reporter, source_workload, destination_workload, cluster, namespace)
      (rate(istio_requests_total{cluster="k8s-hw-bj-1-prod",destination_workload="t-bean-portal",namespace="coin",source_workload="unknown"}[5m]))
    record: istio_requests_total:sum:rate:5m
- name: istio_requests_total:sum:rate:5m-ea2d81ce1c35a7ba5a72b9e23fe7faf6
  rules:
  - expr: sum by (request_protocol, grpc_response_status, response_code, svr_response_code,
      namespace, source_workload_namespace, destination_workload_namespace, destination_service_name,
      cluster, reporter, source_workload, destination_workload, cluster, namespace)
      (rate(istio_requests_total{cluster="k8s-hw-bj-1-prod",destination_workload="wefly-ad-report",namespace="wf",source_workload="wefly-app-packages"}[5m]))
    record: istio_requests_total:sum:rate:5m
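
A generator that emits one rule group per observed label combination, in the shape of the output above, might look like the sketch below. It is not the ruler-handle-process source: the types ruleSpec and ruleGroup and the function buildGroup are hypothetical, and deriving the group-name suffix from an MD5 of the selector is an assumption (the article only shows 32-character hexadecimal suffixes).

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// ruleSpec is a hypothetical subset of one recode_rules entry.
type ruleSpec struct {
	metric      string
	recordTo    string
	aggr        string
	vectorFn    string
	vectorRange string
	groupBy     []string
}

// ruleGroup is one generated group holding a single recording rule.
type ruleGroup struct {
	name   string
	expr   string
	record string
}

// selector renders label matchers in sorted order so that the same label
// combination always produces the same string.
func selector(filter map[string]string) string {
	keys := make([]string, 0, len(filter))
	for k := range filter {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s=%q", k, filter[k]))
	}
	return strings.Join(parts, ",")
}

// buildGroup builds the rule group for one observed label combination.
// Assumption: the group-name suffix is the MD5 of the selector string.
func buildGroup(spec ruleSpec, filter map[string]string) ruleGroup {
	sel := selector(filter)
	sum := md5.Sum([]byte(sel))
	expr := fmt.Sprintf("%s by (%s) (%s(%s{%s}[%s]))",
		spec.aggr, strings.Join(spec.groupBy, ", "), spec.vectorFn, spec.metric, sel, spec.vectorRange)
	return ruleGroup{
		name:   spec.recordTo + "-" + hex.EncodeToString(sum[:]),
		expr:   expr,
		record: spec.recordTo,
	}
}

func main() {
	spec := ruleSpec{
		metric:      "istio_requests_total",
		recordTo:    "istio_requests_total:sum:rate:5m",
		aggr:        "sum",
		vectorFn:    "rate",
		vectorRange: "5m",
		groupBy:     []string{"response_code", "reporter"},
	}
	g := buildGroup(spec, map[string]string{"cluster": "k8s-hw-bj-1-prod", "namespace": "coin"})
	fmt.Printf("- name: %s\n  rules:\n  - expr: %s\n    record: %s\n", g.name, g.expr, g.record)
}
```

Because every group covers a disjoint label combination and records to the same metric name, the groups can be evaluated concurrently without producing duplicate series.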

Stream Aggregation Architecture Combinations in the Monitoring Platform

Using community open-source programs and our self-developed components, we can combine different solutions for different collection scales to handle high-dimensional data at various levels.

Stream Aggregation for Tens of Thousands of Single Metrics

Computational Aggregation for Tens of Thousands of Multi-Metrics

Stream Aggregation for Millions+ of Single Metrics

Scenario-Based Computational Aggregation for Millions+ of Metrics