39# if __cplusplus >= 201103L
41# include <initializer_list>
44# if defined(ZTH_HAVE_POLL) || defined(ZTH_HAVE_LIBZMQ)
45# define ZTH_HAVE_POLLER
48# if defined(ZTH_HAVE_LIBZMQ)
50# elif defined(ZTH_HAVE_POLL)
86 typedef std::bitset<FlagCount>
Events;
144# ifdef ZTH_HAVE_LIBZMQ
150# ifdef ZTH_HAVE_LIBZMQ
205# if __cplusplus >= 201103L
206 int add(std::initializer_list<std::reference_wrapper<Pollable>> l)
noexcept;
225 virtual bool empty() const noexcept = 0;
258 virtual
Result const&
poll(
int timeout_ms = -1) noexcept = 0;
284 virtual
int poll(
int timeout_ms) noexcept = 0;
337template <typename PollItem_>
348# if __cplusplus >= 201103L
370 p.reserve(m_metaItems.size());
375 for(
size_t i = 0; i < m_metaItems.size(); i++) {
376 MetaItem
const& m = m_metaItems[i];
378 int res = p.add(*m.pollable, m.client);
381 for(
size_t j = i; j > 0; j--)
383 *m_metaItems[j - 1U].pollable,
384 m_metaItems[j - 1U].client);
394 m_metaItems.clear_and_release();
395 m_pollItems.clear_and_release();
402 m_metaItems.reserve(m_metaItems.size() + more);
403 m_pollItems.reserve(m_metaItems.capacity());
409 MetaItem m = {&p, client};
410 m_metaItems.push_back(m);
417 m_pollItems.reserve(m_metaItems.size());
420 m_metaItems.pop_back();
425 zth_dbg(io,
"[%s] added pollable %p for client %s", this->
id_str(), &p,
430 zth_assert(m_metaItems.size() >= m_pollItems.size());
437 return add(p,
nullptr);
443 size_t count = m_metaItems.size();
446 for(i = count; i > 0; i--) {
447 MetaItem
const& m = m_metaItems[i - 1U];
448 if(m.pollable == &p && m.client == client)
459 if(i < m_pollItems.size()) {
460 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
461 if(i < m_pollItems.size() - 1U) {
462# if __cplusplus >= 201103L
463 m_pollItems[i] = std::move(m_pollItems.back());
465 m_pollItems[i] = m_pollItems.back();
468 m_pollItems.pop_back();
471# if __cplusplus >= 201103L
472 m_metaItems[i] = std::move(m_metaItems.back());
474 m_metaItems[i] = m_metaItems.back();
476 m_metaItems.pop_back();
483 zth_assert(m_metaItems.size() >= m_pollItems.size());
492 if(last >= m_metaItems.size()) {
495 for(
size_t i = m_metaItems.size() - last; i < m_pollItems.size(); i++)
496 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
498 m_metaItems.resize(m_metaItems.size() - last);
499 if(m_pollItems.size() > m_metaItems.size())
500 m_pollItems.resize(m_metaItems.size());
506 return remove(p,
nullptr);
516 for(
size_t i = 0; i < m_pollItems.size(); i++)
517 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
525 return m_metaItems.empty();
528 virtual int poll(
int timeout_ms)
noexcept override
535 if(m_metaItems.empty()) {
539 zth_dbg(io,
"[%s] polling %u items for %d ms", this->
id_str(),
540 (
unsigned)m_metaItems.size(), timeout_ms);
541 return doPoll(timeout_ms, m_pollItems);
562 MetaItem
const& m = m_metaItems[index];
563 m.pollable->revents = revents;
566 zth_dbg(io,
"[%s] pollable %p got event 0x%lx", this->
id_str(), m.pollable,
569 m.client->event(*m.pollable);
578 virtual int init(
Pollable const& p, PollItem& item)
noexcept = 0;
595 virtual int doPoll(
int timeout_ms, PollItemList& items)
noexcept = 0;
602 int mirror() noexcept
606 size_t doInit = m_pollItems.size();
607 m_pollItems.resize(m_metaItems.size());
609 for(
size_t i = doInit; i < m_metaItems.size(); i++)
610 if((res = init(*m_metaItems[i].pollable, m_pollItems[i]))) {
611 m_pollItems.resize(i);
615 zth_assert(m_metaItems.size() == m_pollItems.size());
628 typedef small_vector<MetaItem, PollItemList::prealloc> MetaItemList;
633 MetaItemList m_metaItems;
642 PollItemList m_pollItems;
646# ifdef ZTH_HAVE_LIBZMQ
663 virtual int init(
Pollable const& p, zmq_pollitem_t& item)
noexcept override;
664 virtual int doPoll(
int timeout_ms, base::PollItemList& items)
noexcept override;
669# elif defined(ZTH_HAVE_POLL)
682 virtual ~PollPoller()
override;
685 virtual int init(
Pollable const& p,
struct pollfd& item)
noexcept override;
686 virtual int doPoll(
int timeout_ms, base::PollItemList& items)
noexcept override;
699class NoPoller final :
public PollerServer<int> {
702 typedef PollerServer<int> base;
708 int init(Pollable const& p,
int& item) noexcept final;
709 int doPoll(
int timeout_ms, base::PollItemList& items) noexcept final;
712typedef NoPoller DefaultPollerServer;
745# if __cplusplus >= 201103L
747 PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l);
752 virtual void reserve(
size_t more)
override;
753 virtual int add(
Pollable& p)
noexcept override;
755 virtual int remove(
Pollable& p)
noexcept override;
757 virtual Result const&
poll(
int timeout_ms = -1) noexcept override;
758 virtual
void event(
Pollable& p) noexcept override;
760 bool empty() const noexcept final;
784int poll(P pollable,
int timeout_ms = -1)
788 poller.
add(pollable);
807 if((result[0]->revents & result[0]->events).any()) {
The abstract base class of a Poller client.
virtual ~PollerClientBase() override=default
The poller to be used by a fiber.
virtual ~PollerClient() override
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Abstract base class of a poller.
virtual int remove(Pollable &p) noexcept=0
Remove a pollable object.
virtual ~PollerInterface() override=default
Dtor.
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
virtual void reserve(size_t more)=0
Reserve memory to add more pollables.
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
virtual ~PollerServerBase() override=default
Poller to be executed by the Waiter.
void operator=(PollerServer const &)=delete
PollerServer(PollerServer &&p)=delete
virtual ~PollerServer() override
int remove(Pollable &p) noexcept final
Remove a pollable object.
void operator=(PollerServer &&p)=delete
int add(Pollable &p) noexcept final
Add a pollable object.
small_vector< PollItem, 4 > PollItemList
Type of list of poll things.
void event(Pollable::Events revents, size_t index) noexcept
Register revents for the PollItem at the given index.
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
virtual int poll(int timeout_ms) noexcept override
Poll.
PollerServer()=default
Ctor.
int remove(Pollable &p, Client *client) noexcept final
Remove the given Pollable, belonging to the given client.
PollerServer(PollerServer const &)=delete
void clear() noexcept
Clear all Pollables.
int add(Pollable &p, Client *client) noexcept final
Add a Pollable, that belongs to a given client.
bool empty() const noexcept final
Checks if there is any pollable registered.
virtual int migrateTo(PollerServerBase &p) noexcept override
Move all registered Pollables to another server.
void remove(size_t last) noexcept
Remove the last added Pollables.
Keeps track of a process-wide unique ID within the type T.
virtual char const * id_str() const override
UniqueID & operator=(UniqueID const &)=delete
void wakeup(TimedWaitable &w)
Waiter & waiter() noexcept
A PollerServer that uses zmq_poll().
PollerServer< zmq_pollitem_t > base
virtual ~ZmqPoller() override
A simple std::vector, which can contain Prealloc elements without heap allocation.
bool empty() const noexcept
Check if the vector is empty.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
ZmqPoller DefaultPollerServer
A pollable file descriptor.
constexpr PollableFd(void *s, Events const &e, void *user=nullptr) noexcept
Ctor for a ZeroMQ socket.
constexpr PollableFd(int f, Events const &e, void *user=nullptr) noexcept
Ctor for a file descriptor.
int fd
The file descriptor.
void * socket
The ZeroMQ socket.
static const unsigned long PollIn
constexpr Pollable(Events const &e, void *user=nullptr) noexcept
Ctor.
static const unsigned long PollOut
Events events
Events to poll.
static const unsigned long PollErr
static const unsigned long PollPri
void * user_data
User data.
EventsFlags
Flags to be used with events and revents.
Events revents
Events returned by a poll.
std::bitset< FlagCount > Events
Type of events and revents.
static const unsigned long PollHup
#define zth_assert(expr)
assert(), but better integrated in Zth.