Zth (libzth)
Loading...
Searching...
No Matches
zmq.cpp

0MQ integration

0MQ integrationThis program will print:

zmq main fiber
Connecting to hello world server...
Sending Hello 10...
Received Hello
Received World 10
Sending Hello 9...
Received Hello
Received World 9
Sending Hello 8...
Received Hello
Received World 8
Sending Hello 7...
Received Hello
Received World 7
Sending Hello 6...
Received Hello
Received World 6
Sending Hello 5...
Received Hello
Received World 5
Sending Hello 4...
Received Hello
Received World 4
Sending Hello 3...
Received Hello
Received World 3
Sending Hello 2...
Received Hello
Received World 2
Sending Hello 1...
Received Hello
Received World 1
Stopping client...
Stopping server...
/*
* SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
*
* SPDX-License-Identifier: CC0-1.0
*/
#include <zth>
#include <csignal>
static sig_atomic_t volatile stop = 0;
static int check(int res)
{
if(res < 0) {
int e = errno;
printf("%s\n", zth::err(errno).c_str());
errno = e;
}
return res;
}
void server()
{
void* responder = zth_zmq_socket(ZMQ_REP);
int rc __attribute__((unused)) = zmq_bind(responder, "inproc://hello");
zth_assert(!rc);
while(true) {
char buffer[10] = {};
switch(check(zmq_recv(responder, buffer, sizeof(buffer), 0))) {
case 0:
printf("Stopping server...\n");
check(zmq_send(responder, nullptr, 0, 0));
// fall-through
case -1:
zmq_close(responder);
return;
default:;
}
printf("Received %s\n", buffer);
zth::nap(1); // Do some 'work'
check(zmq_send(responder, static_cast<void const*>("World"), 5, 0));
}
}
void client(int messages)
{
printf("Connecting to hello world server...\n");
void* requester = zth_zmq_socket(ZMQ_REQ);
zmq_connect(requester, "inproc://hello");
while(messages > 0 && !stop) {
char buffer[10];
printf("Sending Hello %d...\n", messages);
check(zmq_send(requester, static_cast<void const*>("Hello"), 5, 0));
check(zmq_recv(requester, buffer, sizeof(buffer), 0));
printf("Received World %d\n", messages);
messages--;
}
// Terminator.
printf("Stopping client...\n");
check(zmq_send(requester, nullptr, 0, 0));
check(zmq_recv(requester, nullptr, 0, 0));
zmq_close(requester);
}
static void handler(int UNUSED_PAR(sig))
{
char const* msg = "got interrupted\n";
for(ssize_t len = (ssize_t)strlen(msg), c = 1; c > 0 && len > 0;
c = write(fileno(stderr), msg, (unsigned long)len), len -= c, msg += c)
;
stop = 1;
}
int main_fiber(int argc, char** argv)
{
printf("zmq main fiber\n");
#ifdef ZTH_OS_WINDOWS
signal(SIGINT, handler);
#else
struct sigaction sa = {};
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sa.sa_handler = handler;
if(sigaction(SIGINT, &sa, nullptr) == -1)
(void)fprintf(stderr, "sigaction() failed; %s", zth::err(errno).c_str());
#endif
int messages = 10;
if(argc > 1) {
messages = (int)strtol(argv[1], nullptr, 0);
if(messages == 0)
messages = 1;
}
zth::fiber(server);
zth::fiber(client, messages);
return 0;
}
int main_fiber(int argc, char **argv)
Definition main.cpp:11
void * zth_zmq_socket(int type)
Fiber-aware wrapper for 0MQ's zmq_socket().
Definition zmq.h:61
void nap(Timestamp const &sleepUntil)
Sleep until the given time stamp.
Definition waiter.h:274
fiber_type< F >::fiber fiber(F &&f, Args &&... args)
Create and start a new fiber.
Definition async.h:1192
constexpr auto stop
Action to return from Fsm::run().
Definition fsm14.h:2049
ssize_t write(int fd, void const *buf, size_t count)
Like normal write(), but forwards the poll() to the zth::Waiter in case it would block.
Definition io.cpp:52
#define UNUSED_PAR(name)
Definition macros.h:78
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:686
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217
#define zmq_send
Definition zmq.h:132
#define zmq_recv
Definition zmq.h:133