ZeroMQ High Water Mark (HWM) Not Dropping Messages? Troubleshooting the Durable Subscribers Example in Python

ZeroMQ (also known as ØMQ) is a powerful, lightweight messaging library that enables high-performance inter-process communication (IPC) and distributed systems. It abstracts complex networking details, allowing developers to focus on building scalable, resilient message-passing architectures. Two critical concepts in ZeroMQ are High Water Mark (HWM) and durable subscribers.

  • High Water Mark (HWM): A flow-control mechanism that limits the number of messages queued in a ZeroMQ socket’s buffer. When the buffer reaches the HWM threshold, ZeroMQ either drops new messages or blocks the sender, depending on the socket type. This prevents memory overflow and ensures system stability under backpressure.
  • Durable Subscribers: Subscribers designed to avoid message loss, even if they disconnect or temporarily fall behind. This is often achieved via persistence layers, subscription tracking, or reconnection logic.

A common frustration among ZeroMQ users is when HWM fails to drop messages in durable subscriber setups. Messages pile up, leading to unexpected memory usage or downstream failures. This blog dives into why this happens, how to troubleshoot it, and how to fix it—with a step-by-step Python example.

Table of Contents#

  1. Introduction to ZeroMQ, HWM, and Durable Subscribers
  2. The Problem: HWM Not Dropping Messages in Durable Subscribers
  3. Understanding ZeroMQ HWM: How It Should Work
  4. Durable Subscribers in ZeroMQ: What You Need to Know
  5. Troubleshooting Steps: Why Isn’t HWM Dropping Messages?
  6. Example Walkthrough: Reproducing and Fixing the Issue
  7. Best Practices for HWM and Durable Subscribers
  8. Conclusion
  9. References

Introduction to ZeroMQ, HWM, and Durable Subscribers#

ZeroMQ Basics#

ZeroMQ uses "sockets" (not OS-level sockets) to send/receive messages across inproc (inter-thread), IPC (inter-process), TCP, or PGM (multicast) transports. These sockets are typed (e.g., PUB, SUB, PUSH, PULL) and follow specific messaging patterns (publish-subscribe, pipeline, request-reply, etc.).

High Water Mark (HWM)#

HWM acts as a safety valve for socket buffers. Each ZeroMQ socket has two HWMs:

  • ZMQ_SNDHWM: Limits the number of outgoing messages queued in the sender’s buffer.
  • ZMQ_RCVHWM: Limits the number of incoming messages queued in the receiver’s buffer.

By default, HWMs are often set to 1000 (varies by ZeroMQ version and socket type). When a socket’s buffer reaches its HWM:

  • Sender-side (SNDHWM): For PUB and XPUB sockets, ZeroMQ silently drops new messages bound for a full pipe. For sockets like PUSH, REQ, and DEALER, it blocks the sender (or returns EAGAIN on a non-blocking send) until the buffer has space.
  • Receiver-side (RCVHWM): Once the receiver’s buffer fills, ZeroMQ stops reading from the connection, pushing backpressure toward the sender; SUB sockets drop further incoming messages.

Durable Subscribers#

Standard SUB sockets are "ephemeral": if they disconnect, they miss messages sent during downtime. Durable subscribers solve this by:

  • Persisting messages (e.g., to disk) until the subscriber acknowledges receipt.
  • Tracking subscriptions (via proxies like XPUB/XSUB) to resume message flow post-reconnection.
  • Replaying missed messages from a buffer or database when the subscriber reconnects.

Durable subscribers are critical for use cases like financial trading or IoT, where data loss is unacceptable.

The Problem: HWM Not Dropping Messages in Durable Subscribers#

Imagine this scenario: You set up a PUB socket with ZMQ_SNDHWM=100, expecting it to drop messages once the subscriber falls behind. But when the subscriber is slow (e.g., due to high latency or processing delays), messages keep piling up—no drops occur, and memory usage balloons.

Why? HWM relies on ZeroMQ’s internal buffers filling up. Durable subscriber setups often introduce additional layers (proxies, persistence, reconnection logic) that can bypass or interfere with HWM. Common culprits include:

  • Misconfigured HWM on the wrong socket (e.g., setting RCVHWM instead of SNDHWM).
  • Using socket types that block (not drop) when HWM is hit.
  • Durability mechanisms (e.g., persistent queues) storing messages outside ZeroMQ’s buffers, so HWM never triggers.

Understanding ZeroMQ HWM: How It Should Work#

To troubleshoot HWM issues, you first need to understand its core behavior.

HWM Basics#

  • Default Values: In ZeroMQ 3.x and 4.x, the default SNDHWM/RCVHWM is 1000 for most sockets. In 2.x, HWM defaulted to 0 (unlimited), leading to unbounded buffering. The limit is also approximate: the real number of messages queued before HWM kicks in can be noticeably lower than the configured value.
  • Per-Socket Setting: HWMs are configured per socket with setsockopt:
    import zmq  
    context = zmq.Context()  
    pub_socket = context.socket(zmq.PUB)  
    pub_socket.setsockopt(zmq.SNDHWM, 100)  # Limit send buffer to 100 messages  

When HWM Triggers: Dropping vs. Blocking#

Socket type determines HWM behavior:

  • Drops messages: PUB, XPUB, and ROUTER (silently, unless options like ZMQ_XPUB_NODROP or ZMQ_ROUTER_MANDATORY are set).
  • Blocks the sender: PUSH, DEALER, and REQ (or returns EAGAIN on a non-blocking send).

Key Takeaway: If you’re using a PUB socket (which should drop), but HWM isn’t triggering, the issue lies elsewhere (e.g., buffer not filling).
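The drop-versus-block distinction is easy to observe directly. Here is a minimal sketch (assuming pyzmq is installed and the two local ports are free; port numbers are arbitrary): a PUB socket with no subscribers "successfully" sends into the void, while a PUSH socket with no connected PULL peer refuses a non-blocking send with zmq.Again.

```python
import zmq

ctx = zmq.Context()

# PUB with no subscribers: send() returns immediately and the message is
# silently dropped -- a PUB socket never blocks and never raises at HWM.
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://127.0.0.1:5599")
pub.send(b"dropped silently")  # succeeds even though nobody receives it
pub_send_succeeded = True

# PUSH with no connected PULL peer is in its "mute" state: send() would
# block forever, and with DONTWAIT it raises zmq.Again instead.
push = ctx.socket(zmq.PUSH)
push.bind("tcp://127.0.0.1:5600")
push_would_block = False
try:
    push.send(b"no peer yet", zmq.DONTWAIT)
except zmq.Again:
    push_would_block = True

print("PUB dropped silently:", pub_send_succeeded)  # → True
print("PUSH would block:", push_would_block)        # → True
ctx.destroy(linger=0)
```

The same mute-state behavior applies once a connected peer's pipe reaches its HWM: PUB discards, PUSH stalls.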

HWM and Socket Topology#

In complex topologies (e.g., with proxies), HWMs apply to each socket in the chain. For example:

  • A PUB → XSUB/XPUB proxy → SUB pipeline has four sockets: PUB (SNDHWM), the proxy’s XSUB (RCVHWM) and XPUB (SNDHWM), and SUB (RCVHWM). If the proxy’s XPUB socket has a high SNDHWM, it may buffer messages even if the PUB’s HWM is low.
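In a chain like this, each hop needs its own limit. A configuration sketch (pyzmq assumed; ports are illustrative) that caps every socket in the pipeline to one shared HWM:

```python
import zmq

HWM = 100  # one shared limit for every hop
ctx = zmq.Context()

# Proxy sockets -- HWM options must be set BEFORE bind/connect to take
# effect on the resulting connections.
xsub = ctx.socket(zmq.XSUB)
xsub.setsockopt(zmq.RCVHWM, HWM)  # messages arriving from publishers
xsub.bind("tcp://*:5560")

xpub = ctx.socket(zmq.XPUB)
xpub.setsockopt(zmq.SNDHWM, HWM)  # messages queued per subscriber
xpub.bind("tcp://*:5561")

# Endpoints get matching limits on their own ends of the chain.
pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, HWM)
pub.connect("tcp://localhost:5560")

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.RCVHWM, HWM)
sub.setsockopt_string(zmq.SUBSCRIBE, "")
sub.connect("tcp://localhost:5561")
```

With matching limits, no single hop can hide the backpressure from the others.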

Durable Subscribers in ZeroMQ: What You Need to Know#

ZeroMQ doesn’t natively support durable subscribers, so developers often use XPUB/XSUB proxies to track subscriptions. Here’s how it works:

How Durable Subscribers Work (with XPUB/XSUB Proxy)#

  • XSUB Socket: Connects to PUB sockets and collects messages.
  • XPUB Socket: Accepts SUB connections and forwards messages from XSUB.
  • Proxy Thread: Binds XSUB and XPUB, relaying messages between them.

To make subscribers durable, the proxy can:

  • Track subscriptions (via XPUB socket’s subscription messages).
  • Persist messages to a queue (e.g., Redis, RabbitMQ) if subscribers are slow.
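Subscription tracking works because every SUB (un)subscription arrives at the XPUB side as an ordinary message: the first byte is 1 for subscribe or 0 for unsubscribe, followed by the topic prefix. A minimal sketch (pyzmq assumed; the port and topic name are arbitrary):

```python
import zmq

ctx = zmq.Context()
xpub = ctx.socket(zmq.XPUB)
xpub.bind("tcp://127.0.0.1:5558")

sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5558")
sub.setsockopt_string(zmq.SUBSCRIBE, "sensors.")

# The subscription shows up at the XPUB side as a normal message:
# first byte 1 = subscribe, 0 = unsubscribe, remainder = topic prefix.
msg = xpub.recv()
action, topic = msg[0], msg[1:]
print("subscribe" if action == 1 else "unsubscribe", topic.decode())
# → subscribe sensors.
ctx.destroy(linger=0)
```

A durability layer can record these frames to know exactly which topics each subscriber wants replayed after a reconnect.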

Potential Pitfalls with HWM#

  • Proxy Buffering: If the XPUB socket’s SNDHWM is higher than the PUB’s, it will buffer messages, preventing the PUB from hitting its HWM.
  • Persistence Overrides HWM: If messages are offloaded to a persistent queue (not ZeroMQ’s buffer), the PUB socket’s buffer never fills, so HWM isn’t triggered.

Troubleshooting Steps: Why Isn’t HWM Dropping Messages?#

Let’s systematically diagnose why HWM isn’t dropping messages in your durable subscriber setup.

Step 1: Verify HWM Configuration#

First, confirm HWMs are set correctly.

Check Current HWM Values:
Use getsockopt to verify settings:

print("PUB SNDHWM:", pub_socket.getsockopt(zmq.SNDHWM))  
print("SUB RCVHWM:", sub_socket.getsockopt(zmq.RCVHWM))  

Common Mistakes:

  • Forgetting to set SNDHWM on the PUB socket (using default 1000, which may not fill under light load).
  • Setting RCVHWM on the SUB socket but not SNDHWM on the PUB (HWM drops happen on the sender side for PUB).

Step 2: Check Socket Types and Topology#

Ensure you’re using a socket type that drops messages when HWM is hit.

  • Use PUB for Sending: PUB is lossy and silently drops messages at SNDHWM. Don’t reach for PUSH/PULL if you need drops—PUSH blocks (or returns EAGAIN) when its HWM is hit.
  • Avoid Blocking Sockets: PUSH, REQ/REP, and DEALER block senders, so HWM won’t drop messages—they’ll stall the sender instead.

Step 3: Analyze Message Flow and Backpressure#

HWM only triggers if the receiver is slow enough to cause backpressure.

Simulate a Slow Subscriber:
Add a delay in the subscriber’s message processing loop to create backpressure:

while True:  
    msg = sub_socket.recv()  
    time.sleep(0.1)  # Slow down processing  

Monitor for Drops:
libzmq’s socket monitor (zmq_socket_monitor) reports connection lifecycle events (connect, accept, disconnect) but has no event for reaching the HWM, so you can’t watch for drops that way. Instead, surface drops at the sender: set ZMQ_XPUB_NODROP (valid on PUB and XPUB sockets) so a send at HWM fails with EAGAIN rather than silently discarding the message:

pub_socket.setsockopt(zmq.XPUB_NODROP, 1)  # fail instead of dropping silently  
 
try:  
    pub_socket.send(msg, zmq.DONTWAIT)  
except zmq.Again:  
    print("SNDHWM hit! Message would have been dropped.")  

Step 4: Inspect ZMQ_SNDHWM vs ZMQ_RCVHWM#

  • SNDHWM (Publisher): Controls outgoing buffer. If the subscriber is slow, the PUB’s buffer fills, triggering drops.
  • RCVHWM (Subscriber): Controls incoming buffer. If the subscriber’s buffer is full, it drops messages, but this doesn’t affect the publisher’s HWM.

Fix: Set SNDHWM on the PUB socket (not just RCVHWM on SUB).

Step 5: Consider Message Size and Batching#

HWM is counted in messages, not bytes—but the TCP send/receive buffers between publisher and subscriber are sized in bytes. With small messages (e.g., 1KB), those byte-sized buffers can absorb many messages beyond the HWM before backpressure reaches ZeroMQ’s queues; with large messages (e.g., 1MB), they fill almost immediately, so HWM triggers far sooner.

Test with Large Messages:
Send large payloads to accelerate buffer filling:

large_msg = b"x" * 1024 * 1024  # 1MB message  
pub_socket.send(large_msg)  

Step 6: Check for Misconfigured Durability Mechanisms#

Durability tools like persistent queues or proxies can bypass ZeroMQ’s buffers:

  • Proxies (XPUB/XSUB): If the proxy’s XPUB socket has a high SNDHWM, it will buffer messages, preventing the PUB from hitting its HWM. Set XPUB’s SNDHWM to match the PUB’s.
  • Persistent Storage: If messages are saved to a database/queue (not ZeroMQ’s buffer), the PUB socket’s buffer never fills. HWM only applies to in-memory buffers!

Example Walkthrough: Reproducing and Fixing the Issue#

Let’s build a durable subscriber example in Python, reproduce the HWM issue, then fix it.

Setup: Durable Subscriber with XPUB/XSUB Proxy#

We’ll use an XPUB/XSUB proxy to track subscriptions (enabling durability) and a PUB socket sending messages to a slow SUB subscriber.

Step 1: The Faulty Code (HWM Not Dropping)#

import zmq  
import time  
from threading import Thread  
 
def proxy_thread(context):  
    # Proxy to track subscriptions (durable subscriber support)  
    xsub = context.socket(zmq.XSUB)  
    xsub.bind("tcp://*:5556")  
 
    xpub = context.socket(zmq.XPUB)  
    xpub.bind("tcp://*:5557")  
 
    # ❌ Missing HWM configuration on XPUB!  
    zmq.proxy(xsub, xpub)  
 
def publisher(context):  
    pub = context.socket(zmq.PUB)  
    pub.connect("tcp://localhost:5556")  # Connect to XSUB  
 
    # ❌ Setting RCVHWM instead of SNDHWM  
    pub.setsockopt(zmq.RCVHWM, 100)  
 
    for i in range(1000):  
        pub.send(f"Message {i}".encode())  
        print(f"Sent: Message {i}")  
        time.sleep(0.01)  # Send 100 messages/sec  
 
def subscriber(context):  
    sub = context.socket(zmq.SUB)  
    sub.connect("tcp://localhost:5557")  # Connect to XPUB  
    sub.setsockopt_string(zmq.SUBSCRIBE, "")  
 
    # Slow subscriber (1 message/sec)  
    for _ in range(1000):  
        msg = sub.recv()  
        print(f"Received: {msg.decode()}")  
        time.sleep(1)  # Simulate processing delay  
 
if __name__ == "__main__":  
    context = zmq.Context()  
 
    # Start proxy, publisher, subscriber  
    Thread(target=proxy_thread, args=(context,)).start()  
    Thread(target=publisher, args=(context,)).start()  
    subscriber(context)  

Problem Analysis#

  • HWM Set on Wrong Socket: The publisher sets RCVHWM (receiver HWM) instead of SNDHWM (sender HWM).
  • Proxy’s XPUB Lacks HWM: The XPUB socket in the proxy has default SNDHWM=1000, so it buffers messages, preventing the publisher’s buffer from filling.

Step 2: Fixing the Issue#

Correct HWM Configuration:

  • Set SNDHWM=100 on the PUB and XPUB sockets.
  • Use SNDHWM (not RCVHWM) on the publisher.

Fixed Code:

def proxy_thread(context):  
    xsub = context.socket(zmq.XSUB)  
    xsub.bind("tcp://*:5556")  
 
    xpub = context.socket(zmq.XPUB)  
    xpub.setsockopt(zmq.SNDHWM, 100)  # ✅ Set XPUB SNDHWM (before bind)  
    xpub.bind("tcp://*:5557")  
 
    zmq.proxy(xsub, xpub)  
 
def publisher(context):  
    pub = context.socket(zmq.PUB)  
    pub.setsockopt(zmq.SNDHWM, 100)     # ✅ Set PUB SNDHWM (before connect)  
    pub.setsockopt(zmq.XPUB_NODROP, 1)  # ✅ Raise zmq.Again instead of dropping silently  
    pub.connect("tcp://localhost:5556")  
 
    for i in range(1000):  
        try:  
            pub.send(f"Message {i}".encode(), zmq.DONTWAIT)  # Non-blocking send  
        except zmq.Again:  
            print(f"Dropped: Message {i} (HWM hit)")  
        time.sleep(0.01)  
 
# Subscriber code remains the same (slow processing)  

Key Fixes:#

  • PUB and XPUB SNDHWM=100: Both sockets now drop messages when their buffers fill. Set these options before bind/connect—HWM changes only affect connections made afterward.
  • XPUB_NODROP + Non-Blocking Send: A plain PUB socket never raises at HWM—it drops silently, so except zmq.Again would never fire. Setting ZMQ_XPUB_NODROP=1 makes the send fail with zmq.Again instead, letting you catch and log every drop.

Result#

With these fixes, the PUB and XPUB sockets will refuse messages once their queues reach 100. Keep in mind that total pipeline capacity is roughly the sum of every SNDHWM/RCVHWM along the chain plus the OS TCP buffers, so drops begin only after that combined headroom is exhausted—then you’ll see "Dropped: Message X" in the logs, confirming HWM is working.

Best Practices for HWM and Durable Subscribers#

To avoid HWM issues in durable setups:

  1. Set HWM on All Relevant Sockets: PUB, XPUB, and any intermediate proxies should have matching SNDHWM values.
  2. Use Non-Blocking Sends: For PUB, set ZMQ_XPUB_NODROP, send with zmq.DONTWAIT, and handle zmq.Again to log drops.
  3. Monitor for Drops: libzmq’s socket monitor has no HWM event; instead, let ZMQ_XPUB_NODROP surface drops as zmq.Again and alert on sustained failures.
  4. Separate Durability from ZeroMQ Buffers: Use persistent queues (e.g., Redis Streams) alongside HWM. Let ZeroMQ handle in-memory buffering (with HWM), and the queue handle long-term persistence.
  5. Test Under Load: Simulate slow subscribers and large messages to validate HWM behavior before production.
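For point 4, the division of labor can be sketched with a toy in-memory replay buffer: ZeroMQ’s HWM bounds in-flight memory, while the buffer serves late or reconnecting subscribers. This is illustrative only—the class and method names are hypothetical, and a production system would use Redis Streams, Kafka, or a database instead.

```python
from collections import deque

class ReplayBuffer:
    """Keep the last `capacity` (seq, payload) pairs for late subscribers.

    A minimal stand-in for an external persistence layer: ZeroMQ's HWM
    still bounds in-flight memory, while this buffer serves replays.
    """
    def __init__(self, capacity=1000):
        self._buf = deque(maxlen=capacity)  # oldest entries evicted first
        self._seq = 0

    def publish(self, payload: bytes) -> int:
        """Assign the next sequence number and remember the message."""
        self._seq += 1
        self._buf.append((self._seq, payload))
        return self._seq

    def replay_from(self, last_seen: int):
        """Return every buffered message newer than `last_seen`."""
        return [(s, p) for s, p in self._buf if s > last_seen]

buf = ReplayBuffer(capacity=3)
for p in [b"a", b"b", b"c", b"d"]:
    buf.publish(p)

print(buf.replay_from(2))  # → [(3, b"c"), (4, b"d")]
```

A reconnecting subscriber reports the last sequence number it processed and receives only what it missed (within the buffer’s window)—without any socket ever exceeding its HWM.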

Conclusion#

HWM not dropping messages in durable subscriber setups is rarely a ZeroMQ bug—it’s almost always misconfiguration or misunderstanding of HWM’s interaction with durability mechanisms. By verifying HWM settings, socket types, and message flow, you can ensure HWM drops messages when needed, preventing memory bloat and ensuring system stability.

Remember: HWM protects ZeroMQ’s in-memory buffers. For durability, pair it with external persistence tools, but don’t rely on them to enforce backpressure—HWM is your first line of defense.

References#