19 #if __cplusplus >= 201103L
39 for(
auto const* it = l.begin(); it != l.end() && count > 0; ++it, count--)
54 #ifdef ZTH_HAVE_LIBZMQ
61 this->
setName(
"zth::ZmqPoller");
77 item.events |= ZMQ_POLLIN;
79 item.events |= ZMQ_POLLOUT;
83 int ZmqPoller::doPoll(
int timeout_ms,
typename base::PollItemList& items) noexcept
85 zth_assert(items.size() <= (
size_t)std::numeric_limits<int>::max());
86 int res = zmq_poll(items.data(), (
int)items.size(),
timeout_ms);
93 for(
size_t i = 0; res > 0 && i < items.size(); i++) {
94 zmq_pollitem_t
const& item = items[i];
100 if(item.revents & ZMQ_POLLIN)
102 if(item.revents & ZMQ_POLLOUT)
104 if(item.revents & ZMQ_POLLERR)
108 this->event(revents, i);
114 #elif defined(ZTH_HAVE_POLL)
119 PollPoller::PollPoller()
121 setName(
"zth::PollPoller");
124 PollPoller::~PollPoller()
129 int PollPoller::init(Pollable
const& p,
struct pollfd& item) noexcept
133 PollableFd
const& pfd =
static_cast<PollableFd const&
>(p);
139 item.events |= POLLIN;
141 item.events |= POLLOUT;
143 item.events |= POLLPRI;
145 item.events |= POLLHUP;
150 int PollPoller::doPoll(
int timeout_ms, base::PollItemList& items) noexcept
159 for(
size_t i = 0; res > 0 && i < items.size(); i++) {
160 struct pollfd const& item = items[i];
166 if(item.revents & POLLIN)
168 if(item.revents & POLLOUT)
170 if(item.revents & POLLERR)
172 if(item.revents & POLLPRI)
174 if(item.revents & POLLHUP)
192 setName(
"zth::NoPoller");
217 this->
setName(
"zth::PollerClient");
220 #if __cplusplus >= 201103L
225 # ifdef __cpp_exceptions
230 throw std::bad_alloc();
232 throw std::runtime_error(
"");
242 m_pollables.
reserve(m_pollables.size() + more);
243 m_result.reserve(m_pollables.capacity());
249 m_result.reserve(m_pollables.size() + 1u);
250 m_pollables.push_back(&p);
251 zth_dbg(io,
"[%s] poll %p for events 0x%lu", id_str(), &p, p.events.to_ulong());
260 for(
size_t i = m_pollables.size(); i > 0; i--) {
261 Pollable*& pi = m_pollables[i - 1u];
263 pi = m_pollables.back();
264 m_pollables.pop_back();
276 if(m_pollables.empty()) {
285 for(
size_t i = 0; i < m_pollables.size(); i++)
286 p.
add(*m_pollables[i],
this);
289 zth_dbg(io,
"[%s] polling %u items for %d ms", id_str(), (
unsigned)m_pollables.size(),
296 if(!m_result.empty()) {
298 }
else if(res && res != EAGAIN) {
304 zth_dbg(io,
"[%s] hand-off to server", id_str());
308 zth_dbg(io,
"[%s] hand-off to server with timeout", id_str());
316 for(
size_t i = m_pollables.size(); i > 0; i--)
317 p.
remove(*m_pollables[i - 1u],
this);
320 if(m_result.empty() && !res)
324 zth_dbg(io,
"[%s] poll returned %s", id_str(),
err(res).c_str());
326 zth_dbg(io,
"[%s] poll returned %u pollable(s)", id_str(),
327 (
unsigned)m_result.size());
335 m_result.push_back(&p);
337 if(!m_wait.timeout().isNull()) {
340 }
else if(m_wait.hasFiber()) {
348 return m_pollables.
empty();
The poller to be used by a fiber.
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
virtual ~PollerClient() override
virtual int remove(Pollable &p) noexcept override
Remove a pollable object.
bool empty() const noexcept final
Checks if there is any pollable registered.
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
virtual int add(Pollable &p) noexcept override
Add a pollable object.
virtual void event(Pollable &p) noexcept override
Indicate that the given pollable got an event.
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
Abstract base class of a Poller server.
virtual int remove(Pollable &p, Client *client) noexcept=0
Remove the given Pollable, belonging to the given client.
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int add(Pollable &p, Client *client) noexcept=0
Add a Pollable, that belongs to a given client.
Convenient wrapper around struct timespec that contains a time interval.
void setName(string const &name)
A single fiber per Worker that manages sleeping and blocked fibers.
void wakeup(TimedWaitable &w)
PollerServerBase & poller()
Waiter & waiter() noexcept
A PollerServer that uses zmq_poll().
virtual ~ZmqPoller() override
void clear() noexcept
Erase all elements from the vector.
bool empty() const noexcept
Check if the vector is empty.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
constexpr auto timeout_ms
A guard that is enabled after a ms milliseconds after entering the current state.
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
void resume(Fiber &fiber)
string err(int e)
Return a string like strerror() does, but as a zth::string.
A pollable file descriptor.
int fd
The file descriptor.
void * socket
The ZeroMQ socket.
static const unsigned long PollIn
static const unsigned long PollOut
Events events
Events to poll.
static const unsigned long PollErr
static const unsigned long PollPri
std::bitset< FlagCount > Events
Type of events and revents.
static const unsigned long PollHup
#define zth_assert(expr)
assert(), but better integrated in Zth.