7#define ZTH_REDIRECT_ZMQ 0
// Accessor for the storage slot of the 0MQ context pointer. The reference
// return type lets callers both read the slot and assign to it.
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns the function-local static declared below - confirm.
20static void*& zmq_context_storage()
// Function-local static that holds the context pointer; it starts out as nullptr.
23 static void* zmq_ctx =
nullptr;
27# ifndef ZTH_OS_WINDOWS
// Tear-down hook for the 0MQ context. It is only compiled when
// ZTH_OS_WINDOWS is not defined, and zmq_global_init() registers it
// with atexit().
28static void zmq_global_deinit()
// Bind to the shared storage slot, so this function works on the same pointer
// that zmq_global_init() stored.
30 void*& zmq_ctx = zmq_context_storage();
// Trace the tear-down in the zmq debug group.
// NOTE(review): the call that actually destroys the context is not visible in
// this listing - confirm against the full source.
34 zth_dbg(zmq,
"destroy context");
// Initialization of the 0MQ context: logs the library version, aborts if the
// context cannot be created, registers the atexit() clean-up (non-Windows
// only) and stores the context pointer in zmq_context_storage().
// NOTE(review): the context creation call, the failure check and the return
// statement are not visible in this listing - confirm against the full source.
40static void* zmq_global_init()
// Query the version of the 0MQ library and report it in the banner debug group.
45 zmq_version(&major, &minor, &patch);
46 zth_dbg(
banner,
"0MQ version is %d.%d.%d", major, minor, patch);
// Fatal path: zth_abort() prints the message and aborts the process.
// The errno value is turned into text by err().
52 zth_abort(
"0MQ context creation failed; %s",
err(errno).c_str());
// The exit handler is only registered when ZTH_OS_WINDOWS is not defined.
57# ifndef ZTH_OS_WINDOWS
// The result of atexit() is deliberately discarded.
59 (void)atexit(zmq_global_deinit);
// Store the context pointer in the shared slot, where zmq_global_deinit()
// also looks for it.
61 zmq_context_storage() = zmq_ctx;
71 void*& zmq_ctx = zmq_context_storage();
73 zmq_ctx = zmq_global_init();
96 int err = res == -1 ? zmq_errno() : 0;
97 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
106 return ::zmq_msg_send(msg, socket, flags | ZMQ_DONTWAIT);
118 int err = res == -1 ? zmq_errno() : 0;
119 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
128 return ::zmq_msg_recv(msg, socket, flags | ZMQ_DONTWAIT);
// Wrapper with the same parameter list as ::zmq_send(). Both visible calls
// into 0MQ are made with ZMQ_DONTWAIT OR-ed into the flags.
//   socket - 0MQ socket to send on
//   buf    - data to send
//   len    - number of bytes in buf
//   flags  - 0MQ send flags as passed by the caller
135int zmq_send(
void* socket,
void const* buf,
size_t len,
int flags)
// First attempt, with ZMQ_DONTWAIT forced on.
139 int res =
::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
// The 0MQ error code is only fetched when the call failed (-1); otherwise err is 0.
140 int err = res == -1 ? zmq_errno() : 0;
// True when the first attempt did anything other than fail with EAGAIN, or
// when the caller itself asked for ZMQ_DONTWAIT.
// NOTE(review): the branch body and the code between here and the final
// return are not visible in this listing; presumably the first result is
// returned directly in this case, and otherwise the fiber waits for the
// socket before retrying - confirm.
141 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
// Second attempt, again with ZMQ_DONTWAIT forced on; its result is returned as is.
150 return ::zmq_send(socket, buf, len, flags | ZMQ_DONTWAIT);
// Wrapper with the same parameter list as ::zmq_recv(). Both visible calls
// into 0MQ are made with ZMQ_DONTWAIT OR-ed into the flags.
//   socket - 0MQ socket to receive from
//   buf    - buffer that receives the data
//   len    - size of buf in bytes
//   flags  - 0MQ receive flags as passed by the caller
157int zmq_recv(
void* socket,
void* buf,
size_t len,
int flags)
// First attempt, with ZMQ_DONTWAIT forced on.
161 int res =
::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
// The 0MQ error code is only fetched when the call failed (-1); otherwise err is 0.
162 int err = res == -1 ? zmq_errno() : 0;
// True when the first attempt did anything other than fail with EAGAIN, or
// when the caller itself asked for ZMQ_DONTWAIT.
// NOTE(review): the branch body and the code between here and the final
// return are not visible in this listing; presumably the first result is
// returned directly in this case, and otherwise the fiber waits for the
// socket before retrying - confirm.
163 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
// Second attempt, again with ZMQ_DONTWAIT forced on; its result is returned as is.
172 return ::zmq_recv(socket, buf, len, flags | ZMQ_DONTWAIT);
184 int err = res == -1 ? zmq_errno() : 0;
185 if(
err != EAGAIN || (flags & ZMQ_DONTWAIT))
194 return ::zmq_send_const(socket, buf, len, flags | ZMQ_DONTWAIT);
200static int const no_zmq __attribute__((unused)) = 0;
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
char const * banner() noexcept
Returns a banner line with version and configuration information.
void * zmq_context()
Returns the (only) 0MQ context, used by all fibers.
cow_string str(T value)
Returns a zth::string representation of the given value.
string err(int e)
Return a string like strerror() does, but as a zth::string.
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp()) noexcept
Put a syscall into the perf output.
A pollable file descriptor.
static const unsigned long PollIn
static const unsigned long PollOut