Zth (libzth)
Loading...
Searching...
No Matches
poller.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 */
6
7#include <libzth/poller.h>
8
9namespace zth {
10
11
13// PollerInterface
14//
15
16#if __cplusplus >= 201103L
22int PollerInterface::add(std::initializer_list<std::reference_wrapper<Pollable>> l) noexcept
23{
24 try {
25 reserve(l.size());
26 } catch(...) {
27 return ENOMEM;
28 }
29
30 int res = 0;
31 size_t count = 0;
32
33 for(Pollable& p : l) {
34 res = add(p);
35 if(res) {
36 // Rollback.
37 for(auto const* it = l.begin(); it != l.end() && count > 0; ++it, count--)
38 remove(*it);
39 break;
40 }
41
42 // Success.
43 count++;
44 }
45
46 return res;
47}
48#endif // C++11
49
50
51
52#ifdef ZTH_HAVE_LIBZMQ
54// ZmqPoller
55//
56
57ZmqPoller::ZmqPoller()
58{
59 this->setName("zth::ZmqPoller");
60}
61
62ZmqPoller::~ZmqPoller() is_default
63
64int ZmqPoller::init(Pollable const& p, zmq_pollitem_t& item) noexcept
65{
66 // Zth only has PollableFds, so casting is safe.
67 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
68 PollableFd const& pfd = static_cast<PollableFd const&>(p);
69
70 item.socket = pfd.socket;
71 item.fd = pfd.fd;
72
73 item.events = 0;
74 if((pfd.events.test(Pollable::PollInIndex)))
75 item.events |= ZMQ_POLLIN;
76 if((pfd.events.test(Pollable::PollOutIndex)))
77 item.events |= ZMQ_POLLOUT;
78 return 0;
79}
80
81int ZmqPoller::doPoll(int timeout_ms, base::PollItemList& items) noexcept
82{
83 zth_assert(items.size() <= (size_t)std::numeric_limits<int>::max());
84 int res = zmq_poll(items.data(), (int)items.size(), timeout_ms);
85 if(res < 0)
86 return zmq_errno();
87 if(res == 0)
88 // Timeout.
89 return 0;
90
91 for(size_t i = 0; res > 0 && i < items.size(); i++) {
92 zmq_pollitem_t const& item = items[i];
93 Pollable::Events revents = 0;
94
95 if(item.revents) {
96 res--;
97
98 if(item.revents & ZMQ_POLLIN) // NOLINT(hicpp-signed-bitwise)
99 revents |= Pollable::PollIn;
100 if(item.revents & ZMQ_POLLOUT) // NOLINT(hicpp-signed-bitwise)
101 revents |= Pollable::PollOut;
102 if(item.revents & ZMQ_POLLERR) // NOLINT(hicpp-signed-bitwise)
103 revents |= Pollable::PollErr;
104 }
105
106 this->event(revents, i);
107 }
108
109 return 0;
110}
111
112#elif defined(ZTH_HAVE_POLL)
114// PollPoller
115//
116
117PollPoller::PollPoller()
118{
119 setName("zth::PollPoller");
120}
121
122PollPoller::~PollPoller()
123{
124 clear();
125}
126
127int PollPoller::init(Pollable const& p, struct pollfd& item) noexcept
128{
129 // Zth only has PollableFds, so casting is safe.
130 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
131 PollableFd const& pfd = static_cast<PollableFd const&>(p);
132
133 item.fd = pfd.fd;
134
135 item.events = 0;
136 if((pfd.events.test(Pollable::PollInIndex)))
137 item.events |= POLLIN;
138 if((pfd.events.test(Pollable::PollOutIndex)))
139 item.events |= POLLOUT;
140 if((pfd.events.test(Pollable::PollPriIndex)))
141 item.events |= POLLPRI;
142 if((pfd.events.test(Pollable::PollHupIndex)))
143 item.events |= POLLHUP;
144
145 return 0;
146}
147
148int PollPoller::doPoll(int timeout_ms, base::PollItemList& items) noexcept
149{
150 int res = ::poll(items.data(), items.size(), timeout_ms);
151 if(res < 0)
152 return errno;
153 if(res == 0)
154 // Timeout.
155 return 0;
156
157 for(size_t i = 0; res > 0 && i < items.size(); i++) {
158 struct pollfd const& item = items[i];
159 Pollable::Events revents = 0;
160
161 if(item.revents) {
162 res--;
163
164 if(item.revents & POLLIN)
165 revents |= Pollable::PollIn;
166 if(item.revents & POLLOUT)
167 revents |= Pollable::PollOut;
168 if(item.revents & POLLERR)
169 revents |= Pollable::PollErr;
170 if(item.revents & POLLPRI)
171 revents |= Pollable::PollPri;
172 if(item.revents & POLLHUP)
173 revents |= Pollable::PollHup;
174 }
175
176 event(revents, i);
177 }
178
179 return 0;
180}
181
182
183#else
185// NoPoller
186//
187
188NoPoller::NoPoller()
189{
190 setName("zth::NoPoller");
191}
192
193NoPoller::~NoPoller() is_default
194
195int NoPoller::init(Pollable const& UNUSED_PAR(p), int& UNUSED_PAR(item)) noexcept
196{
197 return EINVAL;
198}
199
200int NoPoller::doPoll(int UNUSED_PAR(timeout_ms), base::PollItemList& UNUSED_PAR(items)) noexcept
201{
202 return ENOSYS;
203}
204
205#endif // Poller method
207
208
210// PollerClient
211//
212
213PollerClient::PollerClient()
214{
215 this->setName("zth::PollerClient");
216}
217
218#if __cplusplus >= 201103L
219PollerClient::PollerClient(std::initializer_list<std::reference_wrapper<Pollable>> l)
220{
221 errno = add(l);
222
223# ifdef __cpp_exceptions
224 switch(errno) {
225 case 0:
226 break;
227 case ENOMEM:
228 throw std::bad_alloc();
229 default:
230 throw std::runtime_error("");
231 }
232# endif
233}
234#endif
235
236PollerClient::~PollerClient() is_default
237
238void PollerClient::reserve(size_t more)
239{
240 m_pollables.reserve(m_pollables.size() + more);
241 m_result.reserve(m_pollables.capacity());
242}
243
244int PollerClient::add(Pollable& p) noexcept
245{
246 try {
247 m_result.reserve(m_pollables.size() + 1U);
248 m_pollables.push_back(&p);
249 zth_dbg(io, "[%s] poll %p for events 0x%lu", id_str(), &p, p.events.to_ulong());
250 return 0;
251 } catch(...) {
252 return ENOMEM;
253 }
254}
255
256int PollerClient::remove(Pollable& p) noexcept
257{
258 for(size_t i = m_pollables.size(); i > 0; i--) {
259 Pollable*& pi = m_pollables[i - 1U];
260 if(pi == &p) {
261 pi = m_pollables.back();
262 m_pollables.pop_back();
263 return 0;
264 }
265 }
266
267 return EAGAIN;
268}
269
270PollerClient::Result const& PollerClient::poll(int timeout_ms) noexcept
271{
272 m_result.clear();
273
274 if(m_pollables.empty()) {
275 errno = EINVAL;
276 return m_result;
277 }
278
279 Waiter& waiter = currentWorker().waiter();
280 PollerServerBase& p = waiter.poller();
281
282 // Add all our pollables to the server.
283 for(size_t i = 0; i < m_pollables.size(); i++)
284 p.add(*m_pollables[i], this);
285
286 // Go do the poll.
287 zth_dbg(io, "[%s] polling %u items for %d ms", id_str(), (unsigned)m_pollables.size(),
288 timeout_ms);
289
290 // First try, in the current fiber's context.
291 m_wait = TimedWaitable();
292 int res = p.poll(0);
293
294 if(!m_result.empty()) { // NOLINT(bugprone-branch-clone)
295 // Completed already.
296 } else if(res && res != EAGAIN) {
297 // Completed with an error.
298 } else if(timeout_ms == 0) {
299 // Just tried. Got nothing, apparently.
300 } else if(timeout_ms < 0) {
301 // Wait for the event callback.
302 zth_dbg(io, "[%s] hand-off to server", id_str());
303 m_wait.setFiber(currentFiber());
304 suspend();
305 } else {
306 zth_dbg(io, "[%s] hand-off to server with timeout", id_str());
307 m_wait = TimedWaitable(
308 Timestamp::now()
309 + TimeInterval(timeout_ms / 1000L, (timeout_ms * 1000000L) % 1000000000L));
310 waitUntil(m_wait);
311 }
312
313 // Remove our pollables from the server.
314 for(size_t i = m_pollables.size(); i > 0; i--)
315 p.remove(*m_pollables[i - 1U], this);
316
317 // m_result got populated by event().
318 if(m_result.empty() && !res)
319 res = EAGAIN;
320
321 if(res)
322 zth_dbg(io, "[%s] poll returned %s", id_str(), err(res).c_str());
323 else
324 zth_dbg(io, "[%s] poll returned %u pollable(s)", id_str(),
325 (unsigned)m_result.size());
326
327 errno = res;
328 return m_result;
329}
330
331void PollerClient::event(Pollable& p) noexcept
332{
333 m_result.push_back(&p);
334
335 if(!m_wait.timeout().isNull()) {
336 // Managed by the waiter
337 currentWorker().waiter().wakeup(m_wait);
338 } else if(m_wait.hasFiber()) {
339 // Just suspended.
340 resume(m_wait.fiber());
341 }
342}
343
344bool PollerClient::empty() const noexcept
345{
346 return m_pollables.empty();
347}
348
349} // namespace zth
The poller to be used by a fiber.
Definition poller.h:734
virtual void reserve(size_t more) override
Reserve memory to add more pollables.
Definition poller.cpp:238
virtual int add(Pollable &p) noexcept override
Add a pollable object.
Definition poller.cpp:244
Abstract base class of a Poller server.
Definition poller.h:270
virtual int remove(Pollable &p, Client *client) noexcept=0
Remove the given Pollable, belonging to the given client.
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int add(Pollable &p, Client *client) noexcept=0
Add a Pollable, that belongs to a given client.
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:55
A single fiber per Worker that manages sleeping and blocked fibers.
Definition waiter.h:194
void wakeup(TimedWaitable &w)
Definition waiter.cpp:82
PollerServerBase & poller()
Definition waiter.cpp:95
Waiter & waiter() noexcept
Definition worker.h:99
A PollerServer that uses zmq_poll().
Definition poller.h:654
Change the name of a fiber returned by zth_async.
Definition async.h:173
void clear() noexcept
Erase all elements from the vector.
Definition util.h:1237
bool empty() const noexcept
Check if the vector is empty.
Definition util.h:1194
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:389
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:399
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
Definition waiter.cpp:25
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:189
#define is_default
Definition macros.h:212
#define UNUSED_PAR(name)
Definition macros.h:78
void resume(Fiber &fiber)
Definition worker.h:471
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:675
void suspend()
Definition worker.h:463
A pollable file descriptor.
Definition poller.h:135
int fd
The file descriptor.
Definition poller.h:175
void * socket
The ZeroMQ socket.
Definition poller.h:166
A pollable thing.
Definition poller.h:72
Events events
Events to poll.
Definition poller.h:118
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:212