Zth (libzth)
zmq.cpp

0MQ integration

#include <zth>
#include <csignal>
// Stop request flag: set to 1 by handler() when the signal arrives, and polled
// by client() to end its send/receive loop early.
static sig_atomic_t volatile stop = 0;
// Pass-through error reporter for calls that signal failure with a negative
// return value.
//
// When res is negative, the textual description of the current errno is
// printed to stdout. errno is restored afterwards, so the caller can still
// inspect it. res is always returned unchanged, which allows wrapping a call
// in-place: check(zmq_send(...)).
static int check(int res)
{
	// Nothing to report on success.
	if(res >= 0)
		return res;

	// Printing may overwrite errno; remember it and put it back afterwards.
	int const saved = errno;
	printf("%s\n", zth::err(saved).c_str());
	errno = saved;
	return res;
}
// Reply (REP) side of the example.
//
// Binds a REP socket to "inproc://hello" and answers every non-empty request
// with "World". An empty request is the terminator: it is acknowledged with
// an empty reply, after which the socket is closed and the function returns.
// A receive error also closes the socket and returns.
void server()
{
	void* responder = zth_zmq_socket(ZMQ_REP);

	// Report a bind failure even when zth_assert() is compiled out, and do
	// not enter the receive loop on a socket that has no endpoint.
	int rc = check(zmq_bind(responder, "inproc://hello"));
	zth_assert(!rc);
	if(rc) {
		zmq_close(responder);
		return;
	}

	while(true) {
		// Zero-initialized for every message. Only sizeof(buffer) - 1 bytes
		// are offered to zmq_recv(), so the last byte always remains '\0'
		// and the buffer can safely be printed with %s, even when the
		// peer sends a message that is longer than the buffer.
		char buffer[10] = {};

		switch(check(zmq_recv(responder, buffer, sizeof(buffer) - 1, 0))) {
		case 0:
			// Empty message: terminator sent by the client.
			printf("Stopping server...\n");
			check(zmq_send(responder, nullptr, 0, 0));
			// fall-through
		case -1:
			// Terminator handled, or receive failed (already reported by
			// check()).
			zmq_close(responder);
			return;
		}

		printf("Received %s\n", buffer);
		zth::nap(1); // Do some 'work'
		check(zmq_send(responder, (void*)"World", 5, 0));
	}
}
zth_fiber(server)
// Request (REQ) side of the example.
//
// Connects a REQ socket to "inproc://hello" and performs up to `messages`
// "Hello" request/reply round trips. The loop ends early when the `stop` flag
// has been set. Afterwards, an empty message is sent as terminator, the
// (empty) reply is awaited, and the socket is closed.
//
// messages: number of "Hello" requests to send; values <= 0 send none, only
//           the terminator.
void client(int messages)
{
	printf("Connecting to hello world server...\n");
	void* requester = zth_zmq_socket(ZMQ_REQ);

	// A failed connect was silently ignored before; report it and bail out,
	// as every subsequent send/receive on this socket would be pointless.
	if(check(zmq_connect(requester, "inproc://hello")) < 0) {
		zmq_close(requester);
		return;
	}

	while(messages > 0 && !stop) {
		// Only used as receive storage; the content of the reply is not
		// inspected.
		char buffer[10];
		printf("Sending Hello %d...\n", messages);
		check(zmq_send(requester, (void*)"Hello", 5, 0));
		check(zmq_recv(requester, buffer, sizeof(buffer), 0));
		printf("Received World %d\n", messages);
		messages--;
	}

	// Terminator.
	printf("Stopping client...\n");
	check(zmq_send(requester, nullptr, 0, 0));
	check(zmq_recv(requester, nullptr, 0, 0));
	zmq_close(requester);
}
zth_fiber(client)
// Signal handler: writes "got interrupted\n" to stderr and raises the `stop`
// flag.
//
// The message is emitted using write() in a loop, such that partial writes
// are completed. The loop gives up as soon as write() fails or makes no
// progress.
static void handler(int UNUSED_PAR(sig))
{
	// write() may modify errno; preserve it for the code that got
	// interrupted by this handler.
	int const saved_errno = errno;

	char const* msg = "got interrupted\n";
	ssize_t len = (ssize_t)strlen(msg);

	while(len > 0) {
		ssize_t const c = write(fileno(stderr), msg, (unsigned long)len);

		// Check the result before using it: advancing msg by -1 would move
		// the pointer to before the start of the string.
		if(c <= 0)
			break;

		msg += c;
		len -= c;
	}

	stop = 1;
	errno = saved_errno;
}
// Entry point of the example, as main fiber.
//
// Installs handler() for SIGINT, determines the number of messages to
// exchange, and starts server() and client() as fibers.
//
// argc/argv: when argv[1] is given, it is parsed by atoi() as the number of
//            messages for the client. A result of 0 (which includes
//            non-numeric input) is replaced by 1. Without argument, 10
//            messages are sent.
// Returns 0.
int main_fiber(int argc, char** argv)
{
	printf("zmq main fiber\n");

#ifdef ZTH_OS_WINDOWS
	signal(SIGINT, handler);
#else
	struct sigaction sa = {};
	sa.sa_flags = 0;
	sigemptyset(&sa.sa_mask);
	sa.sa_handler = handler;
	// Not fatal: the example still runs, it just cannot be interrupted
	// gracefully. The message is newline-terminated, such that it does not
	// run into the output that follows.
	if(sigaction(SIGINT, &sa, nullptr) == -1)
		fprintf(stderr, "sigaction() failed; %s\n", zth::err(errno).c_str());
#endif

	int messages = 10;
	if(argc > 1) {
		messages = atoi(argv[1]);
		if(messages == 0)
			messages = 1;
	}

	async server();
	async client(messages);
	return 0;
}
int main_fiber(int argc, char **argv)
Definition: main.cpp:14
void * zth_zmq_socket(int type)
Fiber-aware wrapper for 0MQ's zmq_socket().
Definition: zmq.h:64
void nap(Timestamp const &sleepUntil)
Sleep until the given time stamp.
Definition: waiter.h:277
#define zth_fiber(...)
Prepare every given function to become a fiber by async.
Definition: async.h:765
#define async
Run a function as a new fiber.
Definition: async.h:789
constexpr auto stop
Action to return from Fsm::run().
Definition: fsm14.h:2037
ssize_t write(int fd, void const *buf, size_t count)
Like normal write(), but forwards the poll() to the zth::Waiter in case it would block.
Definition: io.cpp:54
#define UNUSED_PAR(name)
Definition: macros.h:79
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition: util.h:617
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236
#define zmq_send
Definition: zmq.h:135
#define zmq_recv
Definition: zmq.h:136