Zth (libzth)
poller.cpp
Go to the documentation of this file.
1 /*
2  * Zth (libzth), a cooperative userspace multitasking library.
3  * Copyright (C) 2019-2022 Jochem Rutgers
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
8  */
9 
10 #include <libzth/poller.h>
11 
12 namespace zth {
13 
14 
16 // PollerInterface
17 //
18 
19 #if __cplusplus >= 201103L
25 int PollerInterface::add(std::initializer_list<std::reference_wrapper<Pollable>> l) noexcept
26 {
27  try {
28  reserve(l.size());
29  } catch(...) {
30  return ENOMEM;
31  }
32 
33  int res = 0;
34  size_t count = 0;
35 
36  for(Pollable& p : l) {
37  if((res = add(p))) {
38  // Rollback.
39  for(auto const* it = l.begin(); it != l.end() && count > 0; ++it, count--)
40  remove(*it);
41  break;
42  }
43 
44  // Success.
45  count++;
46  }
47 
48  return res;
49 }
50 #endif // C++11
51 
52 
53 
54 #ifdef ZTH_HAVE_LIBZMQ
56 // ZmqPoller
57 //
58 
60 {
61  this->setName("zth::ZmqPoller");
62 }
63 
65 
66 int ZmqPoller::init(Pollable const& p, zmq_pollitem_t& item) noexcept
67 {
68  // Zth only has PollableFds, so casting is safe.
69  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
70  PollableFd const& pfd = static_cast<PollableFd const&>(p);
71 
72  item.socket = pfd.socket;
73  item.fd = pfd.fd;
74 
75  item.events = 0;
76  if((pfd.events.test(Pollable::PollInIndex)))
77  item.events |= ZMQ_POLLIN;
78  if((pfd.events.test(Pollable::PollOutIndex)))
79  item.events |= ZMQ_POLLOUT;
80  return 0;
81 }
82 
83 int ZmqPoller::doPoll(int timeout_ms, typename base::PollItemList& items) noexcept
84 {
85  zth_assert(items.size() <= (size_t)std::numeric_limits<int>::max());
86  int res = zmq_poll(items.data(), (int)items.size(), timeout_ms);
87  if(res < 0)
88  return zmq_errno();
89  if(res == 0)
90  // Timeout.
91  return 0;
92 
93  for(size_t i = 0; res > 0 && i < items.size(); i++) {
94  zmq_pollitem_t const& item = items[i];
95  Pollable::Events revents = 0;
96 
97  if(item.revents) {
98  res--;
99 
100  if(item.revents & ZMQ_POLLIN) // NOLINT(hicpp-signed-bitwise)
101  revents |= Pollable::PollIn;
102  if(item.revents & ZMQ_POLLOUT) // NOLINT(hicpp-signed-bitwise)
103  revents |= Pollable::PollOut;
104  if(item.revents & ZMQ_POLLERR) // NOLINT(hicpp-signed-bitwise)
105  revents |= Pollable::PollErr;
106  }
107 
108  this->event(revents, i);
109  }
110 
111  return 0;
112 }
113 
114 #elif defined(ZTH_HAVE_POLL)
116 // PollPoller
117 //
118 
119 PollPoller::PollPoller()
120 {
121  setName("zth::PollPoller");
122 }
123 
124 PollPoller::~PollPoller()
125 {
126  clear();
127 }
128 
129 int PollPoller::init(Pollable const& p, struct pollfd& item) noexcept
130 {
131  // Zth only has PollableFds, so casting is safe.
132  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
133  PollableFd const& pfd = static_cast<PollableFd const&>(p);
134 
135  item.fd = pfd.fd;
136 
137  item.events = 0;
138  if((pfd.events.test(Pollable::PollInIndex)))
139  item.events |= POLLIN;
140  if((pfd.events.test(Pollable::PollOutIndex)))
141  item.events |= POLLOUT;
142  if((pfd.events.test(Pollable::PollPriIndex)))
143  item.events |= POLLPRI;
144  if((pfd.events.test(Pollable::PollHupIndex)))
145  item.events |= POLLHUP;
146 
147  return 0;
148 }
149 
150 int PollPoller::doPoll(int timeout_ms, base::PollItemList& items) noexcept
151 {
152  int res = ::poll(items.data(), items.size(), timeout_ms);
153  if(res < 0)
154  return errno;
155  if(res == 0)
156  // Timeout.
157  return 0;
158 
159  for(size_t i = 0; res > 0 && i < items.size(); i++) {
160  struct pollfd const& item = items[i];
161  Pollable::Events revents = 0;
162 
163  if(item.revents) {
164  res--;
165 
166  if(item.revents & POLLIN)
167  revents |= Pollable::PollIn;
168  if(item.revents & POLLOUT)
169  revents |= Pollable::PollOut;
170  if(item.revents & POLLERR)
171  revents |= Pollable::PollErr;
172  if(item.revents & POLLPRI)
173  revents |= Pollable::PollPri;
174  if(item.revents & POLLHUP)
175  revents |= Pollable::PollHup;
176  }
177 
178  event(revents, i);
179  }
180 
181  return 0;
182 }
183 
184 
185 #else
187 // NoPoller
188 //
189 
190 NoPoller::NoPoller()
191 {
192  setName("zth::NoPoller");
193 }
194 
195 NoPoller::~NoPoller() is_default
196 
197 int NoPoller::init(Pollable const& UNUSED_PAR(p), int& UNUSED_PAR(item)) noexcept
198 {
199  return EINVAL;
200 }
201 
202 int NoPoller::doPoll(int UNUSED_PAR(timeout_ms), base::PollItemList& UNUSED_PAR(items)) noexcept
203 {
204  return ENOSYS;
205 }
206 
207 #endif // Poller method
208 
209 
210 
212 // PollerClient
213 //
214 
216 {
217  this->setName("zth::PollerClient");
218 }
219 
220 #if __cplusplus >= 201103L
221 PollerClient::PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l)
222 {
223  errno = add(l);
224 
225 # ifdef __cpp_exceptions
226  switch(errno) {
227  case 0:
228  break;
229  case ENOMEM:
230  throw std::bad_alloc();
231  default:
232  throw std::runtime_error("");
233  }
234 # endif
235 }
236 #endif
237 
239 
240 void PollerClient::reserve(size_t more)
241 {
242  m_pollables.reserve(m_pollables.size() + more);
243  m_result.reserve(m_pollables.capacity());
244 }
245 
246 int PollerClient::add(Pollable& p) noexcept
247 {
248  try {
249  m_result.reserve(m_pollables.size() + 1u);
250  m_pollables.push_back(&p);
251  zth_dbg(io, "[%s] poll %p for events 0x%lu", id_str(), &p, p.events.to_ulong());
252  return 0;
253  } catch(...) {
254  return ENOMEM;
255  }
256 }
257 
259 {
260  for(size_t i = m_pollables.size(); i > 0; i--) {
261  Pollable*& pi = m_pollables[i - 1u];
262  if(pi == &p) {
263  pi = m_pollables.back();
264  m_pollables.pop_back();
265  return 0;
266  }
267  }
268 
269  return EAGAIN;
270 }
271 
273 {
274  m_result.clear();
275 
276  if(m_pollables.empty()) {
277  errno = EINVAL;
278  return m_result;
279  }
280 
281  Waiter& waiter = currentWorker().waiter();
282  PollerServerBase& p = waiter.poller();
283 
284  // Add all our pollables to the server.
285  for(size_t i = 0; i < m_pollables.size(); i++)
286  p.add(*m_pollables[i], this);
287 
288  // Go do the poll.
289  zth_dbg(io, "[%s] polling %u items for %d ms", id_str(), (unsigned)m_pollables.size(),
290  timeout_ms);
291 
292  // First try, in the current fiber's context.
293  m_wait = TimedWaitable();
294  int res = p.poll(0);
295 
296  if(!m_result.empty()) { // NOLINT(bugprone-branch-clone)
297  // Completed already.
298  } else if(res && res != EAGAIN) {
299  // Completed with an error.
300  } else if(timeout_ms == 0) {
301  // Just tried. Got nothing, apparently.
302  } else if(timeout_ms < 0) {
303  // Wait for the event callback.
304  zth_dbg(io, "[%s] hand-off to server", id_str());
305  m_wait.setFiber(currentFiber());
306  suspend();
307  } else {
308  zth_dbg(io, "[%s] hand-off to server with timeout", id_str());
309  m_wait = TimedWaitable(
311  + TimeInterval(timeout_ms / 1000L, (timeout_ms * 1000000L) % 1000000000L));
312  waitUntil(m_wait);
313  }
314 
315  // Remove our pollables from the server.
316  for(size_t i = m_pollables.size(); i > 0; i--)
317  p.remove(*m_pollables[i - 1u], this);
318 
319  // m_result got populated by event().
320  if(m_result.empty() && !res)
321  res = EAGAIN;
322 
323  if(res)
324  zth_dbg(io, "[%s] poll returned %s", id_str(), err(res).c_str());
325  else
326  zth_dbg(io, "[%s] poll returned %u pollable(s)", id_str(),
327  (unsigned)m_result.size());
328 
329  errno = res;
330  return m_result;
331 }
332 
333 void PollerClient::event(Pollable& p) noexcept
334 {
335  m_result.push_back(&p);
336 
337  if(!m_wait.timeout().isNull()) {
338  // Managed by the waiter
339  currentWorker().waiter().wakeup(m_wait);
340  } else if(m_wait.hasFiber()) {
341  // Just suspended.
342  resume(m_wait.fiber());
343  }
344 }
345 
346 bool PollerClient::empty() const noexcept
347 {
348  return m_pollables.empty();
349 }
350 
351 } // namespace zth
The poller to be used by a fiber.
Definition: poller.h:733
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
Definition: poller.cpp:240
virtual ~PollerClient() override
virtual int remove(Pollable &p) noexcept override
Remove a pollable object.
Definition: poller.cpp:258
bool empty() const noexcept final
Checks if there is any pollable registered.
Definition: poller.cpp:346
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
Definition: poller.cpp:272
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Definition: poller.cpp:246
virtual void event(Pollable &p) noexcept override
Indicate that the given pollable got an event.
Definition: poller.cpp:333
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
Abstract base class of a Poller server.
Definition: poller.h:273
virtual int remove(Pollable &p, Client *client) noexcept=0
Remove the given Pollable, belonging to the given client.
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int add(Pollable &p, Client *client) noexcept=0
Add a Pollable, that belongs to a given client.
Convenient wrapper around struct timespec that contains a time interval.
Definition: time.h:52
static Timestamp now()
Definition: time.h:554
void setName(string const &name)
Definition: util.h:730
A single fiber per Worker that manages sleeping and blocked fibers.
Definition: waiter.h:197
void wakeup(TimedWaitable &w)
Definition: waiter.cpp:85
PollerServerBase & poller()
Definition: waiter.cpp:98
Waiter & waiter() noexcept
Definition: worker.h:101
A PollerServer that uses zmq_poll().
Definition: poller.h:653
virtual ~ZmqPoller() override
void clear() noexcept
Erase all elements from the vector.
Definition: util.h:1178
bool empty() const noexcept
Check if the vector is empty.
Definition: util.h:1135
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition: worker.h:388
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition: worker.h:398
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
Definition: waiter.cpp:28
constexpr auto timeout_ms
A guard that is enabled after a ms milliseconds after entering the current state.
Definition: fsm14.h:2069
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition: poller.h:783
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
#define is_default
Definition: macros.h:205
#define UNUSED_PAR(name)
Definition: macros.h:79
Definition: allocator.h:23
void resume(Fiber &fiber)
Definition: worker.h:470
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition: util.h:617
void suspend()
Definition: worker.h:462
A pollable file descriptor.
Definition: poller.h:138
int fd
The file descriptor.
Definition: poller.h:178
void * socket
The ZeroMQ socket.
Definition: poller.h:169
A pollable thing.
Definition: poller.h:75
static const unsigned long PollIn
Definition: poller.h:92
static const unsigned long PollOut
Definition: poller.h:93
Events events
Events to poll.
Definition: poller.h:121
static const unsigned long PollErr
Definition: poller.h:94
static const unsigned long PollPri
Definition: poller.h:95
@ PollOutIndex
Definition: poller.h:81
@ PollPriIndex
Definition: poller.h:83
@ PollInIndex
Definition: poller.h:80
@ PollHupIndex
Definition: poller.h:84
std::bitset< FlagCount > Events
Type of events and revents.
Definition: poller.h:89
static const unsigned long PollHup
Definition: poller.h:96
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236