Advanced Bash Programming Techniques — Handling Message Queue Backlog with the Producer-Consumer Pattern

Introduction

In message system operations, queue backlog is a common and tricky problem. Especially during malicious attacks or system anomalies, the delivery queue may accumulate a large number of messages, severely impacting normal system operation. Traditional single-process processing is not only inefficient but may also cause problems to worsen due to delayed handling.

This article shares a set of advanced Bash programming techniques based on the producer-consumer pattern. Through core technologies including multi-process concurrency, pipe communication, and efficient awk parsing, we achieve fast and effective handling of message queue backlog issues. This solution has been validated in multiple production environments, capable of improving processing efficiency several times over while ensuring system stability.

Problem Scenario

Generalized Problem Description

The main challenges faced by message delivery systems include:

  1. Queue backlog caused by malicious attacks: Attackers send large volumes of spam in a short time, causing delivery queue files to surge
  2. Resource contention caused by system failures: Anti-spam engines or other services malfunction, consuming significant CPU and IO resources
  3. Sudden business peaks: Scenarios such as marketing campaigns or holiday greetings cause a sharp increase in normal email delivery volume
  4. Long-term accumulated performance bottlenecks: Over time, temporary files increase and system performance gradually degrades

Typical Symptoms

When message queue problems occur, the following symptoms typically appear:

  • Queue file count exceeds normal thresholds (e.g., thousands or even tens of thousands)
  • Delivery latency increases significantly, greatly extending user email receipt times
  • System load remains persistently high, affecting normal business processing
  • Disk space is consumed rapidly, posing a risk of system crash

Design Approach

Multi-Process Pipe Architecture

This solution adopts the producer-consumer pattern, improving efficiency through multi-process concurrent processing. The core design approach is as follows:

mermaid
flowchart TD
    classDef primary fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef storage fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    classDef process fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
    classDef network fill:#fff3e0,stroke:#ff9800,stroke-width:2px
    
    A@{ shape: process, label: "Log Scan/File Scan" } --> B@{ shape: cyl, label: "Message Queue" }
    B --> C@{ shape: process, label: "Producer Process" }
    C --> D@{ shape: hex, label: "Pipe Communication" }
    D --> E@{ shape: process, label: "Consumer Process" }
    E --> F@{ shape: process, label: "Message Move Processing" }
    F --> G@{ shape: rounded, label: "Result Summary" }
    
    class A C E F process
    class B storage
    class D network
    class G primary
    
    subgraph "Producer Side"
        A
        B
        C
    end
    
    subgraph "Consumer Side"
        D
        E
        F
        G
    end

Key Design Points

  1. Separate production from consumption: Decouple scanning tasks from move tasks to avoid mutual blocking
  2. Pipe communication: Use FIFO pipes for efficient inter-process communication
  3. Concurrency control: Control the number of concurrent processes through a semaphore mechanism
  4. Error handling: Comprehensive logging and exception handling mechanisms
  5. Resource management: Automatic cleanup of temporary files to prevent resource leaks

Performance Advantage Analysis

Compared to traditional single-process processing, this solution has the following advantages:

  1. Parallel processing: Multiple consumer processes work simultaneously, linearly improving processing speed
  2. Load balancing: Automatically distributes tasks across different processes, avoiding single-point bottlenecks
  3. IO optimization: Batch processing of file system operations, reducing IO overhead
  4. Memory-friendly: Pipe-based stream processing avoids large amounts of data resident in memory

Core Implementation

1. Queue Scanning Module

Queue scanning is the first step in the entire processing flow, responsible for identifying target files that need processing. Here we provide two scanning methods:

Context File-Based Scanning

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env bash

# Scan message context files, match target emails
scan_context_files() {
    local pattern="$1"
    local context_dir="/opt/app/var/mda/dacontext/"
    local backup_dir="/opt/app/backup/$(date +%F)/"
    
    # Create backup directory
    mkdir -p "${backup_dir}"{msg,dacontext}
    
    # Find and scan context files
    find "${context_dir}" -type f | while read line; do
        if [[ -n $(grep -a "${pattern}" "$line") ]]; then
            echo "$(basename "$line")"
        fi
    done
}

Log File-Based Scanning

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/usr/bin/env bash

# Scan delivery logs, extract target message IDs
scan_log_files() {
    local log_dir="/opt/app/logs/rcptstat/"
    local pattern="$1"
    local today=$(date +%F)
    local yesterday=$(date +%F -d "yesterday")
    
    # Process today and yesterday log files
    for day in "${today}" "${yesterday}"; do
        local day_log="${log_dir}/da${day//-/_}"
        
        for ((i=0; i<24; i++)); do
            local log_file="${day_log}/da.127.0.0.1.$(date +%H -d "-$i hour")_00_00.log"
            
            if [[ -f "${log_file}" ]]; then
                process_log_file "${log_file}" "${pattern}"
            fi
        done
    done
}

2. Multi-Process Concurrency Architecture

The core of multi-process concurrency is the pipe and semaphore mechanism:

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env bash

# Set concurrent process count
PROCESS_NUMS=10

# Create pipe for process control
TMP_FIFO_FOR_PROCESS="/tmp/$$_06.fifo" && \
    mkfifo "${TMP_FIFO_FOR_PROCESS}" && \
    exec 6<>"${TMP_FIFO_FOR_PROCESS}" && \
    rm "${TMP_FIFO_FOR_PROCESS}"

# Initialize semaphore
for ((i=0; i<${PROCESS_NUMS}; i++)); do
    echo
done >&6

# Create pipe for task delivery
TMP_FIFO_FOR_TID="/tmp/$$_07.fifo" && \
    mkfifo "${TMP_FIFO_FOR_TID}" && \
    exec 7<>"${TMP_FIFO_FOR_TID}" && \
    rm "${TMP_FIFO_FOR_TID}"

3. Producer Process Implementation

The producer process is responsible for scanning files and sending tasks to the pipe:

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env bash

# Producer: scan context files and dispatch tasks
producer_context_scan() {
    local pattern="$1"
    local context_dir="/opt/app/var/mda/dacontext/"
    
    # Start scan process
    find "${context_dir}" -type f | while read line; do
        if [[ -n $(grep -a "${pattern}" "$line") ]]; then
            echo "$(basename "$line")" >&7 2>/dev/null
        fi
    done
    
    # Send completion marker
    echo "done" >&7
}

# Producer: scan log files and dispatch tasks
producer_log_scan() {
    for file in "$@"; do
        if [[ ! -f "$file" ]]; then
            continue
        fi
        
        {
            # Process log file, extract message IDs
            awk -F"[][]" '
            $0 ~ /\[cmd:remote,tid/ {
                tid=gettid($2)
                result=getresult($2)
                if(result=="Normal-Spam"){
                    print tid
                }
            }
            
            function gettid(s){
                match(s,/,tid:.*[0-9],from:/)
                start=RSTART + 5
                end=RLENGTH - 11
                return substr(s,start,end)
            }
            
            function getresult(s){
                match(s,/,commresult:.*,commrefid:/)
                start=RSTART + 12
                end=RLENGTH - 23
                return substr(s,start,end)
            }' "$file" | while read line; do
                echo "${line}" >&7 2>/dev/null
            done
        } &
    done
    
    # Send completion marker
    echo "done" >&7
}

4. Consumer Process Implementation

The consumer process retrieves tasks from the pipe and executes specific move operations:

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env bash

# Consumer: process message move tasks
consumer_move_messages() {
    local backup_dir="/opt/app/backup/$(date +%F)/"
    local msg_dir="/opt/app/var/mda/msg/"
    local context_dir="/opt/app/var/mda/dacontext/"
    
    # Create backup directory
    mkdir -p "${backup_dir}"{msg,dacontext}
    
    while true; do
        if read tid <&7; then
            [[ "${tid}" == 'done' ]] && wait && break
            
            # Acquire semaphore
            read -u6
            
            # Execute move operation
            {
                # Move message file
                find "${msg_dir}" -name "${tid}" -exec mv {} "${backup_dir}/" \;
                
                # Move context file
                find "${context_dir}" -name "${tid}" -exec mv {} "${backup_dir}/dacontext/" \;
                
                # Log operation
                echo "$(date '+%F %H:%M') - Moved message ${tid} to backup directory"
                
                # Release semaphore
                echo >&6
            } &
        fi
    done
    
    wait
}

5. Result Summary Module

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env bash

# Result summary and statistics
result_summary() {
    local backup_dir="/opt/app/backup/$(date +%F)/"
    local log_file="/opt/app/logs/cleanup_$(date +%F).log"
    
    # Count processing results
    local msg_count=$(find "${backup_dir}/msg/" -type f | wc -l)
    local context_count=$(find "${backup_dir}/dacontext/" -type f | wc -l)
    
    # Record summary information
    {
        echo "=========================================="
        echo "Cleanup operation completed at: $(date '+%F %H:%M:%S')"
        echo "Messages processed: ${msg_count}"
        echo "Contexts processed: ${context_count}"
        echo "Backup directory: ${backup_dir}"
        echo "=========================================="
    } | tee -a "${log_file}"
}

Key Bash Techniques

1. Clever Use of Pipes and FIFOs

Key technique: Use named pipes for inter-process communication to avoid disk IO bottlenecks

bash
1
2
3
4
5
6
# Create bidirectional communication pipe
mkfifo /tmp/process_fifo && exec 3<> /tmp/process_fifo

# Use pipe for asynchronous communication
echo "task_data" >&3
read response <&3

Practical application:

  • Producer writes tasks to the pipe
  • Consumer reads tasks from the pipe
  • Controls concurrency through semaphores

2. Efficient Text Processing with awk

Key technique: Use awk for complex text parsing and filtering

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Efficient log parsing
awk -F"[][]" '
$0 ~ /\[cmd:remote,tid/ {
    tid=gettid($2)
    result=getresult($2)
    if(result=="Normal-Spam"){
        print tid
    }
}
' /var/log/rcptstat.log

Performance advantages:

  • Single-pass multi-level parsing
  • Built-in regular expression engine
  • Memory-efficient processing

3. Process Control and Semaphores

Key technique: Use file descriptors to implement semaphore mechanisms

bash
1
2
3
4
5
6
7
8
# Create semaphore control
for i in {1..10}; do echo; done >&6

# Acquire semaphore
read -u6

# Release semaphore
echo >&6

4. Conditional Logic and Error Handling

Key technique: Comprehensive error checking and handling mechanisms

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Safe file operations
if [[ -f "${file}" ]]; then
    process_file "${file}"
else
    log_error "File does not exist: ${file}"
fi

# Log function
log_error() {
    echo "[$(date '+%F %H:%M')] ERROR: $1" | tee -a "${LOG_FILE}"
}

5. Path Handling and Variable Safety

Key technique: Safe path construction and variable handling

bash
1
2
3
4
5
6
7
# Safe path construction
BASE_DIR="/opt/app"
TARGET_DIR="${BASE_DIR}/backup/$(date +%F)"
mkdir -p "${TARGET_DIR}"

# Safe variable reference
echo "Processing file: ${file:-unspecified}"

Complete Integration Example

Main Control Script

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env bash

#!/usr/bin/env bash
# Message queue cleanup tool - producer-consumer pattern
# Batch cleanup of messages flagged by anti-spam engine

# Configuration parameters
PROCESS_NUMS=10
WARNING_THRESHOLD=4000
CRITICAL_THRESHOLD=10000
HELL_THRESHOLD=16000

# Base path configuration
BASE_DIR="/opt/app"
LOG_DIR="${BASE_DIR}/logs"
BACKUP_DIR="${BASE_DIR}/backup/$(date +%F)"
MSG_DIR="${BASE_DIR}/var/mda/msg"
CONTEXT_DIR="${BASE_DIR}/var/mda/dacontext"
RCPT_LOG_DIR="${BASE_DIR}/logs/rcptstat"

# Log files
CLEANUP_LOG="${LOG_DIR}/cleanup_$(date +%F).log"
MOVE_LOG="${LOG_DIR}/move_$(date +%F).log"

# Create required directories
mkdir -p "${LOG_DIR}" "${BACKUP_DIR}"{msg,dacontext}

# Log functions
log_error() {
    echo "[$(date '+%F %H:%M:%S')] ERROR: $1" | tee -a "${CLEANUP_LOG}"
}

log_info() {
    echo "[$(date '+%F %H:%M:%S')] INFO: $1" | tee -a "${CLEANUP_LOG}"
}

# Initialize pipes and semaphores
init_pipes() {
    # Process control pipe
    local process_fifo="/tmp/$$process.fifo"
    mkfifo "${process_fifo}" && exec 6<>"${process_fifo}" && rm "${process_fifo}"
    for ((i=0; i<${PROCESS_NUMS}; i++)); do echo; done >&6
    
    # Task delivery pipe
    local task_fifo="/tmp/$$task.fifo"
    mkfifo "${task_fifo}" && exec 7<>"${task_fifo}" && rm "${task_fifo}"
}

# Cleanup functions
cleanup_functions() {
    # Cleanup pipe file descriptors
    exec 6>&-
    exec 7>&-
    log_info "Cleanup complete"
}

# Main function
main() {
    log_info "Starting message queue cleanup check"
    
    # Initialize pipes
    init_pipes
    
    # Check queue status
    local queue_count=$(find "${MSG_DIR}" -type f | wc -l)
    log_info "Current queue file count: ${queue_count}"
    
    # Select processing strategy based on queue status
    if [[ ${queue_count} -gt ${HELL_THRESHOLD} ]]; then
        log_error "Queue count ${queue_count} has exceeded threshold ${HELL_THRESHOLD}, starting hell mode"
        emergency_cleanup
    elif [[ ${queue_count} -gt ${CRITICAL_THRESHOLD} ]]; then
        log_error "Queue count ${queue_count} has exceeded threshold ${CRITICAL_THRESHOLD}, starting critical mode"
        critical_cleanup
    elif [[ ${queue_count} -gt ${WARNING_THRESHOLD} ]]; then
        log_error "Queue count ${queue_count} has exceeded threshold ${WARNING_THRESHOLD}, starting quick mode"
        quick_cleanup
    else
        log_info "Queue count is normal, no action needed"
    fi
    
    # Cleanup resources
    cleanup_functions
    result_summary
}

# Execute main function
main "$@"

Summary

This article detailed a set of advanced Bash programming solutions based on the producer-consumer pattern for solving message queue backlog issues. Through the combined use of multi-process concurrency, pipe communication, and efficient awk parsing, efficient queue cleanup is achieved.

Core Advantages

  1. Excellent performance: Multi-process concurrent processing improves efficiency several times over
  2. Clear architecture: Producer-consumer separation, easy to maintain and extend
  3. Stable and reliable: Comprehensive error handling and resource management mechanisms
  4. Flexible configuration: Supports different thresholds and concurrency strategies

Applicable Scenarios

  • Batch cleanup of messages flagged by anti-spam engines
  • Queue backlog handling caused by malicious attacks
  • Queue recovery after system failures
  • Regular queue maintenance and optimization

Extension Suggestions

  1. Monitoring integration: Integrate with monitoring systems for automated alerting and handling
  2. Strategy optimization: Adjust concurrency numbers and thresholds based on actual business needs
  3. Documentation improvement: Establish detailed operation manuals and emergency plans
  4. Testing and verification: Conduct regular stress tests to ensure solution effectiveness

This solution not only solves the current problem but also provides a reusable architectural pattern that can be applied in various message processing scenarios. Through proper configuration and continuous optimization, it effectively ensures the stable operation of messaging systems.


This document is based on actual production environment experience and aims to share practical technical solutions. In actual applications, please adjust and test according to your specific environment.