Zth (libzth)
poller.h
Go to the documentation of this file.
1 #ifndef ZTH_POLLER_H
2 #define ZTH_POLLER_H
3 /*
4  * Zth (libzth), a cooperative userspace multitasking library.
5  * Copyright (C) 2019-2022 Jochem Rutgers
6  *
7  * This Source Code Form is subject to the terms of the Mozilla Public
8  * License, v. 2.0. If a copy of the MPL was not distributed with this
9  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
10  */
11 
29 #include <libzth/macros.h>
30 
31 #ifdef __cplusplus
32 
33 # include <libzth/allocator.h>
34 # include <libzth/waiter.h>
35 # include <libzth/worker.h>
36 
37 # include <bitset>
38 # include <memory>
39 # include <stdexcept>
40 # include <vector>
41 
42 # if __cplusplus >= 201103L
43 # include <functional>
44 # include <initializer_list>
45 # endif
46 
47 # if defined(ZTH_HAVE_POLL) || defined(ZTH_HAVE_LIBZMQ)
48 # define ZTH_HAVE_POLLER
49 # endif
50 
51 # if defined(ZTH_HAVE_LIBZMQ)
52 # include <zmq.h>
53 # elif defined(ZTH_HAVE_POLL)
54 # include <poll.h>
55 # endif
56 
57 namespace zth {
58 
60 // Pollable
61 //
62 
75 struct Pollable {
79  enum EventsFlags {
86  };
87 
89  typedef std::bitset<FlagCount> Events;
90 
91  // unsigned long can be converted implicitly to bitset.
92  static const unsigned long PollIn = 1UL << PollInIndex;
93  static const unsigned long PollOut = 1UL << PollOutIndex;
94  static const unsigned long PollErr = 1UL << PollErrIndex;
95  static const unsigned long PollPri = 1UL << PollPriIndex;
96  static const unsigned long PollHup = 1UL << PollHupIndex;
97 
101  constexpr explicit Pollable(Events const& e, void* user = nullptr) noexcept
102  : user_data(user)
103  , events(e)
104  , revents()
105  {}
106 
112  void* user_data;
113 
122 
129 };
130 
138 struct PollableFd : public Pollable {
145  constexpr PollableFd(int fd, Events const& e, void* user = nullptr) noexcept
146  : Pollable(e, user)
147 # ifdef ZTH_HAVE_LIBZMQ
148  , socket()
149 # endif
150  , fd(fd)
151  {}
152 
153 # ifdef ZTH_HAVE_LIBZMQ
157  constexpr PollableFd(void* socket, Events const& e, void* user = nullptr) noexcept
158  : Pollable(e, user)
159  , socket(socket)
160  , fd()
161  {}
162 
169  void* socket;
170 # endif // ZTH_HAVE_LIBZMQ
171 
178  int fd;
179 };
180 
181 
182 
184 // Poller base classes
185 //
186 
191 class PollerInterface : public UniqueID<PollerInterface> {
192 public:
197 
206  virtual int add(Pollable& p) noexcept = 0;
207 
208 # if __cplusplus >= 201103L
209  int add(std::initializer_list<std::reference_wrapper<Pollable>> l) noexcept;
210 # endif
211 
216  virtual int remove(Pollable& p) noexcept = 0;
217 
223  virtual void reserve(size_t more) = 0;
224 
228  virtual bool empty() const noexcept = 0;
229 };
230 
241 public:
242  virtual ~PollerClientBase() override is_default
243 
248 
261  virtual Result const& poll(int timeout_ms = -1) noexcept = 0;
262 
266  virtual void event(Pollable& p) noexcept = 0;
267 };
268 
274 public:
276 
277  virtual ~PollerServerBase() override is_default
278 
287  virtual int poll(int timeout_ms) noexcept = 0;
288 
293  virtual int migrateTo(PollerServerBase& p) noexcept = 0;
294 
302  virtual int add(Pollable& p, Client* client) noexcept = 0;
303 
311  virtual int remove(Pollable& p, Client* client) noexcept = 0;
312 };
313 
314 
315 
317 // PollerServers
318 //
319 
336 template <typename PollItem_>
338 public:
341 
346 
347 # if __cplusplus >= 201103L
348  PollerServer(PollerServer const&) = delete;
349  void operator=(PollerServer const&) = delete;
350  PollerServer(PollerServer&& p) = delete;
351  void operator=(PollerServer&& p) = delete;
352 # else
353 private:
354  PollerServer(PollerServer const&);
356 
357 public:
358 # endif
359 
360  virtual ~PollerServer() override
361  {
362  // Call clear() in the subclass. The virtual deinit() will be called.
363  zth_assert(empty());
364  }
365 
366  virtual int migrateTo(PollerServerBase& p) noexcept override
367  {
368  try {
369  p.reserve(m_metaItems.size());
370  } catch(...) {
371  return ENOMEM;
372  }
373 
374  for(size_t i = 0; i < m_metaItems.size(); i++) {
375  MetaItem const& m = m_metaItems[i];
376 
377  int res = p.add(*m.pollable, m.client);
378  if(res) {
379  // Rollback
380  for(size_t j = i; j > 0; j--)
381  p.remove(
382  *m_metaItems[j - 1U].pollable,
383  m_metaItems[j - 1U].client);
384 
385  return res;
386  }
387  }
388 
389  clear();
390 
391  // Really release all memory. This object is probably not used
392  // anymore.
393  m_metaItems.clear_and_release();
394  m_pollItems.clear_and_release();
395 
396  return 0;
397  }
398 
399  virtual void reserve(size_t more) override
400  {
401  m_metaItems.reserve(m_metaItems.size() + more);
402  m_pollItems.reserve(m_metaItems.capacity());
403  }
404 
405  int add(Pollable& p, Client* client) noexcept final
406  {
407  try {
408  MetaItem m = {&p, client};
409  m_metaItems.push_back(m);
410  } catch(...) {
411  return ENOMEM;
412  }
413 
414  try {
415  // Reserve space, but do not initialize yet.
416  m_pollItems.reserve(m_metaItems.size());
417  } catch(...) {
418  // Rollback.
419  m_metaItems.pop_back();
420  return ENOMEM;
421  }
422 
423  if(client)
424  zth_dbg(io, "[%s] added pollable %p for client %s", this->id_str(), &p,
425  client->id_str());
426  else
427  zth_dbg(io, "[%s] added pollable %p", this->id_str(), &p);
428 
429  zth_assert(m_metaItems.size() >= m_pollItems.size());
431  return 0;
432  }
433 
434  int add(Pollable& p) noexcept final
435  {
436  return add(p, nullptr);
437  }
438 
439  // cppcheck-suppress constParameter
440  int remove(Pollable& p, Client* client) noexcept final
441  {
442  size_t count = m_metaItems.size();
443 
444  size_t i;
445  for(i = count; i > 0; i--) {
446  MetaItem& m = m_metaItems[i - 1u];
447  if(m.pollable == &p && m.client == client)
448  break;
449  }
450 
451  if(i == 0u)
452  return ESRCH;
453 
454  i--;
455 
456  if(i < count - 1u) {
457  // Not removing the last element, fill the gap.
458  if(i < m_pollItems.size()) {
459  deinit(*m_metaItems[i].pollable, m_pollItems[i]);
460  if(i < m_pollItems.size() - 1u) {
461 # if __cplusplus >= 201103L
462  m_pollItems[i] = std::move(m_pollItems.back());
463 # else
464  m_pollItems[i] = m_pollItems.back();
465 # endif
466  }
467  m_pollItems.pop_back();
468  }
469 
470 # if __cplusplus >= 201103L
471  m_metaItems[i] = std::move(m_metaItems.back());
472 # else
473  m_metaItems[i] = m_metaItems.back();
474 # endif
475  m_metaItems.pop_back();
476  } else {
477  // Drop the last element.
478  remove(1u);
479  }
480 
481  zth_dbg(io, "[%s] removed pollable %p", this->id_str(), &p);
482  zth_assert(m_metaItems.size() >= m_pollItems.size());
483  return 0;
484  }
485 
489  void remove(size_t last) noexcept
490  {
491  if(last >= m_metaItems.size()) {
492  clear();
493  } else {
494  for(size_t i = m_metaItems.size() - last; i < m_pollItems.size(); i++)
495  deinit(*m_metaItems[i].pollable, m_pollItems[i]);
496 
497  m_metaItems.resize(m_metaItems.size() - last);
498  if(m_pollItems.size() > m_metaItems.size())
499  m_pollItems.resize(m_metaItems.size());
500  }
501  }
502 
503  int remove(Pollable& p) noexcept final
504  {
505  return remove(p, nullptr);
506  }
507 
513  void clear() noexcept
514  {
515  for(size_t i = 0; i < m_pollItems.size(); i++)
516  deinit(*m_metaItems[i].pollable, m_pollItems[i]);
517 
518  m_metaItems.clear();
519  m_pollItems.clear();
520  }
521 
522  bool empty() const noexcept final
523  {
524  return m_metaItems.empty();
525  }
526 
527  virtual int poll(int timeout_ms) noexcept override
528  {
529  int res = 0;
530 
531  if((res = mirror()))
532  return res;
533 
534  if(m_metaItems.empty()) {
535  return EINVAL;
536  }
537 
538  zth_dbg(io, "[%s] polling %u items for %d ms", this->id_str(),
539  (unsigned)m_metaItems.size(), timeout_ms);
540  return doPoll(timeout_ms, m_pollItems);
541  }
542 
543 
544 protected:
545  typedef PollItem_ PollItem;
546 
549 
556  // cppcheck-suppress passedByValue
557  void event(Pollable::Events revents, size_t index) noexcept
558  {
559  zth_assert(index < m_metaItems.size());
560 
561  MetaItem const& m = m_metaItems[index];
562  m.pollable->revents = revents;
563 
564  if(revents.any()) {
565  zth_dbg(io, "[%s] pollable %p got event 0x%lx", this->id_str(), m.pollable,
566  revents.to_ulong());
567  if(m.client)
568  m.client->event(*m.pollable);
569  }
570  }
571 
572 private:
577  virtual int init(Pollable const& p, PollItem& item) noexcept = 0;
578 
582  virtual void deinit(Pollable const& UNUSED_PAR(p), PollItem& UNUSED_PAR(item)) noexcept {}
583 
594  virtual int doPoll(int timeout_ms, PollItemList& items) noexcept = 0;
595 
601  int mirror() noexcept
602  {
603  int res = 0;
604 
605  size_t doInit = m_pollItems.size();
606  m_pollItems.resize(m_metaItems.size());
607 
608  for(size_t i = doInit; i < m_metaItems.size(); i++)
609  if((res = init(*m_metaItems[i].pollable, m_pollItems[i]))) {
610  m_pollItems.resize(i);
611  break;
612  }
613 
614  zth_assert(m_metaItems.size() == m_pollItems.size());
615  return res;
616  }
617 
618 private:
622  struct MetaItem {
623  Pollable* pollable;
624  Client* client;
625  };
626 
627  typedef small_vector<MetaItem, PollItemList::prealloc> MetaItemList;
628 
632  MetaItemList m_metaItems;
633 
641  PollItemList m_pollItems;
642 };
643 
644 
645 # ifdef ZTH_HAVE_LIBZMQ
646 // By default, use zmq_poll(). It allows polling everything poll() can do,
647 // including all ZeroMQ sockets.
648 
653 class ZmqPoller : public PollerServer<zmq_pollitem_t> {
655 public:
657 
658  ZmqPoller();
659  virtual ~ZmqPoller() override;
660 
661 private:
662  virtual int init(Pollable const& p, zmq_pollitem_t& item) noexcept override;
663  virtual int doPoll(int timeout_ms, typename base::PollItemList& items) noexcept override;
664 };
665 
667 
668 # elif defined(ZTH_HAVE_POLL)
669 // If we don't have ZeroMQ, use the OS's poll() instead.
670 
675 class PollPoller : public PollerServer<struct pollfd> {
676  ZTH_CLASS_NEW_DELETE(PollPoller)
677 public:
678  typedef PollerServer<struct pollfd> base;
679 
680  PollPoller();
681  virtual ~PollPoller() override;
682 
683 private:
684  virtual int init(Pollable const& p, struct pollfd& item) noexcept override;
685  virtual int doPoll(int timeout_ms, base::PollItemList& items) noexcept override;
686 };
687 
688 typedef PollPoller DefaultPollerServer;
689 
690 # else
691 // No poller available.
692 // Do provide a PollerServer, but it can only return an error upon a poll.
693 
698 class NoPoller final : public PollerServer<int> {
699  ZTH_CLASS_NEW_DELETE(NoPoller)
700 public:
701  typedef PollerServer<int> base;
702 
703  NoPoller();
704  ~NoPoller() final;
705 
706 protected:
707  int init(Pollable const& p, int& item) noexcept final;
708  int doPoll(int timeout_ms, base::PollItemList& items) noexcept final;
709 };
710 
711 typedef NoPoller DefaultPollerServer;
712 # endif // No poller
713 
722 // PollerClient
723 //
724 
735 
737 
738 public:
740  using base::Result;
741 
742  PollerClient();
743 
744 # if __cplusplus >= 201103L
745  // cppcheck-suppress noExplicitConstructor
746  PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l);
747 # endif
748 
749  virtual ~PollerClient() override;
750 
751  virtual void reserve(size_t more) override;
752  virtual int add(Pollable& p) noexcept override;
753  using base::add;
754  virtual int remove(Pollable& p) noexcept override;
755 
756  virtual Result const& poll(int timeout_ms = -1) noexcept override;
757  virtual void event(Pollable& p) noexcept override;
758 
759  bool empty() const noexcept final;
760 
761 private:
763  TimedWaitable m_wait;
764 
766  Pollables m_pollables;
767 
769  Result m_result;
770 };
771 
777 
782 template <typename P>
783 int poll(P pollable, int timeout_ms = -1)
784 {
785  try {
786  Poller poller;
787  poller.add(pollable);
788 
789  while(true) {
790  Poller::Result const& result = poller.poll(timeout_ms);
791 
792  if(result.empty()) {
793  switch(errno) {
794  case EINTR:
795  case EAGAIN:
796  if(timeout_ms == -1)
797  // Retry.
798  continue;
800  default:
801  // Error.
802  return errno;
803  }
804  }
805 
806  if((result[0]->revents & result[0]->events).any()) {
807  // Got it.
808  return 0;
809  }
810 
811  // Hmm, there is another reason we returned...
812  return EIO;
813  }
814  } catch(...) {
815  }
816  return ENOMEM;
817 }
818 
819 } // namespace zth
820 #endif // __cplusplus
821 #endif // ZTH_POLLER_H
The abstract base class of a Poller client.
Definition: poller.h:240
virtual ~PollerClientBase() override=default
The poller to be used by a fiber.
Definition: poller.h:733
virtual ~PollerClient() override
PollerClientBase base
Definition: poller.h:739
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
Definition: poller.cpp:272
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Definition: poller.cpp:246
Abstract base class of a poller.
Definition: poller.h:191
virtual int remove(Pollable &p) noexcept=0
Remove a pollable object.
virtual ~PollerInterface()=default
Dtor.
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
virtual void reserve(size_t more)=0
Reserve memory to add more pollables.
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
Definition: poller.h:273
PollerClientBase Client
Definition: poller.h:275
virtual ~PollerServerBase() override=default
Poller to be executed by the Waiter.
Definition: poller.h:337
void operator=(PollerServer const &)=delete
PollerServer(PollerServer &&p)=delete
virtual ~PollerServer() override
Definition: poller.h:360
int remove(Pollable &p) noexcept final
Remove a pollable object.
Definition: poller.h:503
void operator=(PollerServer &&p)=delete
int add(Pollable &p) noexcept final
Add a pollable object.
Definition: poller.h:434
PollerServerBase base
Definition: poller.h:339
small_vector< PollItem, 4 > PollItemList
Type of list of poll things.
Definition: poller.h:548
void event(Pollable::Events revents, size_t index) noexcept
Register an revents for the PollItem at the given index.
Definition: poller.h:557
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
Definition: poller.h:399
virtual int poll(int timeout_ms) noexcept override
Poll.
Definition: poller.h:527
PollerServer()=default
Ctor.
int remove(Pollable &p, Client *client) noexcept final
Remove the given Pollable, belonging to the given client.
Definition: poller.h:440
PollItem_ PollItem
Definition: poller.h:545
PollerServer(PollerServer const &)=delete
void clear() noexcept
Clear all Pollables.
Definition: poller.h:513
int add(Pollable &p, Client *client) noexcept final
Add a Pollable, that belongs to a given client.
Definition: poller.h:405
bool empty() const noexcept final
Checks if there is any pollable registered.
Definition: poller.h:522
virtual int migrateTo(PollerServerBase &p) noexcept override
Move all registered Pollables to another server.
Definition: poller.h:366
void remove(size_t last) noexcept
Remove the last added Pollables.
Definition: poller.h:489
Keeps track of a process-wide unique ID within the type T.
Definition: util.h:657
virtual char const * id_str() const override
Definition: util.h:751
UniqueID & operator=(UniqueID const &)=delete
void wakeup(TimedWaitable &w)
Definition: waiter.cpp:85
Waiter & waiter() noexcept
Definition: worker.h:101
A PollerServer that uses zmq_poll().
Definition: poller.h:653
PollerServer< zmq_pollitem_t > base
Definition: poller.h:656
virtual ~ZmqPoller() override
A simple std::vector, which can contain Prealloc without heap allocation.
Definition: util.h:1021
bool empty() const noexcept
Check if the vector is empty.
Definition: util.h:1135
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition: worker.h:388
constexpr auto timeout_ms
A guard that is enabled after a ms milliseconds after entering the current state.
Definition: fsm14.h:2069
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition: poller.h:783
ZmqPoller DefaultPollerServer
The poller server, by default instantiated by the zth::Waiter.
Definition: poller.h:666
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition: allocator.h:114
#define is_default
Definition: macros.h:205
#define ZTH_FALLTHROUGH
Definition: macros.h:90
#define UNUSED_PAR(name)
Definition: macros.h:79
Definition: allocator.h:23
A pollable file descriptor.
Definition: poller.h:138
constexpr PollableFd(int fd, Events const &e, void *user=nullptr) noexcept
Ctor for a file descriptor.
Definition: poller.h:145
int fd
The file descriptor.
Definition: poller.h:178
void * socket
The ZeroMQ socket.
Definition: poller.h:169
constexpr PollableFd(void *socket, Events const &e, void *user=nullptr) noexcept
Ctor for a ZeroMQ socket.
Definition: poller.h:157
A pollable thing.
Definition: poller.h:75
static const unsigned long PollIn
Definition: poller.h:92
constexpr Pollable(Events const &e, void *user=nullptr) noexcept
Ctor.
Definition: poller.h:101
static const unsigned long PollOut
Definition: poller.h:93
Events events
Events to poll.
Definition: poller.h:121
static const unsigned long PollErr
Definition: poller.h:94
static const unsigned long PollPri
Definition: poller.h:95
void * user_data
User data.
Definition: poller.h:112
EventsFlags
Flags to be used with events and revents.
Definition: poller.h:79
@ PollOutIndex
Definition: poller.h:81
@ PollPriIndex
Definition: poller.h:83
@ PollErrIndex
Definition: poller.h:82
@ PollInIndex
Definition: poller.h:80
@ PollHupIndex
Definition: poller.h:84
Events revents
Returned events by a poll.
Definition: poller.h:128
std::bitset< FlagCount > Events
Type of events and revents.
Definition: poller.h:89
static const unsigned long PollHup
Definition: poller.h:96
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236