Zth (libzth)
Loading...
Searching...
No Matches
waiter.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 */
6
7#include <libzth/waiter.h>
8
9#include <libzth/poller.h>
10#include <libzth/worker.h>
11
12namespace zth {
13
14Waiter::Waiter(Worker& worker)
15 : m_worker(worker)
16 , m_poller()
17 , m_defaultPoller()
18{}
19
20Waiter::~Waiter()
21{
22 delete m_defaultPoller;
23}
24
// NOTE(review): this is only a fragment of a function. The listing numbering
// jumps from 24 to 26 and from 27 to 29, so the signature line and one body
// statement are absent here. The perf label suggests this is waitUntil();
// restore the missing lines from the real source before building.
26{
// Put a "waitUntil()" syscall marker into the perf output.
27 perf_syscall("waitUntil()");
29}
30
// Block the current fiber of this worker on the given timed waitable.
//
// If w already polls true, the fiber only yields and returns. Otherwise the
// fiber is attached to w, put to nap until w.timeout(), released from the
// worker, and w is queued in m_waiting. The waiter's own fiber (if any) is
// resumed so it can serve the queue, and the worker schedules another fiber.
//
// NOTE(review): the listing numbering skips 34 and 37. Presumably line 34 is the
// condition guarding the first `return;` and line 37 declares `now` -- confirm
// against the real source; as listed, this block does not compile.
31void Waiter::wait(TimedWaitable& w)
32{
33 Fiber* fiber = m_worker.currentFiber();
35 return;
36
// Already passed: no need to sleep, just allow a context switch.
38 if(unlikely(w.poll(now))) {
39 yield(nullptr, false, now);
40 return;
41 }
42
// Tie the fiber to the waitable and take it out of the worker's hands.
43 w.setFiber(*fiber);
44 fiber->nap(w.timeout());
45 m_worker.release(*fiber);
46
// Queue the waitable and resume the waiter's own fiber, if it has one.
// Note that this->fiber() is the waiter's fiber, not the local `fiber` above.
47 m_waiting.insert(w);
48 if(this->fiber())
49 m_worker.resume(*this->fiber());
50
// The current fiber was released above; let the worker pick what runs next.
51 m_worker.schedule();
52}
53
58
59void Waiter::scheduleTask(TimedWaitable& w)
60{
61 m_waiting.insert(w);
62 if(fiber())
63 m_worker.resume(*fiber());
64}
65
70
71void Waiter::unscheduleTask(TimedWaitable& w)
72{
73 if(m_waiting.contains(w))
74 m_waiting.erase(w);
75}
76
// NOTE(review): only the braces of this function survive in the listing; the
// numbering jumps from 76 to 78 and from 78 to 80, so the signature line and the
// single body line are absent. Restore them from the real source.
78{
80}
81
82void Waiter::wakeup(TimedWaitable& w)
83{
84 if(m_waiting.contains(w)) {
85 m_waiting.erase(w);
86
87 if(w.hasFiber()) {
88 w.fiber().wakeup();
89 m_worker.add(&w.fiber());
90 w.resetFiber();
91 }
92 }
93}
94
95PollerServerBase& Waiter::poller()
96{
97 if(m_poller)
98 return *m_poller;
99 if(m_defaultPoller)
100 return *m_defaultPoller;
101
102 m_defaultPoller = new DefaultPollerServer();
103 return *m_defaultPoller;
104}
105
106bool Waiter::polling() const
107{
108 PollerServerBase const* p = m_poller;
109
110 if(!p)
111 p = m_defaultPoller;
112
113 if(!p)
114 return false;
115
116 return !p->empty();
117}
118
119void Waiter::wakeup()
120{
121 if(fiber())
122 m_worker.resume(*fiber());
123}
124
125void Waiter::setPoller(PollerServerBase* p)
126{
127 if(!m_poller) {
128 // No poller set, just use the given one (which may be nullptr).
129 m_poller = p;
130 } else if(p) {
131 // Replace poller by another one.
132 m_poller->migrateTo(*p);
133 m_poller = p;
134 } else {
135 // Replace poller by default one.
136 if(!m_defaultPoller && !m_poller->empty()) {
137 // ...but it doesn't exist yet and we need it.
138 m_defaultPoller = new DefaultPollerServer();
139 }
140
141 if(m_defaultPoller)
142 m_poller->migrateTo(*m_defaultPoller);
143
144 m_poller = m_defaultPoller;
145 }
146}
147
// Main loop of the waiter's own fiber; never returns.
//
// Every iteration wakes the fibers whose waitable has timed out and passed, and
// then either suspends (nothing to wait for), yields to other fibers, or -- when
// no other fiber is runnable -- blocks the thread in poll() or clock_nanosleep().
//
// NOTE(review): the listing numbering skips 154, 219, 236 and 242. Presumably
// line 154 declares `now`; the other three are unknown from here. Restore them
// from the real source; as listed, this block does not compile.
148void Waiter::entry()
149{
// Must run on the worker this waiter was created for.
150 zth_assert(&currentWorker() == &m_worker);
151 fiber()->setName(format("zth::Waiter of %s", m_worker.id_str()));
152
153 while(true) {
155 m_worker.load().stop(now);
156
// Handle all waitables at the front of the list whose timeout lies before now.
157 while(!m_waiting.empty() && m_waiting.front().timeout() < now) {
158 TimedWaitable& w = m_waiting.front();
159 m_waiting.erase(w);
// Passed: wake the attached fiber (if any) and hand it back to the worker.
160 if(w.poll(now)) {
161 if(w.hasFiber()) {
162 w.fiber().wakeup();
163 m_worker.add(&w.fiber());
164 }
165 } else {
166 // Reinsert, as the timeout() might have changed (and therefore the
167 // position in the list).
168 m_waiting.insert(w);
169
170 // Update administration, although that does not influence the
171 // actual sleep time. It is mostly handy for debugging while
172 // printing fiber information.
173 if(Config::Debug && w.hasFiber())
174 w.fiber().nap(w.timeout());
175 }
176 }
177
// Set only when the thread itself should block, see below.
178 bool doRealSleep = false;
179
180 m_worker.load().start(now);
181
182 if(m_waiting.empty() && !polling()) {
183 // No fiber is waiting. suspend() till anyone is going to nap().
184 zth_dbg(waiter, "[%s] No sleeping fibers anymore; suspend", id_str());
185 m_worker.suspend(*fiber());
186 } else if(!m_worker.schedule()) {
187 // When true, we were not rescheduled, which means that we are the only
188 // runnable fiber. Do a real sleep, until something interesting happens in
189 // the system.
190 doRealSleep = true;
191 m_worker.load().stop(now);
192 }
193
// Determine the moment to sleep until: the timeout of the first waiting entry,
// or the worker's runEnd() when that is set and comes earlier. Stays nullptr
// when neither applies.
194 Timestamp const* end = nullptr;
195 if(!m_waiting.empty())
196 end = &m_waiting.front().timeout();
197 if(!m_worker.runEnd().isNull() && (!end || *end > m_worker.runEnd()))
198 end = &m_worker.runEnd();
199
200 if(polling()) {
// A timeout of 0 is used unless a real sleep was requested above.
201 int timeout_ms = 0;
202 if(doRealSleep) {
203 if(!end) {
204 // Infinite sleep.
205 timeout_ms = -1;
206 zth_dbg(waiter, "[%s] Out of other work than doing poll()",
207 id_str());
208 } else {
// Time left until `end` in milliseconds, clamped at 0 when already passed.
209 TimeInterval dt = *end - Timestamp::now();
210 zth_dbg(waiter,
211 "[%s] Out of other work than doing poll(); timeout "
212 "is %s",
213 id_str(), dt.str().c_str());
214 timeout_ms =
215 std::max<int>(0, (int)(dt.s<float>() * 1000.0F));
216 }
217
218 perf_mark("blocking poll()");
220 }
221
222 int res = poller().poll(timeout_ms);
223
224 if(doRealSleep) {
225 zth_perf_event(*fiber(), fiber()->state());
226 perf_mark("wakeup");
227 }
228
// A nonzero result other than EAGAIN is only logged; the loop continues.
229 if(res && res != EAGAIN)
230 zth_dbg(waiter, "[%s] poll() failed; %s", id_str(),
231 err(res).c_str());
232 } else if(doRealSleep) {
// Nothing to poll: sleep the whole thread until the absolute time *end.
// NOTE(review): m_waiting.front() and *end are used without a check here;
// presumably m_waiting cannot be empty on this path -- confirm.
233 zth_dbg(waiter, "[%s] Out of work; suspend thread, while waiting for %s",
234 id_str(), m_waiting.front().str().c_str());
235 perf_mark("idle system; sleep");
237 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &end->ts(), nullptr);
238 zth_perf_event(*fiber(), fiber()->state());
239 perf_mark("wakeup");
240 }
241
243 }
244}
245
246
247
248bool PeriodicWakeUp::nap(Timestamp const& reference, Timestamp const& now)
249{
250 m_t = reference + interval();
251 if(likely(m_t > now)) {
252 // Proper sleep till next deadline.
253 zth::nap(m_t);
254 return true;
255 } else if(m_t + interval() > now) {
256 // Deadline has just passed. Don't sleep and try to catch up.
257 yield();
258 return false;
259 } else {
260 // Way passed deadline. Don't sleep and skip a few cycles.
261 m_t = now;
262 yield();
263 return false;
264 }
265}
266
267} // namespace zth
The fiber.
Definition fiber.h:49
State state() const noexcept
Definition fiber.h:139
void wakeup() noexcept
Definition fiber.h:311
@ Waiting
Definition fiber.h:68
@ Running
Definition fiber.h:68
void nap(Timestamp const &sleepUntil=Timestamp::null()) noexcept
Definition fiber.h:279
void stop(Timestamp const &now=Timestamp::now()) noexcept
Definition perf.h:410
void start(Timestamp const &now=Timestamp::now()) noexcept
Definition perf.h:400
TimeInterval const & interval() const noexcept
Definition waiter.h:371
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
Definition poller.h:270
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int migrateTo(PollerServerBase &p) noexcept=0
Move all registered Pollables to another server.
Fiber * fiber() const noexcept
Definition fiber.h:490
char const * id_str() const
Definition fiber.h:504
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:55
string str() const
Definition time.h:426
constexpr double s() const noexcept
Definition time.h:271
virtual bool poll(Timestamp const &now=Timestamp::now()) noexcept override
Definition waiter.h:81
Timestamp const & timeout() const noexcept
Definition waiter.h:76
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition time.h:568
static Timestamp now()
Definition time.h:595
constexpr bool isNull() const noexcept
Definition time.h:720
virtual char const * id_str() const override
Definition util.h:809
void setName(string const &name)
Definition util.h:788
void setFiber(Fiber &fiber) noexcept
Definition waiter.h:48
void resetFiber() noexcept
Definition waiter.h:53
bool hasFiber() const noexcept
Definition waiter.h:58
Fiber & fiber() const noexcept
Definition waiter.h:35
void wakeup(TimedWaitable &w)
Definition waiter.cpp:82
void scheduleTask(TimedWaitable &w)
Definition waiter.cpp:59
bool polling() const
Definition waiter.cpp:106
void wait(TimedWaitable &w)
Definition waiter.cpp:31
void unscheduleTask(TimedWaitable &w)
Definition waiter.cpp:71
PollerServerBase & poller()
Definition waiter.cpp:95
The class that manages the fibers within this thread.
Definition worker.h:35
void release(Fiber &fiber) noexcept
Definition worker.h:145
Load_type & load() noexcept
Definition worker.h:329
Waiter & waiter() noexcept
Definition worker.h:99
Fiber * currentFiber() const noexcept
Definition worker.h:94
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition worker.h:162
void suspend(Fiber &fiber)
Definition worker.h:263
void resume(Fiber &fiber) noexcept
Definition worker.h:292
void add(Fiber *fiber) noexcept
Definition worker.h:104
Timestamp const & runEnd() const noexcept
Definition worker.h:302
void nap(Timestamp const &sleepUntil)
Sleep until the given time stamp.
Definition waiter.h:274
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:389
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition worker.h:436
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
Definition waiter.cpp:25
void perf_mark(char const *marker)
Put a string marker into the perf output.
Definition perf.h:326
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
Definition perf.h:294
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:189
void unscheduleTask(TimedWaitable &w)
Definition waiter.cpp:66
void scheduleTask(TimedWaitable &w)
Definition waiter.cpp:54
ZmqPoller DefaultPollerServer
Definition poller.h:667
void sigchld_check()
Definition worker.cpp:189
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:675
string format(char const *fmt,...)
Format like sprintf(), but save the result in a zth::string.
Definition util.h:478
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition perf.h:361
void wakeup(TimedWaitable &w)
Definition waiter.cpp:77
static bool const Debug
This is a debug build when set to true.
Definition config.h:56
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:212
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:40
#define unlikely(expr)
Marks the given expression to likely be evaluated to false.
Definition util.h:55