博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ZeroMQ的订阅发布(publish-subscribe)模式
阅读量:2425 次
发布时间:2019-05-10

本文共 3752 字,大约阅读时间需要 12 分钟。

ZeroMQ的订阅发布模式是一种单向的数据发布,当客户端向服务端订阅消息之后,服务端便会将产生的消息源源不断的推送给订阅者,本文的示例代码来源于文献[1]示例代码的修改。

发布-订阅图示

这里写图片描述
发布者使用PUB套接字将消息发送到队列中,订阅者使用SUB套接字从队列中源源不断的接收消息。新的订阅者可以随时加入,但之前的消息是无法接收到的;已有的订阅者可以随时退出;订阅者还可以添加“过滤器”用来有选择性的接收消息。

使用方法简介

首先要创建一个上下文环境,然后使用它创建套接字:

void *context = zmq_ctx_new ();

对于服务端来说,使用”ZMQ_PUB”创建socket,并且绑定到一个周知的端口,然后便可以不断的广播消息了:

void *publisher = zmq_socket (context, ZMQ_PUB);int rc = zmq_bind (publisher, "tcp://*:5556");

如果使用TCP连接并且订阅者是慢速的,那么消息将在发布方排队;可以使用高水位标记(High-Water Marks,HWM)来定义缓冲区的大小,在ZeroMQ v2.x版本中HWM默认是无限制的,而在v3.x中默认情况下它是1000。对于PUB套接字,当到达HWM时,将丢弃数据。设置HWM参数:

zmq_setsockopt(publisher, ZMQ_SNDHWM, &nMaxNum, sizeof(nMaxNum));

对于客户端来说,要使用”ZMQ_SUB”创建socket,并且链接(zmq_connect)到待订阅的服务端;此外,要想接收到服务推送的消息,还必须使用zmq_setsockoptZMQ_SUBSCRIBE来配置该订阅。zmq_setsockopt的ZMQ_SUBSCRIBE选项可以带有一个”过滤器“,用以选择性的接收来自服务端的消息。该”过滤器”为空,则接收全部的消息;”过滤器”还可以有多个,它们之间是”or”的关系,即接收满足任一条件的消息。当然也可以使用zmq_setsockopt配置选项ZMQ_UNSUBSCRIBE来取消订阅,示例如下:

void *context = zmq_ctx_new ();void *subscriber = zmq_socket (context, ZMQ_SUB);int rc = zmq_connect (subscriber, "tcp://localhost:5556");char filter1[] = "10001 "; char filter2[] = "20002 ";rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); //接收消息的前缀为filter1的消息rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); //接收消息的前缀为filter2的消息

接收和发送消息:此处使用的方法是zmq_recv()和zmq_send(),相对于zmq_msg_send()和zmq_msg_recv(),它们会自己调用消息发送和接收的初始化方法等。

int zmq_recv (void *s, void *buf, size_t len, int flags);int zmq_send (void *s, const void *buf, size_t len, int flags);

示例代码

//服务端:#include 
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;int main (){ void *context = zmq_ctx_new (); void *publisher = zmq_socket (context, ZMQ_PUB); int rc = zmq_bind (publisher, "tcp://*:5556"); assert (rc == 0); // Initialize random number generator srand(time(0)); while (1) { int zipcode, temperature, relhumidity; zipcode = rand() % 100000; temperature = rand() % 215 - 80; relhumidity = rand() % 50 + 10; ostringstream os; os << setw(5) << setfill('0')<< zipcode <<" " << temperature <<" "<< relhumidity << "\n"; zmq_send(publisher, os.str().c_str(), strlen(os.str().c_str()), 0); } zmq_close (publisher); zmq_ctx_destroy (context); return 0;}
//客户端:#include 
#include
#include
#include
#include
using namespace std;int main (int argc, char *argv []){ // Socket to talk to server printf ("Collecting updates from weather server...\n"); void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); int rc = zmq_connect (subscriber, "tcp://localhost:5556"); assert (rc == 0); char filter1[] = "10001 "; char filter2[] = "20002 "; rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); assert (rc == 0); rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); assert (rc == 0); // Process 100 updates int size; char buffer [256]; for (int update_nbr = 0; update_nbr < 100; update_nbr++) { memset(buffer, 0, 256*sizeof(char)); size = zmq_recv (subscriber, buffer, 255, 0); if (size == -1){ cout<< "receiver error , skip this message"<

NOTE:

在文献[1]中指出:

  • 在ZMQ_SUB套接字上执行相对于zmq_msg_send()和在ZMQ_PUB套接字上执行相对于和zmq_msg_recv()同样都是错误的;
  • PUB-SUB具有”slow joiner”症状。”slow joiner”的症状是:即使先启动订阅者,稍等片刻再启动发布者,订阅者也可能错过发布者发送的第一条消息。建立TCP连接需要花费时间,具体取决于网络状况,以及主机到服务端的路由,所以即使多个订阅者同时启动,它们可能也不会收到同样的消息;
  • 订阅者可以使用zmq_connect()同时连接到多个发布者。不同发布者推送的消息将交错到达;
  • 如果一个发布者没有任何订阅者,那么它会简单地丢弃所有的消息;
  • 从ZMQ v3.x开始,在使用连接的协议是tcp或者ipc时,过滤发生在发布方。使用epgm协议,过滤发生在订阅方。但在ZMQ v2.x版本中,所有过滤都发生在订阅方。

[1].《ZeroMQ云时代极速消息通信库》.电子工业出版社,2015.

你可能感兴趣的文章
Redis运维和开发学习笔记(5) 主从复制和sentinel哨兵模式
查看>>
Redis运维和开发学习笔记(6) 监控Redis工作状态-info命令
查看>>
Redis运维和开发学习笔记(7) 内存管理和过期策略
查看>>
Redis源码分析(零)学习路径笔记
查看>>
Redis源码分析(一)redis.c //redis-server.c
查看>>
Redis源码分析(二)redis-cli.c
查看>>
redis源码剖析(三)——基础数据结构
查看>>
redis源码剖析(四)跳表
查看>>
redis源码剖析(五)—— 字符串,列表,哈希,集合,有序集合
查看>>
redis源码剖析(六)—— Redis 数据库、键过期的实现
查看>>
redis源码剖析(七)—— Redis 数据结构dict.c
查看>>
redis源码剖析(八)—— 当你启动Redis的时候,Redis做了什么
查看>>
redis源码剖析(九)—— Redis双链表实现
查看>>
redis源码剖析(十一)—— Redis字符串相关函数实现
查看>>
事务隔离级别动图演示
查看>>
mysql row_id为什么是6字节?为什么是8字节
查看>>
伪随机数和真随机数
查看>>
ps -ef和ps aux
查看>>
Linux中screen的用法
查看>>
linux查看硬盘是不是ssd固态硬盘
查看>>