Zth (libzth)
Loading...
Searching...
No Matches
poller.h
Go to the documentation of this file.
1#ifndef ZTH_POLLER_H
2#define ZTH_POLLER_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
26#include <libzth/macros.h>
27
28#ifdef __cplusplus
29
30# include <libzth/allocator.h>
31# include <libzth/waiter.h>
32# include <libzth/worker.h>
33
34# include <bitset>
35# include <memory>
36# include <stdexcept>
37# include <vector>
38
39# if __cplusplus >= 201103L
40# include <functional>
41# include <initializer_list>
42# endif
43
44# if defined(ZTH_HAVE_POLL) || defined(ZTH_HAVE_LIBZMQ)
45# define ZTH_HAVE_POLLER
46# endif
47
48# if defined(ZTH_HAVE_LIBZMQ)
49# include <zmq.h>
50# elif defined(ZTH_HAVE_POLL)
51# include <poll.h>
52# endif
53
54namespace zth {
55
57// Pollable
58//
59
72struct Pollable {
84
86 typedef std::bitset<FlagCount> Events;
87
88 // unsigned long can be converted implicitly to bitset.
89 static const unsigned long PollIn = 1UL << PollInIndex;
90 static const unsigned long PollOut = 1UL << PollOutIndex;
91 static const unsigned long PollErr = 1UL << PollErrIndex;
92 static const unsigned long PollPri = 1UL << PollPriIndex;
93 static const unsigned long PollHup = 1UL << PollHupIndex;
94
98 constexpr explicit Pollable(Events const& e, void* user = nullptr) noexcept
99 : user_data(user)
100 , events(e)
101 , revents()
102 {}
103
110
119
126};
127
135struct PollableFd : public Pollable {
142 constexpr PollableFd(int f, Events const& e, void* user = nullptr) noexcept
143 : Pollable(e, user)
144# ifdef ZTH_HAVE_LIBZMQ
145 , socket()
146# endif // ZTH_HAVE_LIBZMQ
147 , fd(f)
148 {}
149
150# ifdef ZTH_HAVE_LIBZMQ
154 constexpr PollableFd(void* s, Events const& e, void* user = nullptr) noexcept
155 : Pollable(e, user)
156 , socket(s)
157 , fd()
158 {}
159
166 void* socket;
167# endif // ZTH_HAVE_LIBZMQ
168
175 int fd;
176};
177
178
179
181// Poller base classes
182//
183
// Abstract base class of a poller.
//
// Declares the operations shared by all pollers: registering and
// unregistering Pollables, reserving memory up front, and checking whether
// anything is registered.
188class PollerInterface : public UniqueID<PollerInterface> {
189public:
// Dtor.
193 virtual ~PollerInterface() override is_default
194
// Add a pollable object. Returns an int status; the implementations in this
// file return 0 on success and an errno value otherwise.
203 virtual int add(Pollable& p) noexcept = 0;
204
205# if __cplusplus >= 201103L
// Overload taking a list of pollables (C++11 and later only). Only declared
// here. NOTE(review): presumably adds every listed pollable -- confirm
// against the definition.
206 int add(std::initializer_list<std::reference_wrapper<Pollable>> l) noexcept;
207# endif
208
// Remove a pollable object. Returns an int status like add().
213 virtual int remove(Pollable& p) noexcept = 0;
214
// Reserve memory to add more pollables. Unlike the other members this is not
// noexcept; migrateTo() in this file maps an exception from it to ENOMEM.
220 virtual void reserve(size_t more) = 0;
221
// Checks if there is any pollable registered.
225 virtual bool empty() const noexcept = 0;
226};
227
238public:
239 virtual ~PollerClientBase() override is_default
240
245
258 virtual Result const& poll(int timeout_ms = -1) noexcept = 0;
259
263 virtual void event(Pollable& p) noexcept = 0;
264};
265
271public:
273
274 virtual ~PollerServerBase() override is_default
275
284 virtual int poll(int timeout_ms) noexcept = 0;
285
290 virtual int migrateTo(PollerServerBase& p) noexcept = 0;
291
299 virtual int add(Pollable& p, Client* client) noexcept = 0;
300
301 using PollerInterface::add;
302
310 virtual int remove(Pollable& p, Client* client) noexcept = 0;
311
312 using PollerInterface::remove;
313};
314
315
316
318// PollerServers
319//
320
337template <typename PollItem_>
339public:
342
347
348# if __cplusplus >= 201103L
349 PollerServer(PollerServer const&) = delete;
350 void operator=(PollerServer const&) = delete;
352 void operator=(PollerServer&& p) = delete;
353# else
354private:
357
358public:
359# endif
360
361 virtual ~PollerServer() override
362 {
363 // Call clear() in the subclass. The virtual deinit() will be called.
364 zth_assert(empty());
365 }
366
367 virtual int migrateTo(PollerServerBase& p) noexcept override
368 {
369 try {
370 p.reserve(m_metaItems.size());
371 } catch(...) {
372 return ENOMEM;
373 }
374
375 for(size_t i = 0; i < m_metaItems.size(); i++) {
376 MetaItem const& m = m_metaItems[i];
377
378 int res = p.add(*m.pollable, m.client);
379 if(res) {
380 // Rollback
381 for(size_t j = i; j > 0; j--)
382 p.remove(
383 *m_metaItems[j - 1U].pollable,
384 m_metaItems[j - 1U].client);
385
386 return res;
387 }
388 }
389
390 clear();
391
392 // Really release all memory. This object is probably not used
393 // anymore.
394 m_metaItems.clear_and_release();
395 m_pollItems.clear_and_release();
396
397 return 0;
398 }
399
400 virtual void reserve(size_t more) override
401 {
402 m_metaItems.reserve(m_metaItems.size() + more);
403 m_pollItems.reserve(m_metaItems.capacity());
404 }
405
// Add a Pollable, that belongs to a given client.
//
// Only the bookkeeping entry is created here; the matching poll item is
// merely reserved and is initialized later by mirror(), when poll() runs.
// client may be null (see add(Pollable&)).
//
// Returns 0 on success, or ENOMEM when either list cannot grow. In the
// latter case, the pollable is not registered.
406 int add(Pollable& p, Client* client) noexcept final
407 {
408 try {
409 MetaItem m = {&p, client};
410 m_metaItems.push_back(m);
411 } catch(...) {
412 return ENOMEM;
413 }
414
415 try {
416 // Reserve space, but do not initialize yet.
// Reserving here is what lets the noexcept mirror() resize m_pollItems later.
417 m_pollItems.reserve(m_metaItems.size());
418 } catch(...) {
419 // Rollback.
420 m_metaItems.pop_back();
421 return ENOMEM;
422 }
423
424 if(client)
425 zth_dbg(io, "[%s] added pollable %p for client %s", this->id_str(), &p,
426 client->id_str());
427 else
428 zth_dbg(io, "[%s] added pollable %p", this->id_str(), &p);
429
// Poll items may lag behind the meta items, but never run ahead of them.
430 zth_assert(m_metaItems.size() >= m_pollItems.size());
// NOTE(review): the rendered listing skips source line 431 at this point;
// confirm against the real header that no statement is elided.
432 return 0;
433 }
434
435 int add(Pollable& p) noexcept final
436 {
437 return add(p, nullptr);
438 }
439
440 // cppcheck-suppress[constParameter,constParameterPointer]
441 int remove(Pollable& p, Client* client) noexcept final
442 {
443 size_t count = m_metaItems.size();
444
445 size_t i;
446 for(i = count; i > 0; i--) {
447 MetaItem const& m = m_metaItems[i - 1U];
448 if(m.pollable == &p && m.client == client)
449 break;
450 }
451
452 if(i == 0U)
453 return ESRCH;
454
455 i--;
456
457 if(i < count - 1U) {
458 // Not removing the last element, fill the gap.
459 if(i < m_pollItems.size()) {
460 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
461 if(i < m_pollItems.size() - 1U) {
462# if __cplusplus >= 201103L
463 m_pollItems[i] = std::move(m_pollItems.back());
464# else
465 m_pollItems[i] = m_pollItems.back();
466# endif
467 }
468 m_pollItems.pop_back();
469 }
470
471# if __cplusplus >= 201103L
472 m_metaItems[i] = std::move(m_metaItems.back());
473# else
474 m_metaItems[i] = m_metaItems.back();
475# endif
476 m_metaItems.pop_back();
477 } else {
478 // Drop the last element.
479 remove(1U);
480 }
481
482 zth_dbg(io, "[%s] removed pollable %p", this->id_str(), &p);
483 zth_assert(m_metaItems.size() >= m_pollItems.size());
484 return 0;
485 }
486
490 void remove(size_t last) noexcept
491 {
492 if(last >= m_metaItems.size()) {
493 clear();
494 } else {
495 for(size_t i = m_metaItems.size() - last; i < m_pollItems.size(); i++)
496 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
497
498 m_metaItems.resize(m_metaItems.size() - last);
499 if(m_pollItems.size() > m_metaItems.size())
500 m_pollItems.resize(m_metaItems.size());
501 }
502 }
503
504 int remove(Pollable& p) noexcept final
505 {
506 return remove(p, nullptr);
507 }
508
514 void clear() noexcept
515 {
516 for(size_t i = 0; i < m_pollItems.size(); i++)
517 deinit(*m_metaItems[i].pollable, m_pollItems[i]);
518
519 m_metaItems.clear();
520 m_pollItems.clear();
521 }
522
523 bool empty() const noexcept final
524 {
525 return m_metaItems.empty();
526 }
527
528 virtual int poll(int timeout_ms) noexcept override
529 {
530 int res = 0;
531
532 if((res = mirror()))
533 return res;
534
535 if(m_metaItems.empty()) {
536 return EINVAL;
537 }
538
539 zth_dbg(io, "[%s] polling %u items for %d ms", this->id_str(),
540 (unsigned)m_metaItems.size(), timeout_ms);
541 return doPoll(timeout_ms, m_pollItems);
542 }
543
544
545protected:
546 typedef PollItem_ PollItem;
547
550
557 // cppcheck-suppress passedByValue
558 void event(Pollable::Events revents, size_t index) noexcept
559 {
560 zth_assert(index < m_metaItems.size());
561
562 MetaItem const& m = m_metaItems[index];
563 m.pollable->revents = revents;
564
565 if(revents.any()) {
566 zth_dbg(io, "[%s] pollable %p got event 0x%lx", this->id_str(), m.pollable,
567 revents.to_ulong());
568 if(m.client)
569 m.client->event(*m.pollable);
570 }
571 }
572
573private:
// Initialize the poll item for a pollable. Called by mirror() for every
// pollable that has no initialized poll item yet; a non-zero return value
// stops mirroring and is returned by mirror() as the error.
578 virtual int init(Pollable const& p, PollItem& item) noexcept = 0;
579
// Counterpart of init(): called by remove() and clear() on an initialized
// poll item just before it is dropped. The default implementation does
// nothing.
583 virtual void deinit(Pollable const& UNUSED_PAR(p), PollItem& UNUSED_PAR(item)) noexcept {}
584
// Perform the actual poll on the mirrored poll items. Called by poll(), which
// passes its timeout and m_pollItems, and returns the result unchanged.
595 virtual int doPoll(int timeout_ms, PollItemList& items) noexcept = 0;
596
602 int mirror() noexcept
603 {
604 int res = 0;
605
606 size_t doInit = m_pollItems.size();
607 m_pollItems.resize(m_metaItems.size());
608
609 for(size_t i = doInit; i < m_metaItems.size(); i++)
610 if((res = init(*m_metaItems[i].pollable, m_pollItems[i]))) {
611 m_pollItems.resize(i);
612 break;
613 }
614
615 zth_assert(m_metaItems.size() == m_pollItems.size());
616 return res;
617 }
618
619private:
// Bookkeeping for one registered pollable.
623 struct MetaItem {
// The registered pollable; event() stores the returned events in it.
624 Pollable* pollable;
// The client to notify on an event; null when added via add(Pollable&).
625 Client* client;
626 };
627
628 typedef small_vector<MetaItem, PollItemList::prealloc> MetaItemList;
629
633 MetaItemList m_metaItems;
634
642 PollItemList m_pollItems;
643};
644
645
646# ifdef ZTH_HAVE_LIBZMQ
647// By default, use zmq_poll(). It allows polling everything poll() can do,
648// including all ZeroMQ sockets.
649
654class ZmqPoller : public PollerServer<zmq_pollitem_t> {
656public:
658
659 ZmqPoller();
660 virtual ~ZmqPoller() override;
661
662private:
663 virtual int init(Pollable const& p, zmq_pollitem_t& item) noexcept override;
664 virtual int doPoll(int timeout_ms, base::PollItemList& items) noexcept override;
665};
666
668
669# elif defined(ZTH_HAVE_POLL)
670// If we don't have ZeroMQ, use the OS's poll() instead.
671
// PollerServer that uses struct pollfd as its poll item; selected as
// DefaultPollerServer when ZeroMQ is not available but poll() is.
676class PollPoller : public PollerServer<struct pollfd> {
677 ZTH_CLASS_NEW_DELETE(PollPoller)
678public:
679 typedef PollerServer<struct pollfd> base;
680
681 PollPoller();
682 virtual ~PollPoller() override;
683
684private:
// Overrides of the PollerServer hooks; only declared here.
685 virtual int init(Pollable const& p, struct pollfd& item) noexcept override;
686 virtual int doPoll(int timeout_ms, base::PollItemList& items) noexcept override;
687};
688
689typedef PollPoller DefaultPollerServer;
690
691# else
692// No poller available.
693// Do provide a PollerServer, but it can only return an error upon a poll.
694
// Fallback PollerServer for builds without any poll mechanism. According to
// the comment above, it can only return an error upon a poll. It uses a plain
// int as (dummy) poll item type.
699class NoPoller final : public PollerServer<int> {
700 ZTH_CLASS_NEW_DELETE(NoPoller)
701public:
702 typedef PollerServer<int> base;
703
704 NoPoller();
705 ~NoPoller() final;
706
707protected:
// Overrides of the PollerServer hooks; only declared here.
708 int init(Pollable const& p, int& item) noexcept final;
709 int doPoll(int timeout_ms, base::PollItemList& items) noexcept final;
710};
711
712typedef NoPoller DefaultPollerServer;
713# endif // No poller
714
723// PollerClient
724//
725
736
738
739public:
741 using base::Result;
742
743 PollerClient();
744
745# if __cplusplus >= 201103L
746 // cppcheck-suppress noExplicitConstructor
747 PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l);
748# endif
749
750 virtual ~PollerClient() override;
751
752 virtual void reserve(size_t more) override;
753 virtual int add(Pollable& p) noexcept override;
754 using base::add;
755 virtual int remove(Pollable& p) noexcept override;
756
757 virtual Result const& poll(int timeout_ms = -1) noexcept override;
758 virtual void event(Pollable& p) noexcept override;
759
760 bool empty() const noexcept final;
761
762private:
764 TimedWaitable m_wait;
765
767 Pollables m_pollables;
768
770 Result m_result;
771};
772
778
783template <typename P>
784int poll(P pollable, int timeout_ms = -1)
785{
786 try {
787 Poller poller;
788 poller.add(pollable);
789
790 while(true) {
791 Poller::Result const& result = poller.poll(timeout_ms);
792
793 if(result.empty()) {
794 switch(errno) {
795 case EINTR:
796 case EAGAIN:
797 if(timeout_ms == -1)
798 // Retry.
799 continue;
801 default:
802 // Error.
803 return errno;
804 }
805 }
806
807 if((result[0]->revents & result[0]->events).any()) {
808 // Got it.
809 return 0;
810 }
811
812 // Hmm, there is another reason we returned...
813 return EIO;
814 }
815 } catch(...) {
816 }
817 return ENOMEM;
818}
819
820} // namespace zth
821#endif // __cplusplus
822#endif // ZTH_POLLER_H
The abstract base class of a Poller client.
Definition poller.h:237
virtual ~PollerClientBase() override=default
The poller to be used by a fiber.
Definition poller.h:734
virtual ~PollerClient() override
PollerClientBase base
Definition poller.h:740
virtual Result const & poll(int timeout_ms=-1) noexcept override
Poll.
Definition poller.cpp:270
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Definition poller.cpp:244
Abstract base class of a poller.
Definition poller.h:188
virtual int remove(Pollable &p) noexcept=0
Remove a pollable object.
virtual ~PollerInterface() override=default
Dtor.
virtual int add(Pollable &p) noexcept=0
Add a pollable object.
virtual void reserve(size_t more)=0
Reserve memory to add more pollables.
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
Definition poller.h:270
PollerClientBase Client
Definition poller.h:272
virtual ~PollerServerBase() override=default
Poller to be executed by the Waiter.
Definition poller.h:338
void operator=(PollerServer const &)=delete
PollerServer(PollerServer &&p)=delete
virtual ~PollerServer() override
Definition poller.h:361
int remove(Pollable &p) noexcept final
Remove a pollable object.
Definition poller.h:504
void operator=(PollerServer &&p)=delete
int add(Pollable &p) noexcept final
Add a pollable object.
Definition poller.h:435
PollerServerBase base
Definition poller.h:340
small_vector< PollItem, 4 > PollItemList
Type of list of poll things.
Definition poller.h:549
void event(Pollable::Events revents, size_t index) noexcept
Register revents for the PollItem at the given index.
Definition poller.h:558
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
Definition poller.h:400
virtual int poll(int timeout_ms) noexcept override
Poll.
Definition poller.h:528
PollerServer()=default
Ctor.
int remove(Pollable &p, Client *client) noexcept final
Remove the given Pollable, belonging to the given client.
Definition poller.h:441
PollItem_ PollItem
Definition poller.h:546
PollerServer(PollerServer const &)=delete
void clear() noexcept
Clear all Pollables.
Definition poller.h:514
int add(Pollable &p, Client *client) noexcept final
Add a Pollable, that belongs to a given client.
Definition poller.h:406
bool empty() const noexcept final
Checks if there is any pollable registered.
Definition poller.h:523
virtual int migrateTo(PollerServerBase &p) noexcept override
Move all registered Pollables to another server.
Definition poller.h:367
void remove(size_t last) noexcept
Remove the last added Pollables.
Definition poller.h:490
Keeps track of a process-wide unique ID within the type T.
Definition util.h:715
virtual char const * id_str() const override
Definition util.h:809
UniqueID & operator=(UniqueID const &)=delete
void wakeup(TimedWaitable &w)
Definition waiter.cpp:82
Waiter & waiter() noexcept
Definition worker.h:99
A PollerServer that uses zmq_poll().
Definition poller.h:654
PollerServer< zmq_pollitem_t > base
Definition poller.h:657
virtual ~ZmqPoller() override
A simple std::vector, which can contain Prealloc elements without heap allocation.
Definition util.h:1079
bool empty() const noexcept
Check if the vector is empty.
Definition util.h:1194
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:389
int poll(P pollable, int timeout_ms=-1)
Fiber-aware poll() for a single pollable thing.
Definition poller.h:784
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:189
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:143
#define is_default
Definition macros.h:212
#define ZTH_FALLTHROUGH
Definition macros.h:89
#define UNUSED_PAR(name)
Definition macros.h:78
ZmqPoller DefaultPollerServer
Definition poller.h:667
A pollable file descriptor.
Definition poller.h:135
constexpr PollableFd(void *s, Events const &e, void *user=nullptr) noexcept
Ctor for a ZeroMQ socket.
Definition poller.h:154
constexpr PollableFd(int f, Events const &e, void *user=nullptr) noexcept
Ctor for a file descriptor.
Definition poller.h:142
int fd
The file descriptor.
Definition poller.h:175
void * socket
The ZeroMQ socket.
Definition poller.h:166
A pollable thing.
Definition poller.h:72
static const unsigned long PollIn
Definition poller.h:89
constexpr Pollable(Events const &e, void *user=nullptr) noexcept
Ctor.
Definition poller.h:98
static const unsigned long PollOut
Definition poller.h:90
Events events
Events to poll.
Definition poller.h:118
static const unsigned long PollErr
Definition poller.h:91
static const unsigned long PollPri
Definition poller.h:92
void * user_data
User data.
Definition poller.h:109
EventsFlags
Flags to be used with events and revents.
Definition poller.h:76
Events revents
Returned events by a poll.
Definition poller.h:125
std::bitset< FlagCount > Events
Type of events and revents.
Definition poller.h:86
static const unsigned long PollHup
Definition poller.h:93
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:212