Zth (libzth)
Loading...
Searching...
No Matches
zmq.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 */
6
7#define ZTH_REDIRECT_ZMQ 0
8
9#include <libzth/zmq.h>
10
11#ifdef ZTH_HAVE_LIBZMQ
12
13# include <libzth/poller.h>
14# include <libzth/waiter.h>
15# include <libzth/worker.h>
16
17namespace zth {
18namespace zmq {
19
20static void zmq_global_deinit()
21{
22 zth_dbg(zmq, "destroy context");
23 zmq_ctx_term(zmq_context());
24}
25
26static void* zmq_global_init()
27{
28 int major = 0;
29 int minor = 0;
30 int patch = 0;
31 zmq_version(&major, &minor, &patch);
32 zth_dbg(banner, "0MQ version is %d.%d.%d", major, minor, patch);
33
34 zth_dbg(zmq, "new context");
35 void* zmq_ctx = zmq_ctx_new();
36
37 if(!zmq_ctx)
38 zth_abort("0MQ context creation failed; %s", err(errno).c_str());
39
40 // Only do the deinit this when 0MQ was actually used.
41 (void)atexit(zmq_global_deinit);
42 return zmq_ctx;
43}
44
50{
51 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
52 static void* zmq_ctx = zmq_global_init();
53 return zmq_ctx;
54}
55
61void* zmq_socket(int type)
62{
63 return ::zmq_socket(zmq_context(), type);
64}
65
70int zmq_msg_send(zmq_msg_t* msg, void* socket, int flags)
71{
72 perf_syscall("zmq_msg_send()");
73
74 int res = ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
75 int err = res == -1 ? zmq_errno() : 0;
76 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
77 return res;
78
79 zth_dbg(zmq, "[%s] zmq_msg_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
80
81 errno = poll(PollableFd(socket, Pollable::PollOut));
82 if(errno)
83 return -1;
84
85 return ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
86}
87
92int zmq_msg_recv(zmq_msg_t* msg, void* socket, int flags)
93{
94 perf_syscall("zmq_msg_recv()");
95
96 int res = ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
97 int err = res == -1 ? zmq_errno() : 0;
98 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
99 return res;
100
101 zth_dbg(zmq, "[%s] zmq_msg_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
102
103 errno = poll(PollableFd(socket, Pollable::PollIn));
104 if(errno)
105 return -1;
106
107 return ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
108}
109
114int zmq_send(void* socket, void const* buf, size_t len, int flags)
115{
116 perf_syscall("zmq_send()");
117
118 int res = ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
119 int err = res == -1 ? zmq_errno() : 0;
120 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
121 return res;
122
123 zth_dbg(zmq, "[%s] zmq_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
124
125 errno = poll(PollableFd(socket, Pollable::PollOut));
126 if(errno)
127 return -1;
128
129 return ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
130}
131
136int zmq_recv(void* socket, void* buf, size_t len, int flags)
137{
138 perf_syscall("zmq_recv()");
139
140 int res = ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
141 int err = res == -1 ? zmq_errno() : 0;
142 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
143 return res;
144
145 zth_dbg(zmq, "[%s] zmq_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
146
147 errno = poll(PollableFd(socket, Pollable::PollIn));
148 if(errno)
149 return -1;
150
151 return ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
152}
153
158int zmq_send_const(void* socket, void const* buf, size_t len, int flags)
159{
160 perf_syscall("zmq_send_const()");
161
162 int res = ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
163 int err = res == -1 ? zmq_errno() : 0;
164 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
165 return res;
166
167 zth_dbg(zmq, "[%s] zmq_send_const(%p) hand-off", zth::currentFiber().str().c_str(), socket);
168
169 errno = poll(PollableFd(socket, Pollable::PollOut));
170 if(errno)
171 return -1;
172
173 return ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
174}
175
176} // namespace zmq
177} // namespace zth
178#else
// Placeholder for builds without ZTH_HAVE_LIBZMQ; presumably it only exists to
// keep this translation unit from being empty. It is never referenced, hence
// the 'unused' attribute.
static const int no_zmq __attribute__((unused)) = 0;
180#endif // ZTH_HAVE_LIBZMQ
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition util.cpp:334
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:399
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition poller.h:784
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:189
char const * banner() noexcept
Prints a banner line with version and configuration information.
Definition util.cpp:37
int zmq_msg_send(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_send().
Definition zmq.cpp:70
int zmq_recv(void *socket, void *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_recv().
Definition zmq.cpp:136
void * zmq_socket(int type)
Fiber-aware wrapper for 0MQ's zmq_socket().
Definition zmq.cpp:61
int zmq_send(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send().
Definition zmq.cpp:114
int zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_recv().
Definition zmq.cpp:92
void * zmq_context()
Returns the (only) 0MQ context, used by all fibers.
Definition zmq.cpp:49
int zmq_send_const(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send_const().
Definition zmq.cpp:158
cow_string str(T value)
Returns a zth::string representation of the given value.
Definition util.h:492
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:675
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition perf.h:361
A pollable file descriptor.
Definition poller.h:135
static const unsigned long PollIn
Definition poller.h:89
static const unsigned long PollOut
Definition poller.h:90