16#if __cplusplus >= 201103L
22int PollerInterface::add(std::initializer_list<std::reference_wrapper<Pollable>> l)
noexcept
37 for(
auto const* it = l.begin(); it != l.end() && count > 0; ++it, count--)
59 this->
setName(
"zth::ZmqPoller");
75 item.events |= ZMQ_POLLIN;
77 item.events |= ZMQ_POLLOUT;
81int ZmqPoller::doPoll(
int timeout_ms, base::PollItemList& items)
noexcept
83 zth_assert(items.size() <= (
size_t)std::numeric_limits<int>::max());
84 int res = zmq_poll(items.data(), (
int)items.size(), timeout_ms);
91 for(
size_t i = 0; res > 0 && i < items.size(); i++) {
92 zmq_pollitem_t
const& item = items[i];
93 Pollable::Events revents = 0;
98 if(item.revents & ZMQ_POLLIN)
99 revents |= Pollable::PollIn;
100 if(item.revents & ZMQ_POLLOUT)
101 revents |= Pollable::PollOut;
102 if(item.revents & ZMQ_POLLERR)
103 revents |= Pollable::PollErr;
106 this->event(revents, i);
112#elif defined(ZTH_HAVE_POLL)
117PollPoller::PollPoller()
119 setName(
"zth::PollPoller");
122PollPoller::~PollPoller()
127int PollPoller::init(Pollable
const& p,
struct pollfd& item)
noexcept
131 PollableFd
const& pfd =
static_cast<PollableFd const&
>(p);
136 if((pfd.events.test(Pollable::PollInIndex)))
137 item.events |= POLLIN;
138 if((pfd.events.test(Pollable::PollOutIndex)))
139 item.events |= POLLOUT;
140 if((pfd.events.test(Pollable::PollPriIndex)))
141 item.events |= POLLPRI;
142 if((pfd.events.test(Pollable::PollHupIndex)))
143 item.events |= POLLHUP;
148int PollPoller::doPoll(
int timeout_ms, base::PollItemList& items)
noexcept
150 int res = ::poll(items.data(), items.size(), timeout_ms);
157 for(
size_t i = 0; res > 0 && i < items.size(); i++) {
158 struct pollfd const& item = items[i];
159 Pollable::Events revents = 0;
164 if(item.revents & POLLIN)
165 revents |= Pollable::PollIn;
166 if(item.revents & POLLOUT)
167 revents |= Pollable::PollOut;
168 if(item.revents & POLLERR)
169 revents |= Pollable::PollErr;
170 if(item.revents & POLLPRI)
171 revents |= Pollable::PollPri;
172 if(item.revents & POLLHUP)
173 revents |= Pollable::PollHup;
190 setName(
"zth::NoPoller");
213PollerClient::PollerClient()
215 this->
setName(
"zth::PollerClient");
218#if __cplusplus >= 201103L
219PollerClient::PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l)
223# ifdef __cpp_exceptions
228 throw std::bad_alloc();
230 throw std::runtime_error(
"");
240 m_pollables.
reserve(m_pollables.size() + more);
241 m_result.reserve(m_pollables.capacity());
247 m_result.reserve(m_pollables.size() + 1U);
248 m_pollables.push_back(&p);
249 zth_dbg(io,
"[%s] poll %p for events 0x%lu", id_str(), &p, p.events.to_ulong());
258 for(
size_t i = m_pollables.size(); i > 0; i--) {
259 Pollable*& pi = m_pollables[i - 1U];
261 pi = m_pollables.back();
262 m_pollables.pop_back();
274 if(m_pollables.empty()) {
283 for(
size_t i = 0; i < m_pollables.size(); i++)
284 p.
add(*m_pollables[i],
this);
287 zth_dbg(io,
"[%s] polling %u items for %d ms", id_str(), (
unsigned)m_pollables.size(),
294 if(!m_result.empty()) {
296 }
else if(res && res != EAGAIN) {
298 }
else if(timeout_ms == 0) {
300 }
else if(timeout_ms < 0) {
302 zth_dbg(io,
"[%s] hand-off to server", id_str());
306 zth_dbg(io,
"[%s] hand-off to server with timeout", id_str());
309 +
TimeInterval(timeout_ms / 1000L, (timeout_ms * 1000000L) % 1000000000L));
314 for(
size_t i = m_pollables.size(); i > 0; i--)
315 p.
remove(*m_pollables[i - 1U],
this);
318 if(m_result.empty() && !res)
322 zth_dbg(io,
"[%s] poll returned %s", id_str(),
err(res).c_str());
324 zth_dbg(io,
"[%s] poll returned %u pollable(s)", id_str(),
325 (
unsigned)m_result.size());
333 m_result.push_back(&p);
335 if(!m_wait.timeout().isNull()) {
338 }
else if(m_wait.hasFiber()) {
344bool PollerClient::empty() const noexcept
346 return m_pollables.
empty();
The poller to be used by a fiber.
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Abstract base class of a Poller server.
virtual int remove(Pollable &p, Client *client) noexcept=0
Remove the given Pollable, belonging to the given client.
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int add(Pollable &p, Client *client) noexcept=0
Add a Pollable, that belongs to a given client.
Convenient wrapper around struct timespec that contains a time interval.
A single fiber per Worker that manages sleeping and blocked fibers.
void wakeup(TimedWaitable &w)
PollerServerBase & poller()
Waiter & waiter() noexcept
A PollerServer that uses zmq_poll().
Change the name of a fiber returned by zth_async.
void clear() noexcept
Erase all elements from the vector.
bool empty() const noexcept
Check if the vector is empty.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
void resume(Fiber &fiber)
string err(int e)
Return a string like strerror() does, but as a zth::string.
A pollable file descriptor.
int fd
The file descriptor.
void * socket
The ZeroMQ socket.
Events events
Events to poll.
#define zth_assert(expr)
assert(), but better integrated in Zth.