ZeroMQ High Water Mark (HWM) Not Dropping Messages? Troubleshooting the Durable Subscribers Example in Python
ZeroMQ (also known as ØMQ) is a powerful, lightweight messaging library that enables high-performance inter-process communication (IPC) and distributed systems. It abstracts complex networking details, allowing developers to focus on building scalable, resilient message-passing architectures. Two critical concepts in ZeroMQ are High Water Mark (HWM) and durable subscribers.
- High Water Mark (HWM): A flow-control mechanism that limits the number of messages queued in a ZeroMQ socket’s buffer. When the buffer reaches the HWM threshold, ZeroMQ either drops new messages or blocks the sender, depending on the socket type. This prevents memory overflow and ensures system stability under backpressure.
- Durable Subscribers: Subscribers designed to avoid message loss, even if they disconnect or temporarily fall behind. This is often achieved via persistence layers, subscription tracking, or reconnection logic.
A common frustration among ZeroMQ users is when HWM fails to drop messages in durable subscriber setups. Messages pile up, leading to unexpected memory usage or downstream failures. This blog dives into why this happens, how to troubleshoot it, and how to fix it—with a step-by-step Python example.
Table of Contents#
- Introduction to ZeroMQ, HWM, and Durable Subscribers
- The Problem: HWM Not Dropping Messages in Durable Subscribers
- Understanding ZeroMQ HWM: How It Should Work
- Durable Subscribers in ZeroMQ: What You Need to Know
- Troubleshooting Steps: Why Isn’t HWM Dropping Messages?
- Example Walkthrough: Reproducing and Fixing the Issue
- Best Practices for HWM and Durable Subscribers
- Conclusion
- References
Introduction to ZeroMQ, HWM, and Durable Subscribers#
ZeroMQ Basics#
ZeroMQ uses "sockets" (not OS-level sockets) to send/receive messages across inproc (inter-thread), IPC (inter-process), TCP, or PGM (multicast) transports. These sockets are typed (e.g., PUB, SUB, PUSH, PULL) and follow specific messaging patterns (publish-subscribe, pipeline, request-reply, etc.).
High Water Mark (HWM)#
HWM acts as a safety valve for socket buffers. Each ZeroMQ socket has two HWMs:
- ZMQ_SNDHWM: Limits the number of outgoing messages queued in the sender’s buffer.
- ZMQ_RCVHWM: Limits the number of incoming messages queued in the receiver’s buffer.
By default, HWMs are often set to 1000 (varies by ZeroMQ version and socket type). When a socket’s buffer reaches its HWM:
- Sender-side (SNDHWM): For "lossy" sockets like PUB and XPUB, ZeroMQ drops new messages. For sockets like PUSH, REQ, REP, and DEALER, it blocks the sender until the buffer has space.
- Receiver-side (RCVHWM): Once the receiver’s buffer is full, the socket stops accepting messages from the transport; depending on the socket type it either drops new messages (e.g., SUB) or pushes backpressure upstream.
Durable Subscribers#
Standard SUB sockets are "ephemeral": if they disconnect, they miss messages sent during downtime. Durable subscribers solve this by:
- Persisting messages (e.g., to disk) until the subscriber acknowledges receipt.
- Tracking subscriptions (via proxies like XPUB/XSUB) to resume message flow post-reconnection.
- Replaying missed messages from a buffer or database when the subscriber reconnects.
Durable subscribers are critical for use cases like financial trading or IoT, where data loss is unacceptable.
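ZeroMQ itself won’t tell a subscriber what it missed, so durable designs usually add an application-level sequence number: the publisher stamps each message, and the subscriber uses the stamps to detect gaps (from HWM drops or downtime) and request a replay. A minimal, hypothetical sketch — the GapDetector class and its API are illustrative, not part of ZeroMQ:

```python
class GapDetector:
    """Track sequence numbers and record inclusive ranges of missed messages."""

    def __init__(self):
        self.expected = None  # next sequence number we expect to see
        self.gaps = []        # list of (first_missed, last_missed) tuples

    def observe(self, seq):
        # Anything between the expected number and the one we actually got
        # was lost (e.g., dropped at HWM) and should be requested as a replay.
        if self.expected is not None and seq != self.expected:
            self.gaps.append((self.expected, seq - 1))
        self.expected = seq + 1
        return self.gaps


d = GapDetector()
for seq in (0, 1, 2, 5, 6):   # messages 3 and 4 never arrived
    d.observe(seq)
print(d.gaps)  # [(3, 4)]
```

A replay service (backed by a disk buffer or database) can then serve exactly the recorded ranges instead of re-sending everything.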
The Problem: HWM Not Dropping Messages in Durable Subscribers#
Imagine this scenario: You set up a PUB socket with ZMQ_SNDHWM=100, expecting it to drop messages once the subscriber falls behind. But when the subscriber is slow (e.g., due to high latency or processing delays), messages keep piling up—no drops occur, and memory usage balloons.
Why? HWM relies on ZeroMQ’s internal buffers filling up. Durable subscriber setups often introduce additional layers (proxies, persistence, reconnection logic) that can bypass or interfere with HWM. Common culprits include:
- Misconfigured HWM on the wrong socket (e.g., setting RCVHWM instead of SNDHWM).
- Using socket types that block (not drop) when HWM is hit.
- Durability mechanisms (e.g., persistent queues) storing messages outside ZeroMQ’s buffers, so HWM never triggers.
Understanding ZeroMQ HWM: How It Should Work#
To troubleshoot HWM issues, you first need to understand its core behavior.
HWM Basics#
- Default Values: In ZeroMQ 4.0+, the default SNDHWM/RCVHWM is 1000 for most sockets. Older versions (pre-3.2) had no default HWM, leading to unbounded buffering.
- Per-Socket Setting: HWMs are configured per socket with setsockopt:

```python
import zmq

context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.setsockopt(zmq.SNDHWM, 100)  # Limit send buffer to 100 messages
```
When HWM Triggers: Dropping vs. Blocking#
Socket type determines HWM behavior:
- Drops messages: PUB, XPUB, SUB, XSUB (loss-tolerant pub-sub patterns).
- Blocks the sender: PUSH, REQ, REP, DEALER (reliable patterns). ROUTER is a special case: by default it silently discards messages it cannot route.
Key Takeaway: If you’re using a PUB socket (which should drop), but HWM isn’t triggering, the issue lies elsewhere (e.g., buffer not filling).
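You can see the drop-vs-block split without any peers at all: a PUB socket with no subscribers "sends" successfully (the message is silently discarded), while a PUSH socket with no pullers is in the mute state, so a non-blocking send raises zmq.Again. A small sketch, assuming pyzmq is installed:

```python
import zmq


def hwm_mute_demo():
    """Return (pub_send_succeeded, push_raised_again) with no peers connected."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://hwm-demo-pub")
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://hwm-demo-push")

    pub.send(b"tick", zmq.DONTWAIT)  # PUB: message silently dropped, call succeeds
    pub_send_succeeded = True

    try:
        push.send(b"tick", zmq.DONTWAIT)  # PUSH: mute state -> would block
        push_raised_again = False
    except zmq.Again:
        push_raised_again = True

    pub.close()
    push.close()
    ctx.term()
    return pub_send_succeeded, push_raised_again


print(hwm_mute_demo())  # (True, True)
```

The same split governs HWM behavior under backpressure: PUB keeps "succeeding" while dropping, PUSH stalls.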
HWM and Socket Topology#
In complex topologies (e.g., with proxies), HWMs apply to each socket in the chain. For example:
- A PUB → XPUB/XSUB proxy → SUB pipeline has four sockets in the chain: PUB (SNDHWM), the proxy’s XSUB (RCVHWM) and XPUB (SNDHWM), and SUB (RCVHWM). If the proxy’s XPUB socket has a high SNDHWM, it may buffer messages even if the PUB’s HWM is low.
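In practice that means bounding every hop the same way. A minimal sketch, assuming pyzmq (socket names are illustrative), that sets matching HWMs on a proxy’s socket pair and verifies them with getsockopt. Note that HWM changes generally apply only to connections made after the option is set, so configure HWMs before bind/connect:

```python
import zmq

ctx = zmq.Context()
xsub = ctx.socket(zmq.XSUB)  # faces the publishers
xpub = ctx.socket(zmq.XPUB)  # faces the subscribers

# Bound every hop identically so no stage can hide backpressure
# behind a larger default (1000-message) queue.
for sock in (xsub, xpub):
    sock.setsockopt(zmq.SNDHWM, 100)
    sock.setsockopt(zmq.RCVHWM, 100)

xpub_sndhwm = xpub.getsockopt(zmq.SNDHWM)
print("XPUB SNDHWM:", xpub_sndhwm)

for sock in (xsub, xpub):
    sock.close()
ctx.term()
```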
Durable Subscribers in ZeroMQ: What You Need to Know#
ZeroMQ doesn’t natively support durable subscribers, so developers often use XPUB/XSUB proxies to track subscriptions. Here’s how it works:
How Durable Subscribers Work (with XPUB/XSUB Proxy)#
- XSUB Socket: Connects to PUB sockets and collects messages.
- XPUB Socket: Accepts SUB connections and forwards messages from XSUB.
- Proxy Thread: Binds XSUB and XPUB, relaying messages between them.
To make subscribers durable, the proxy can:
- Track subscriptions (via the XPUB socket’s subscription messages).
- Persist messages to a queue (e.g., Redis, RabbitMQ) if subscribers are slow.
Potential Pitfalls with HWM#
- Proxy Buffering: If the XPUB socket’s SNDHWM is higher than the PUB’s, it will buffer messages, preventing the PUB from hitting its HWM.
- Persistence Overrides HWM: If messages are offloaded to a persistent queue (not ZeroMQ’s buffer), the PUB socket’s buffer never fills, so HWM isn’t triggered.
Troubleshooting Steps: Why Isn’t HWM Dropping Messages?#
Let’s systematically diagnose why HWM isn’t dropping messages in your durable subscriber setup.
Step 1: Verify HWM Configuration#
First, confirm HWMs are set correctly.
Check Current HWM Values:
Use getsockopt to verify settings:

```python
print("PUB SNDHWM:", pub_socket.getsockopt(zmq.SNDHWM))
print("SUB RCVHWM:", sub_socket.getsockopt(zmq.RCVHWM))
```

Common Mistakes:
- Forgetting to set SNDHWM on the PUB socket (leaving the default 1000, which may never fill under light load).
- Setting RCVHWM on the SUB socket but not SNDHWM on the PUB (HWM drops happen on the sender side for PUB).
Step 2: Check Socket Types and Topology#
Ensure you’re using a socket type that drops messages when HWM is hit.
- Use PUB for Sending: PUB is lossy and drops messages at SNDHWM. Don’t reach for PUSH/PULL if you need drops—despite being a pipeline socket, PUSH blocks at HWM rather than dropping.
- Avoid Blocking Sockets: REQ/REP and DEALER block senders when HWM is hit, so HWM won’t drop messages—it will hang the sender instead.
Step 3: Analyze Message Flow and Backpressure#
HWM only triggers if the receiver is slow enough to cause backpressure.
Simulate a Slow Subscriber:
Add a delay in the subscriber’s message processing loop to create backpressure:
```python
while True:
    msg = sub_socket.recv()
    time.sleep(0.1)  # Slow down processing
```

Monitor Buffer Fill:
Use ZeroMQ’s socket monitor (zmq_socket_monitor) to watch socket events while you load-test. Note that libzmq’s monitor reports connection-level events (connects, disconnects, retries)—there is no monitor event for an HWM hit, and drops on a PUB socket are silent—so use the monitor to rule out connection flapping while you count drops at the application level:

```python
from zmq.utils.monitor import recv_monitor_message

pub_socket.monitor("inproc://monitor", zmq.EVENT_ALL)
monitor_socket = context.socket(zmq.PAIR)
monitor_socket.connect("inproc://monitor")
while True:
    event = recv_monitor_message(monitor_socket)
    print("Socket event:", event["event"], event["endpoint"])
```

Step 4: Inspect ZMQ_SNDHWM vs ZMQ_RCVHWM#
- SNDHWM (Publisher): Controls the outgoing buffer. If the subscriber is slow, the PUB’s buffer fills, triggering drops.
- RCVHWM (Subscriber): Controls the incoming buffer. If the subscriber’s buffer is full, it drops messages, but this doesn’t affect the publisher’s HWM.
Fix: Set SNDHWM on the PUB socket (not just RCVHWM on SUB).
Step 5: Consider Message Size and Batching#
HWM counts messages, not bytes, but message size still matters: large messages (e.g., 1MB) drain to the network more slowly than small ones (e.g., 1KB), so the send queue backs up sooner. With an HWM of 100, a burst of 100 1MB messages will hit the limit long before 100 1KB messages that the transport can flush as fast as you produce them.
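A back-of-envelope check is worth doing here: SNDHWM is enforced per connected peer (each peer gets its own pipe), so worst-case buffered memory is roughly peers × HWM × message size. A quick sketch—the helper function is illustrative:

```python
def worst_case_buffer_bytes(sndhwm, msg_bytes, peers):
    """Rough upper bound on sender-side queue memory.

    Each connected peer has its own pipe, each bounded by SNDHWM messages,
    so the queues alone can hold up to peers * sndhwm * msg_bytes.
    """
    return sndhwm * msg_bytes * peers


# Default HWM (1000), 1MB messages, 4 slow subscribers:
print(worst_case_buffer_bytes(1000, 1024 * 1024, 4))  # 4194304000 (~3.9 GiB)
```

Numbers like this are why leaving the default HWM of 1000 in place with large payloads can balloon memory long before any drop occurs.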
Test with Large Messages:
Send large payloads to accelerate buffer filling:
```python
large_msg = b"x" * 1024 * 1024  # 1MB message
pub_socket.send(large_msg)
```

Step 6: Check for Misconfigured Durability Mechanisms#
Durability tools like persistent queues or proxies can bypass ZeroMQ’s buffers:
- Proxies (XPUB/XSUB): If the proxy’s XPUB socket has a high SNDHWM, it will buffer messages, preventing the PUB from hitting its HWM. Set XPUB’s SNDHWM to match the PUB’s.
- Persistent Storage: If messages are saved to a database/queue (not ZeroMQ’s buffer), the PUB socket’s buffer never fills. HWM only applies to in-memory buffers!
Example Walkthrough: Reproducing and Fixing the Issue#
Let’s build a durable subscriber example in Python, reproduce the HWM issue, then fix it.
Setup: Durable Subscriber with XPUB/XSUB Proxy#
We’ll use an XPUB/XSUB proxy to track subscriptions (enabling durability) and a PUB socket sending messages to a slow SUB subscriber.
Step 1: The Faulty Code (HWM Not Dropping)#
```python
import zmq
import time
from threading import Thread

def proxy_thread(context):
    # Proxy to track subscriptions (durable subscriber support)
    xsub = context.socket(zmq.XSUB)
    xsub.bind("tcp://*:5556")
    xpub = context.socket(zmq.XPUB)
    xpub.bind("tcp://*:5557")
    # ❌ Missing HWM configuration on XPUB!
    zmq.proxy(xsub, xpub)

def publisher(context):
    pub = context.socket(zmq.PUB)
    pub.connect("tcp://localhost:5556")  # Connect to XSUB
    # ❌ Setting RCVHWM instead of SNDHWM
    pub.setsockopt(zmq.RCVHWM, 100)
    for i in range(1000):
        pub.send(f"Message {i}".encode())
        print(f"Sent: Message {i}")
        time.sleep(0.01)  # Send 100 messages/sec

def subscriber(context):
    sub = context.socket(zmq.SUB)
    sub.connect("tcp://localhost:5557")  # Connect to XPUB
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    # Slow subscriber (1 message/sec)
    for _ in range(1000):
        msg = sub.recv()
        print(f"Received: {msg.decode()}")
        time.sleep(1)  # Simulate processing delay

if __name__ == "__main__":
    context = zmq.Context()
    # Start proxy, publisher, subscriber
    Thread(target=proxy_thread, args=(context,)).start()
    Thread(target=publisher, args=(context,)).start()
    subscriber(context)
```

Problem Analysis#
- HWM Set on Wrong Socket: The publisher sets RCVHWM (receiver HWM) instead of SNDHWM (sender HWM).
- Proxy’s XPUB Lacks HWM: The XPUB socket in the proxy keeps the default SNDHWM=1000, so it buffers messages, preventing the publisher’s buffer from filling.
Step 2: Fixing the Issue#
Correct HWM Configuration:
- Set SNDHWM=100 on the publisher and XPUB sockets.
- Use SNDHWM (not RCVHWM) on the publisher.
Fixed Code:
```python
def proxy_thread(context):
    xsub = context.socket(zmq.XSUB)
    xsub.setsockopt(zmq.RCVHWM, 100)     # ✅ Bound the proxy's inbound queue too
    xsub.bind("tcp://*:5556")
    xpub = context.socket(zmq.XPUB)
    xpub.setsockopt(zmq.SNDHWM, 100)     # ✅ Set XPUB SNDHWM
    xpub.setsockopt(zmq.XPUB_NODROP, 1)  # ✅ Propagate backpressure instead of dropping silently
    xpub.bind("tcp://*:5557")
    zmq.proxy(xsub, xpub)

def publisher(context):
    # ✅ XPUB instead of PUB: a plain PUB drops silently at HWM and never
    # raises zmq.Again. XPUB acts as a drop-in for PUB here, and with
    # XPUB_NODROP a full send queue surfaces as zmq.Again.
    pub = context.socket(zmq.XPUB)
    pub.setsockopt(zmq.SNDHWM, 100)      # ✅ Set sender-side SNDHWM
    pub.setsockopt(zmq.XPUB_NODROP, 1)
    pub.connect("tcp://localhost:5556")
    for i in range(1000):
        try:
            pub.send(f"Message {i}".encode(), zmq.DONTWAIT)  # Non-blocking send
        except zmq.Again:
            print(f"Dropped: Message {i} (HWM hit)")
        time.sleep(0.01)

# Subscriber code remains the same (slow processing). Note its RCVHWM still
# defaults to 1000, which also absorbs messages; lower it as well if you want
# backpressure to reach the publisher quickly in a short demo.
```

Key Fixes:#
- SNDHWM=100 on every hop: the publisher, the proxy’s XSUB, and the proxy’s XPUB are all bounded, so backpressure from the slow subscriber can actually travel back to the publisher.
- XPUB_NODROP + Non-Blocking Send (zmq.DONTWAIT): a plain PUB drops silently at HWM; with XPUB_NODROP set, a full queue raises zmq.Again instead, letting you catch and log every drop (optional but invaluable for debugging).

Result#
With these fixes, every queue in the chain stops growing at 100 messages. Once the buffers fill, the publisher’s non-blocking sends raise zmq.Again and you’ll see "Dropped: Message X" in its logs, confirming HWM is working.
Best Practices for HWM and Durable Subscribers#
To avoid HWM issues in durable setups:
- Set HWM on All Relevant Sockets: the publisher, XPUB, and any intermediate proxies should have matching SNDHWM values.
- Surface Drops Instead of Hiding Them: a plain PUB drops silently; combine XPUB_NODROP with non-blocking sends (zmq.DONTWAIT) and handle zmq.Again to log every drop.
- Monitor Socket Health: use ZeroMQ’s socket monitor to track connection churn (connects, disconnects), and pair it with application-level drop counters to alert on sustained backpressure.
- Separate Durability from ZeroMQ Buffers: Use persistent queues (e.g., Redis Streams) alongside HWM. Let ZeroMQ handle in-memory buffering (with HWM), and the queue handle long-term persistence.
- Test Under Load: Simulate slow subscribers and large messages to validate HWM behavior before production.
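To turn the logged drops from the practices above into an actionable signal, a small application-level helper can watch the drop rate over a sliding window. A hypothetical sketch—the DropRateAlarm class is illustrative, not a ZeroMQ API:

```python
import time


class DropRateAlarm:
    """Count HWM drops (e.g., zmq.Again with XPUB_NODROP) and flag
    sustained backpressure when too many occur within a time window."""

    def __init__(self, max_drops, window_s):
        self.max_drops = max_drops
        self.window_s = window_s
        self.events = []  # timestamps of recent drops

    def record_drop(self, now=None):
        """Record one drop; return True if the windowed rate exceeds the limit."""
        now = time.monotonic() if now is None else now
        self.events.append(now)
        cutoff = now - self.window_s
        self.events = [t for t in self.events if t >= cutoff]
        return len(self.events) > self.max_drops


alarm = DropRateAlarm(max_drops=2, window_s=10.0)
print([alarm.record_drop(t) for t in (0.0, 1.0, 2.0, 3.0)])  # [False, False, True, True]
```

In the publisher loop, call record_drop() inside the except zmq.Again branch and page an operator (or shed load) when it returns True.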
Conclusion#
HWM not dropping messages in durable subscriber setups is rarely a ZeroMQ bug—it’s almost always misconfiguration or misunderstanding of HWM’s interaction with durability mechanisms. By verifying HWM settings, socket types, and message flow, you can ensure HWM drops messages when needed, preventing memory bloat and ensuring system stability.
Remember: HWM protects ZeroMQ’s in-memory buffers. For durability, pair it with external persistence tools, but don’t rely on them to enforce backpressure—HWM is your first line of defense.