7#define ZTH_REDIRECT_ZMQ 0
20static void zmq_global_deinit()
22 zth_dbg(zmq,
"destroy context");
26static void* zmq_global_init()
31 zmq_version(&major, &minor, &patch);
32 zth_dbg(
banner,
"0MQ version is %d.%d.%d", major, minor, patch);
35 void* zmq_ctx = zmq_ctx_new();
38 zth_abort(
"0MQ context creation failed; %s",
err(errno).c_str());
41 (void)atexit(zmq_global_deinit);
52 static void* zmq_ctx = zmq_global_init();
75 int err = res == -1 ? zmq_errno() : 0;
76 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
85 return ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
97 int err = res == -1 ? zmq_errno() : 0;
98 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
107 return ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
114int zmq_send(
void* socket,
void const* buf,
size_t len,
int flags)
118 int res =
::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
119 int err = res == -1 ? zmq_errno() : 0;
120 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
129 return ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
136int zmq_recv(
void* socket,
void* buf,
size_t len,
int flags)
140 int res =
::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
141 int err = res == -1 ? zmq_errno() : 0;
142 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
151 return ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
163 int err = res == -1 ? zmq_errno() : 0;
164 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
173 return ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
179static int const no_zmq __attribute__((unused)) = 0;
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
char const * banner() noexcept
Prints a banner line with version and configuration information.
int zmq_msg_send(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_send().
int zmq_recv(void *socket, void *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_recv().
void * zmq_socket(int type)
Fiber-aware wrapper for 0MQ's zmq_socket().
int zmq_send(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send().
int zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags)
Fiber-aware wrapper for 0MQ's zmq_msg_recv().
void * zmq_context()
Returns the (only) 0MQ context, used by all fibers.
int zmq_send_const(void *socket, void const *buf, size_t len, int flags)
Fiber-aware wrapper for 0MQ's zmq_send_const().
cow_string str(T value)
Returns a zth::string representation of the given value.
string err(int e)
Return a string like strerror() does, but as a zth::string.
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
A pollable file descriptor.
static const unsigned long PollIn
static const unsigned long PollOut