42 # if __cplusplus >= 201103L
43 # include <functional>
44 # include <initializer_list>
47 # if defined(ZTH_HAVE_POLL) || defined(ZTH_HAVE_LIBZMQ)
48 # define ZTH_HAVE_POLLER
51 # if defined(ZTH_HAVE_LIBZMQ)
53 # elif defined(ZTH_HAVE_POLL)
89 typedef std::bitset<FlagCount>
Events;
147 # ifdef ZTH_HAVE_LIBZMQ
153 # ifdef ZTH_HAVE_LIBZMQ
208 # if __cplusplus >= 201103L
209 int add(std::initializer_list<std::reference_wrapper<Pollable>> l) noexcept;
228 virtual bool empty() const noexcept = 0;
336 template <typename PollItem_>
347 # if __cplusplus >= 201103L
369 p.reserve(m_metaItems.size());
374 for(
size_t i = 0; i < m_metaItems.size(); i++) {
375 MetaItem
const& m = m_metaItems[i];
377 int res = p.add(*m.pollable, m.client);
380 for(
size_t j = i; j > 0; j--)
382 *m_metaItems[j - 1U].pollable,
383 m_metaItems[j - 1U].client);
393 m_metaItems.clear_and_release();
394 m_pollItems.clear_and_release();
401 m_metaItems.reserve(m_metaItems.size() + more);
402 m_pollItems.reserve(m_metaItems.capacity());
408 MetaItem m = {&p, client};
409 m_metaItems.push_back(m);
416 m_pollItems.reserve(m_metaItems.size());
419 m_metaItems.pop_back();
424 zth_dbg(io,
"[%s] added pollable %p for client %s", this->
id_str(), &p,
429 zth_assert(m_metaItems.size() >= m_pollItems.size());
436 return add(p,
nullptr);
442 size_t count = m_metaItems.size();
445 for(i = count; i > 0; i--) {
446 MetaItem& m = m_metaItems[i - 1u];
447 if(m.pollable == &p && m.client == client)
458 if(i < m_pollItems.size()) {
459 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
460 if(i < m_pollItems.size() - 1u) {
461 # if __cplusplus >= 201103L
462 m_pollItems[i] = std::move(m_pollItems.back());
464 m_pollItems[i] = m_pollItems.back();
467 m_pollItems.pop_back();
470 # if __cplusplus >= 201103L
471 m_metaItems[i] = std::move(m_metaItems.back());
473 m_metaItems[i] = m_metaItems.back();
475 m_metaItems.pop_back();
482 zth_assert(m_metaItems.size() >= m_pollItems.size());
491 if(last >= m_metaItems.size()) {
494 for(
size_t i = m_metaItems.size() - last; i < m_pollItems.size(); i++)
495 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
497 m_metaItems.resize(m_metaItems.size() - last);
498 if(m_pollItems.size() > m_metaItems.size())
499 m_pollItems.resize(m_metaItems.size());
505 return remove(p,
nullptr);
515 for(
size_t i = 0; i < m_pollItems.size(); i++)
516 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
524 return m_metaItems.empty();
534 if(m_metaItems.empty()) {
538 zth_dbg(io,
"[%s] polling %u items for %d ms", this->
id_str(),
561 MetaItem
const& m = m_metaItems[index];
562 m.pollable->revents = revents;
565 zth_dbg(io,
"[%s] pollable %p got event 0x%lx", this->
id_str(), m.pollable,
568 m.client->event(*m.pollable);
577 virtual int init(
Pollable const& p, PollItem& item) noexcept = 0;
594 virtual int doPoll(
int timeout_ms, PollItemList& items) noexcept = 0;
601 int mirror() noexcept
605 size_t doInit = m_pollItems.size();
606 m_pollItems.resize(m_metaItems.size());
608 for(
size_t i = doInit; i < m_metaItems.size(); i++)
609 if((res = init(*m_metaItems[i].pollable, m_pollItems[i]))) {
610 m_pollItems.resize(i);
614 zth_assert(m_metaItems.size() == m_pollItems.size());
627 typedef small_vector<MetaItem, PollItemList::prealloc> MetaItemList;
632 MetaItemList m_metaItems;
641 PollItemList m_pollItems;
645 # ifdef ZTH_HAVE_LIBZMQ
662 virtual int init(
Pollable const& p, zmq_pollitem_t& item) noexcept
override;
663 virtual int doPoll(
int timeout_ms,
typename base::PollItemList& items) noexcept
override;
668 # elif defined(ZTH_HAVE_POLL)
681 virtual ~PollPoller()
override;
684 virtual int init(
Pollable const& p,
struct pollfd& item) noexcept
override;
685 virtual int doPoll(
int timeout_ms, base::PollItemList& items) noexcept
override;
698 class NoPoller final :
public PollerServer<int> {
701 typedef PollerServer<int> base;
707 int init(Pollable const& p,
int& item) noexcept final;
708 int doPoll(
int timeout_ms, base::PollItemList& items) noexcept final;
744 # if __cplusplus >= 201103L
746 PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l);
751 virtual void reserve(
size_t more)
override;
757 virtual
void event(
Pollable& p) noexcept override;
759 bool empty() const noexcept final;
782 template <typename P>
787 poller.
add(pollable);
806 if((result[0]->revents & result[0]->events).any()) {
The abstract base class of a Poller client.
virtual ~PollerClientBase() override=default
The poller to be used by a fiber.
virtual ~PollerClient() override
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Abstract base class of a poller.
virtual int remove(Pollable &p) noexcept=0
Remove a pollable object.
virtual ~PollerInterface()=default
Dtor.
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
virtual void reserve(size_t more)=0
Reserve memory to add more pollables.
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
virtual ~PollerServerBase() override=default
Poller to be executed by the Waiter.
void operator=(PollerServer const &)=delete
PollerServer(PollerServer &&p)=delete
virtual ~PollerServer() override
int remove(Pollable &p) noexcept final
Remove a pollable object.
void operator=(PollerServer &&p)=delete
int add(Pollable &p) noexcept final
Add a pollable object.
small_vector< PollItem, 4 > PollItemList
Type of list of poll things.
void event(Pollable::Events revents, size_t index) noexcept
Register the revents for the PollItem at the given index.
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
virtual int poll(int timeout_ms) noexcept override
Poll.
PollerServer()=default
Ctor.
int remove(Pollable &p, Client *client) noexcept final
Remove the given Pollable, belonging to the given client.
PollerServer(PollerServer const &)=delete
void clear() noexcept
Clear all Pollables.
int add(Pollable &p, Client *client) noexcept final
Add a Pollable that belongs to a given client.
bool empty() const noexcept final
Checks if there is any pollable registered.
virtual int migrateTo(PollerServerBase &p) noexcept override
Move all registered Pollables to another server.
void remove(size_t last) noexcept
Remove the given number of most recently added Pollables.
Keeps track of a process-wide unique ID within the type T.
virtual char const * id_str() const override
UniqueID & operator=(UniqueID const &)=delete
void wakeup(TimedWaitable &w)
Waiter & waiter() noexcept
A PollerServer that uses zmq_poll().
PollerServer< zmq_pollitem_t > base
virtual ~ZmqPoller() override
A simple std::vector, which can contain Prealloc elements without heap allocation.
bool empty() const noexcept
Check if the vector is empty.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
constexpr auto timeout_ms
A guard that is enabled ms milliseconds after entering the current state.
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
ZmqPoller DefaultPollerServer
The poller server, by default instantiated by the zth::Waiter.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
A pollable file descriptor.
constexpr PollableFd(int fd, Events const &e, void *user=nullptr) noexcept
Ctor for a file descriptor.
int fd
The file descriptor.
void * socket
The ZeroMQ socket.
constexpr PollableFd(void *socket, Events const &e, void *user=nullptr) noexcept
Ctor for a ZeroMQ socket.
static const unsigned long PollIn
constexpr Pollable(Events const &e, void *user=nullptr) noexcept
Ctor.
static const unsigned long PollOut
Events events
Events to poll.
static const unsigned long PollErr
static const unsigned long PollPri
void * user_data
User data.
EventsFlags
Flags to be used with events and revents.
Events revents
Events returned by a poll.
std::bitset< FlagCount > Events
Type of events and revents.
static const unsigned long PollHup
#define zth_assert(expr)
assert(), but better integrated in Zth.