Zth (libzth)
Loading...
Searching...
No Matches
zmq.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 */
6
7#define ZTH_REDIRECT_ZMQ 0
8
9#include <libzth/zmq.h>
10
11#ifdef ZTH_HAVE_LIBZMQ
12
13# include <libzth/poller.h>
14# include <libzth/waiter.h>
15# include <libzth/worker.h>
16
17namespace zth {
18namespace zmq {
19
20static void*& zmq_context_storage()
21{
22 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
23 static void* zmq_ctx = nullptr;
24 return zmq_ctx;
25}
26
27# ifndef ZTH_OS_WINDOWS
28static void zmq_global_deinit()
29{
30 void*& zmq_ctx = zmq_context_storage();
31 if(!zmq_ctx)
32 return;
33
34 zth_dbg(zmq, "destroy context");
35 (void)zmq_ctx_term(zmq_ctx);
36 zmq_ctx = nullptr;
37}
38# endif
39
40static void* zmq_global_init()
41{
42 int major = 0;
43 int minor = 0;
44 int patch = 0;
45 zmq_version(&major, &minor, &patch);
46 zth_dbg(banner, "0MQ version is %d.%d.%d", major, minor, patch);
47
48 zth_dbg(zmq, "new context");
49 void* zmq_ctx = zmq_ctx_new();
50
51 if(!zmq_ctx)
52 zth_abort("0MQ context creation failed; %s", err(errno).c_str());
53
54 // Windows shutdown order can tear down Winsock before process-exit hooks,
55 // which makes libzmq's socket cleanup assert with WSANOTINITIALISED.
56 // Keep explicit cleanup on non-Windows only.
57# ifndef ZTH_OS_WINDOWS
58 // Only do the deinit this when 0MQ was actually used.
59 (void)atexit(zmq_global_deinit);
60# endif
61 zmq_context_storage() = zmq_ctx;
62 return zmq_ctx;
63}
64
70{
71 void*& zmq_ctx = zmq_context_storage();
72 if(!zmq_ctx)
73 zmq_ctx = zmq_global_init();
74 return zmq_ctx;
75}
76
82void* zmq_socket(int type)
83{
84 return ::zmq_socket(zmq_context(), type);
85}
86
91int zmq_msg_send(zmq_msg_t* msg, void* socket, int flags)
92{
93 perf_syscall("zmq_msg_send()");
94
95 int res = ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
96 int err = res == -1 ? zmq_errno() : 0;
97 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
98 return res;
99
100 zth_dbg(zmq, "[%s] zmq_msg_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
101
102 errno = poll(PollableFd(socket, Pollable::PollOut));
103 if(errno)
104 return -1;
105
106 return ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
107}
108
113int zmq_msg_recv(zmq_msg_t* msg, void* socket, int flags)
114{
115 perf_syscall("zmq_msg_recv()");
116
117 int res = ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
118 int err = res == -1 ? zmq_errno() : 0;
119 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
120 return res;
121
122 zth_dbg(zmq, "[%s] zmq_msg_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
123
124 errno = poll(PollableFd(socket, Pollable::PollIn));
125 if(errno)
126 return -1;
127
128 return ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
129}
130
135int zmq_send(void* socket, void const* buf, size_t len, int flags)
136{
137 perf_syscall("zmq_send()");
138
139 int res = ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
140 int err = res == -1 ? zmq_errno() : 0;
141 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
142 return res;
143
144 zth_dbg(zmq, "[%s] zmq_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
145
146 errno = poll(PollableFd(socket, Pollable::PollOut));
147 if(errno)
148 return -1;
149
150 return ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
151}
152
157int zmq_recv(void* socket, void* buf, size_t len, int flags)
158{
159 perf_syscall("zmq_recv()");
160
161 int res = ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
162 int err = res == -1 ? zmq_errno() : 0;
163 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
164 return res;
165
166 zth_dbg(zmq, "[%s] zmq_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
167
168 errno = poll(PollableFd(socket, Pollable::PollIn));
169 if(errno)
170 return -1;
171
172 return ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
173}
174
179int zmq_send_const(void* socket, void const* buf, size_t len, int flags)
180{
181 perf_syscall("zmq_send_const()");
182
183 int res = ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
184 int err = res == -1 ? zmq_errno() : 0;
185 if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
186 return res;
187
188 zth_dbg(zmq, "[%s] zmq_send_const(%p) hand-off", zth::currentFiber().str().c_str(), socket);
189
190 errno = poll(PollableFd(socket, Pollable::PollOut));
191 if(errno)
192 return -1;
193
194 return ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
195}
196
197} // namespace zmq
198} // namespace zth
199#else
200static int const no_zmq __attribute__((unused)) = 0;
201#endif // ZTH_HAVE_LIBZMQ
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition util.cpp:342
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:427
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition poller.h:784
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
char const * banner() noexcept
Returns a banner line with version and configuration information.
Definition util.cpp:38
void * zmq_context()
Returns the (only) 0MQ context, used by all fibers.
Definition zmq.cpp:69
cow_string str(T value)
Returns an zth::string representation of the given value.
Definition util.h:512
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:701
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp()) noexcept
Put a syscall into the perf output.
Definition perf.h:71
A pollable file descriptor.
Definition poller.h:135
static const unsigned long PollIn
Definition poller.h:89
static const unsigned long PollOut
Definition poller.h:90
#define zmq_send
Definition zmq.h:132
#define zmq_msg_recv
Definition zmq.h:131
#define zmq_msg_send
Definition zmq.h:130
#define zmq_recv
Definition zmq.h:133
#define zmq_ctx_term(c)
Definition zmq.h:128
#define zmq_ctx_new
Definition zmq.h:127
#define zmq_send_const
Definition zmq.h:134
#define zmq_socket(c, t)
Definition zmq.h:129