Zero MQ
We took a normal TCP socket, injected it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombarded it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex.
—-Pieter Hintjens, CEO of iMatix
精简版:人们受够了套接字!他实在太混乱了!ZMQ就是这个混乱网络世界的救主。
—-笔者
ZMQ是一个高性能的异步消息传递库,提供了一个无需消息代理的消息队列(MQ)。ZMQ通过多种传输方式(TCP, in-process, inter-process, multicast, WebSocket and more)实现了常见的消息传递模式(pub/sub, request/reply, client/server and others)从而使进程间消息传递变得与线程间消息传递一样简单。这样可以保持代码清晰、模块化,并且非常易于扩展。
本质上ZMQ和Redis、RabbitMQ、RocketMQ、Kafka这类的消息中间件并不相同。ZMQ更类似于一个对socket的封装,包含了网络层协议。因此他更快速也无需消息代理(message broker)。当然,这也意味着更底层更复杂(这就是力量的代价:MQ +4/+4,当发生底层错误时消灭程序员)。
一、 ZMQ模型
| [1. Basics | ØMQ - The Guide](https://zguide.zeromq.org/docs/chapter1/#Why-We-Needed-ZeroMQ) |
一文带你入门了解“零之禅“消息队列ZeroMQ-CSDN博客
重头戏!ZeroMQ的管道模式详解:ZMQ_PUSH、ZMQ_PULL_zmq push pull-CSDN博客
1 请求/应答模型(REQ/REP)–1v1
1.1 原理

REQ-REP 套接字的步调一致。客户端循环发出zmq_send(),然后发出 zmq_recv()(如果仅此而已,则只发出一次)。执行任何其他顺序(例如,连续发送两条消息)将导致 send或recv调用返回 -1 代码。同样,服务会根据需要按顺序发出zmq_recv(),然后发出 zmq_send() 。
1.2 服务器端
服务器创建一个响应类型的套接字(稍后您将了解有关请求-响应的更多信息),将其绑定到端口 5555,然后等待消息。您还可以看到我们没有任何配置,我们只是发送字符串。
// Hello World server
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
while (1) {
char buffer [10];
zmq_recv (responder, buffer, 10, 0);
printf ("Received Hello\n");
sleep (1); // Do some 'work'
zmq_send (responder, "World", 5, 0);
}
return 0;
}
1.3 客户端
客户端创建一个请求类型的套接字,连接并开始发送消息。send和方法receive都是阻塞的(默认情况下)。对于接收来说很简单:如果没有消息,该方法将阻塞。对于发送来说,它更复杂,并且取决于套接字类型。对于请求套接字,如果达到高水位或没有连接对等点,该方法将阻塞。
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
char buffer [10];
printf ("Sending Hello %d…\n", request_nbr);
zmq_send (requester, "Hello", 5, 0);
zmq_recv (requester, buffer, 10, 0);
printf ("Received World %d\n", request_nbr);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
#注意:字符串安全性#
C语言中的字符串并不安全!
zmq_send只关注会发送的字符的数量,这意味着它是可以发送带有空字节的字符串的。这会导致不正确的字符串格式—没有安全的终止。如果接收方的buffer中剩余的位置不是空字符(尽管大部分情况下是这样),将会导致字符串读写的错误
zmq_send (requester, "Hello", 6, 0);一般默认的我们认为zmq_send使用的字符串的长度是不带终止符的长度,也就是strlen得到的长度。也就是说,网络中传输的字符串不是C语言中的字符串,需要额外的格式化处理。
我们选择在接收端对接收的字符进行处理。在实际的使用之中,有两种方法:1.每次memset接收缓存为0;2.需要对接收缓存的字符串多读一位然后使用空字符截取,好消息是我们从0开始,所以recv的返回值就是那个多一位,该赋值为空字符的位置。两种方法的思路其实是一样的,为recv到的网络字符添加空字符结尾,使他结束在应该有的位置。
// Receive ZeroMQ string from socket and convert into C string // Chops string at 255 chars, if it's longer static char * s_recv (void *socket) { char buffer [256]; int size = zmq_recv (socket, buffer, 255, 0); if (size == -1) return NULL; if (size > 255) size = 255; buffer [size] = '\0'; /* use strndup(buffer, sizeof(buffer)-1) in *nix */ return strdup (buffer); }可以使用
zhelpers.h头文件对c语言进行安全的收发。点击此处查看具体源码。ps. 不正确的使用send,在router中会导致身份识别的错误请务必注意!
2 发布/订阅模型(PUB/SUB)–1vN
2.1 原理

单向数据分发即服务器将更新流推送到一组客户端。这股更新流可以理解为无始无终永不结束的广播。
2.2 发布者(生产者)
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
#include "zhelpers.h"
int main (void)
{
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == 0);
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}
2.3 订阅者(消费者)
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
#include "zhelpers.h"
int main (int argc, char *argv [])
{
// Socket to talk to server
printf ("Collecting updates from weather server...\n");
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
assert (rc == 0);
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
filter, strlen (filter));
assert (rc == 0);
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Average temperature for zipcode '%s' was %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}
关键在于 zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter)); 一步为subscriber设置订阅。PUB-SUB 套接字对是异步的。客户端循环执行zmq_recv()(如zmq_hello_world_client果仅此而已,则执行一次)。尝试向 SUB 套接字发送消息将导致错误。同样,服务会根据需要尽可能频繁地执行 zmq_send(),但不得在 PUB 套接字上执行zmq_recv() 。
理论上,对于 ZeroMQ 套接字,哪一端连接和哪一端绑定并不重要。然而,在实践中存在一些未记录的差异。一般的,在PUB端bind在SUB端connect。
在通常的套接字编程中,bind 和conect一般分别用于服务器端和客户端。bind用于服务器监听指定的地址和端口,connect用于客户端连接到指定地址的服务器。
在ZMQ中,在那一段bind或connect并无影响(为什么?)但实际上还是有差异的(毕竟底层的套接字不同)
#注意:缓慢加入问题#
”缓慢加入“问题会导致SUB永远没办法接收到PUB发送的最开始的信息(建立连接需要时间,这段时间中PUB很可能已发出很多信息!)需要对发布者和订阅者的同步。
3 推拉模型(PUSH/PULL)–1vNv1
管道模式或者叫并行流水线(ventilator/worker/sink)
3.1 原理

可同时执行多项任务的发送者ventilator、一组处理任务的工作者worker、从工作进程收集结果的接收者sink。发送者将任务分配给工人并行的处理,工人将各自的成果交给接收者。实质上是push和pull两种套接字上的操作!!本质是单向的套接字接口。
3.2 发送者
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
当工作者准备好时,发送者发送一个开始标志给sink让其准备好接收工人的成果。这么做的原因是和发布订阅模型一样可能存在丢失的信息,所以必须的先让接收者做好接收的准备,再让其得到工作者发出的信息。
3.3 工作者
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
#include "zhelpers.h"
int main (void)
{
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
printf ("%s.", string); // Show progress
fflush (stdout);
s_sleep (atoi (string)); // Do the work
free (string);
s_send (sender, ""); // Send results to sink
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
工作者得到发送者的任务后停止一段时间然后发出成果给接收者。但这里隐含一个需求——所有的工作者需要同步的开始工作。也就是说我们需要额外的操作实现同步的并行处理。
同时,发送者需要均匀的为每一位工作者分配任务,称为**负载均衡**。
3.4 接收者
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
#include "zhelpers.h"
int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
// Start our clock now
int64_t start_time = s_clock ();
// Process 100 confirmations
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if (task_nbr % 10 == 0)
printf (":");
else
printf (".");
fflush (stdout);
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_ctx_destroy (context);
return 0;
}**
接收者需要均匀的从工作者处接收成果,也就是**公平排队**的接收机制。
3.5 负载均衡和公平排队

注意:不完全的负载均衡#
和发布/订阅模型一样,流水线模式也有着很严重的同步问题需要研究。
PUSH 套接字无法正确实现负载平衡。如果您同时使用 PUSH 和 PULL,并且您的一个工作进程收到的消息比其他工作进程多得多,这是因为该 PULL 套接字比其他工作进程加入得更快,并且在其他工作进程设法连接之前就获取了大量消息。
如果您想要正确的负载平衡,您可能需要查看 第 3 章 - 高级请求-回复模式中的负载平衡模式。
4 独占套接字对(Exclusive pair)–线程间通信
—-todo:
5 路由模式(Router/Dealer)–NvN
Router/Dealer模式是异步版的的REQ/REP模式。在实际的高访问量高并发性应用当中,往往有者多个服务器共同完成请求任务,甚至承载着不同的任务(微服务?)。此时可以通过一个消息中间人(message broker)(类似于Nginx的反向代理服务器?作用于不同的层次)实现请求应答。
router实际上在使用中更多的起到的是消息中间人的效果!
zeromq中两个dealer 通过一个router进行通信 - fengbohello - 博客园
6 有代理的发布/订阅模型(XPUB/XSUB)
—-todo:
二、ZMQ套接字机制
1 ZMQ上下文的创建和销毁:
在ZMQ中,使用ZMQ上下文管理所有的套接字。ZMQ使用一个称为上下文的结构体管理套接字——一个zmq::ctx_t类型的结构体,用于管理单个进程的所有套接字。一般的我们在一个进程开始时使用zmq_ctx_new()去创建上下文,在进程结束时使用zmq_ctx_term() / zmq_ctx_destroy()去销毁上下文,释放所有资源。如果使用fork,则是在fork之后和子线程的开头创建上下文。一般来说在子线程进行具体的zmq操作,在父线程进行线程管理。
在之前的zmq版本中使用的是
zmq_init()和zmq_term()去创建和销毁上下文。但这样并没有表现出zmq上下文的作用,为了强调这是上下文的创建和销毁而修改了接口。更主要的,这为了提示你还需要释放zmq_msg结构体和套接字才能实现完整的释放,不导致资源泄露。换句话说,实际上ZMQ就是ZMQ上下文。
在ZMQ当中,一个完整的资源释放过程被分为了三个部分:释放zmq_msg_t、释放套接字、释放zmq_ctx_t。
-
释放zmq_msg_t: 在使用时尽量使用
zmq_send()和zmq_recv(),而不是zmq_msg_send()和zmq_msg_recv()。通过这种方法来避免zmq_msg_t结构体的使用。如果非要使用的话,需要在每次使用之后立刻调用zmq_msg_close()来关闭消息结构体,避免内存泄漏。 -
释放套接字: 在连接的套接字使用完毕后,需要及时的使用
zmq_close()关闭套接字,因为上下文的释放仅能在其拥有的所有套接字都释放完毕后进行。此时需要为未关闭的套接字设置一个较小的LINGER值(等待时间,比如1s),然后关闭所有的套接字。 -
释放zmq_ctx_t:
zmq_ctx_destroy()的过程是一个复杂而痛苦的过程,因为上下文释放时可能仍有着悬挂的连接和进行的发送,也就是上下文对应的套接字没有完全释放。此时zmq_ctx_destroy()会一直被挂起。释放zmq上下文的
zmq_ctx_term()的流程是:-
任何当前在“context”内打开的套接字上正在进行的阻塞操作应立即以错误代码
ETERM返回。除了zmq_close()之外,在“context”内打开的套接字上的任何进一步操作都应因错误代码ETERM而失败。 -
中断所有阻塞调用后,
zmq_ctx_term()应阻塞,直到:-
所有在“context”内打开的套接字都已使用
zmq_close()关闭 -
zmq_send()发送的所有消息要么已实际传输到网络对等方,要么套接字的延迟时间已过期(由ZMQ_LINGER套接字选项设置)。
-
-
2 ZMQ上下文对于套接字的管理:
ZMQ上下文对于套接字的管理主要包括:
-
创建和销毁套接字,它们共同构成套接字生命的循环(参见zmq_socket()、zmq_close())。
-
通过设置选项并在必要时检查它们来配置套接字(参见zmq_setsockopt()、zmq_getsockopt())。
-
通过创建 ZeroMQ 连接将套接字插入网络拓扑(参见zmq_bind()、zmq_connect())。
-
通过在套接字上写入和接收消息来使用套接字传输数据(参见zmq_msg_send()、zmq_msg_recv())。
2.1 ZMQ套接字的创建:
—-todo:
2.2 ZMQ 的套接字选项:
—-todo:
2.3 ZMQ套接字绑定和连接:
ZMQ允许不严格的区分bind和connect,这使得其使用更加简单。但是正如之前在ZMQ模式中提到的,他们实际上有着微妙不同,但我们只需要按照标准的方法来使用它就可以避免。一般来说,执行zmq_bind()的节点是***“服务器”***,位于***众所周知的网络地址***上,而执行zmq_connect()的节点是***“客户端”***,具有***未知或任意的网络地址***。因此,我们说“将套接字绑定到端点”和“将套接字连接到端点”,端点是众所周知的网络地址。
2.4 ZMQ的发送和接收:
ZMQ与TCP套接字的区别:
-
可以使用任意传输方式进行传输( inproc、ipc、tcp、pgm或epgm)。zmq_inproc()、zmq_ipc()、zmq_tcp()、zmq_pgm()和zmq_epgm()。
-
一个套接字可能有许多传出连接和许多传入连接。
-
没有zmq_accept () 方法。当套接字绑定到端点时,它会自动开始接受连接。
-
网络连接由ZMQ自动管理,你无法直接处理这些连接。如果网络连接中断(例如,如果对等方消失然后又回来),ZeroMQ 将自动重新连接。
-
ZMQ不是一个能承载协议的中间载体,只能使用ZMQ_ROUTER_RAW套接字选项支持正确的读写HTTP等协议。这意味着他不能兼容现有的协议(like:HTTP,因为它们基于socket而ZMQ对socket进行了封装)。尽管可以使用ZMQ实现类似的协议但他们本质不同(取决于对端的协议,更广泛的设备使用的是普通的协议)。。
-
ZMQ使用一个I/O线程来处理所有的网络连接,处于不断轮询的poll/select之中。
三、ZMQ程序接口
void *zmq_ctx_new ();
-
返回值:
-
成功: 返回ZMQ上下文content,是一个zmq_ctx_t的结构体
-
失败: 返回NULL, 并设置errno
-
void *zmq_ctx_term ();
-
返回值:
-
成功: 返回0
-
失败: 返回-1, 并设置errno
-
三、ZMQ源码结构
https://www.iteye.com/blog/watter1985-1736023
zeromq源码分析笔记之架构(1) - zengzy - 博客园
zmq源代码分析 - mailbox_t_zmq代码-CSDN博客
The Architecture of Open Source Applications (Volume 2)ZeroMQ