Zth (libzth)
zmq.cpp
Go to the documentation of this file.
1 /*
2  * Zth (libzth), a cooperative userspace multitasking library.
3  * Copyright (C) 2019-2022 Jochem Rutgers
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
8  */
9 
10 #define ZTH_REDIRECT_ZMQ 0
11 
12 #include <libzth/macros.h>
13 
14 #ifdef ZTH_HAVE_LIBZMQ
15 
16 # include <libzth/poller.h>
17 # include <libzth/waiter.h>
18 # include <libzth/worker.h>
19 # include <libzth/zmq.h>
20 
21 namespace zth {
22 namespace zmq {
23 
24 static void zmq_global_deinit()
25 {
26  zth_dbg(zmq, "destroy context");
28 }
29 
30 static void* zmq_global_init()
31 {
32  int major = 0;
33  int minor = 0;
34  int patch = 0;
35  zmq_version(&major, &minor, &patch);
36  zth_dbg(banner, "0MQ version is %d.%d.%d", major, minor, patch);
37 
38  zth_dbg(zmq, "new context");
39  void* zmq_ctx = zmq_ctx_new();
40 
41  if(!zmq_ctx)
42  zth_abort("0MQ context creation failed; %s", err(errno).c_str());
43 
44  // Only do the deinit this when 0MQ was actually used.
45  atexit(zmq_global_deinit);
46  return zmq_ctx;
47 }
48 
53 void* zmq_context()
54 {
55  static void* zmq_ctx = zmq_global_init();
56  return zmq_ctx;
57 }
58 
64 void* zmq_socket(int type)
65 {
67 }
68 
73 int zmq_msg_send(zmq_msg_t* msg, void* socket, int flags)
74 {
75  perf_syscall("zmq_msg_send()");
76 
77  int res = ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
78  int err = res == -1 ? zmq_errno() : 0;
79  if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
80  return res;
81 
82  zth_dbg(zmq, "[%s] zmq_msg_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
83 
84  if((errno = poll(PollableFd(socket, Pollable::PollOut))))
85  return -1;
86 
87  return ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
88 }
89 
94 int zmq_msg_recv(zmq_msg_t* msg, void* socket, int flags)
95 {
96  perf_syscall("zmq_msg_recv()");
97 
98  int res = ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
99  int err = res == -1 ? zmq_errno() : 0;
100  if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
101  return res;
102 
103  zth_dbg(zmq, "[%s] zmq_msg_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
104 
105  if((errno = poll(PollableFd(socket, Pollable::PollIn))))
106  return -1;
107 
108  return ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
109 }
110 
115 int zmq_send(void* socket, void const* buf, size_t len, int flags)
116 {
117  perf_syscall("zmq_send()");
118 
119  int res = ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
120  int err = res == -1 ? zmq_errno() : 0;
121  if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
122  return res;
123 
124  zth_dbg(zmq, "[%s] zmq_send(%p) hand-off", zth::currentFiber().str().c_str(), socket);
125 
126  if((errno = poll(PollableFd(socket, Pollable::PollOut))))
127  return -1;
128 
129  return ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
130 }
131 
136 int zmq_recv(void* socket, void* buf, size_t len, int flags)
137 {
138  perf_syscall("zmq_recv()");
139 
140  int res = ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
141  int err = res == -1 ? zmq_errno() : 0;
142  if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
143  return res;
144 
145  zth_dbg(zmq, "[%s] zmq_recv(%p) hand-off", zth::currentFiber().str().c_str(), socket);
146 
147  if((errno = poll(PollableFd(socket, Pollable::PollIn))))
148  return -1;
149 
150  return ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
151 }
152 
157 int zmq_send_const(void* socket, void const* buf, size_t len, int flags)
158 {
159  perf_syscall("zmq_send_const()");
160 
161  int res = ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
162  int err = res == -1 ? zmq_errno() : 0;
163  if(err != EAGAIN || (flags & ZMQ_DONTWAIT))
164  return res;
165 
166  zth_dbg(zmq, "[%s] zmq_send_const(%p) hand-off", zth::currentFiber().str().c_str(), socket);
167 
168  if((errno = poll(PollableFd(socket, Pollable::PollOut))))
169  return -1;
170 
171  return ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
172 }
173 
174 } // namespace zmq
175 } // namespace zth
176 #else
177 static int no_zmq __attribute__((unused)); // Only definition in this file when ZTH_HAVE_LIBZMQ is not defined; presumably keeps the translation unit non-empty (confirm).
178 #endif // ZTH_HAVE_LIBZMQ
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition: util.cpp:280
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition: worker.h:398
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition: poller.h:783
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
char const * banner() noexcept
Prints a banner line with version and configuration information.
Definition: util.cpp:34
int zmq_msg_send(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_send().
Definition: zmq.cpp:73
int zmq_recv(void *socket, void *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_recv().
Definition: zmq.cpp:136
void * zmq_socket(int type)
Fiber-aware wrapper for 0MQ's zmq_socket().
Definition: zmq.cpp:64
int zmq_send(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send().
Definition: zmq.cpp:115
int zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_recv().
Definition: zmq.cpp:94
void * zmq_context()
Returns the (only) 0MQ context, used by all fibers.
Definition: zmq.cpp:53
int zmq_send_const(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send_const().
Definition: zmq.cpp:157
Definition: allocator.h:23
cow_string str(T value)
Returns a zth::string representation of the given value.
Definition: util.h:517
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition: util.h:617
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition: perf.h:359
A pollable file descriptor.
Definition: poller.h:138
static const unsigned long PollIn
Definition: poller.h:92
static const unsigned long PollOut
Definition: poller.h:93
#define zmq_send
Definition: zmq.h:135
#define zmq_msg_recv
Definition: zmq.h:134
#define zmq_msg_send
Definition: zmq.h:133
#define zmq_recv
Definition: zmq.h:136
#define zmq_ctx_term(c)
Definition: zmq.h:131
#define zmq_ctx_new
Definition: zmq.h:130
#define zmq_send_const
Definition: zmq.h:137
#define zmq_socket(c, t)
Definition: zmq.h:132