Zth (libzth)
Loading...
Searching...
No Matches
waiter.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
3 *
4 * SPDX-License-Identifier: MPL-2.0
5 */
6
7#include <libzth/waiter.h>
8
9#include <libzth/poller.h>
10#include <libzth/worker.h>
11
12namespace zth {
13
14Waiter::Waiter(Worker& worker)
15 : m_worker(worker)
16 , m_poller()
17 , m_defaultPoller()
18{}
19
20Waiter::~Waiter() noexcept
21{
22 delete m_defaultPoller;
23}
24
26{
27 perf_syscall("waitUntil()");
29}
30
31void Waiter::wait(TimedWaitable& w)
32{
33 Fiber* fiber = m_worker.currentFiber();
35 return;
36
37 Timestamp t_now = Timestamp::now();
38 if(unlikely(w.poll(t_now))) {
39 yield(nullptr, false, t_now);
40 return;
41 }
42
43 w.setFiber(*fiber);
44 fiber->nap(w.timeout());
45 m_worker.release(*fiber);
46
47 m_waiting.insert(w);
48 if(this->fiber())
49 m_worker.resume(*this->fiber());
50
51 m_worker.schedule();
52}
53
58
59void Waiter::scheduleTask(TimedWaitable& w)
60{
61 m_waiting.insert(w);
62 if(fiber())
63 m_worker.resume(*fiber());
64}
65
70
71void Waiter::unscheduleTask(TimedWaitable& w)
72{
73 if(m_waiting.contains(w))
74 m_waiting.erase(w);
75}
76
78{
80}
81
82void Waiter::wakeup(TimedWaitable& w)
83{
84 if(m_waiting.contains(w)) {
85 m_waiting.erase(w);
86
87 if(w.hasFiber()) {
88 w.fiber().wakeup();
89 m_worker.add(&w.fiber(), true);
90 w.resetFiber();
91 }
92 }
93}
94
95PollerServerBase& Waiter::poller()
96{
97 if(m_poller)
98 return *m_poller;
99 if(m_defaultPoller)
100 return *m_defaultPoller;
101
102 m_defaultPoller = new DefaultPollerServer();
103 return *m_defaultPoller;
104}
105
106bool Waiter::polling() const
107{
108 PollerServerBase const* p = m_poller;
109
110 if(!p)
111 p = m_defaultPoller;
112
113 if(!p)
114 return false;
115
116 return !p->empty();
117}
118
119void Waiter::wakeup()
120{
121 if(fiber())
122 m_worker.resume(*fiber());
123}
124
125void Waiter::setPoller(PollerServerBase* p)
126{
127 if(!m_poller) {
128 // No poller set, just use the given one (which may be nullptr).
129 m_poller = p;
130 } else if(p) {
131 // Replace poller by another one.
132 m_poller->migrateTo(*p);
133 m_poller = p;
134 } else {
135 // Replace poller by default one.
136 if(!m_defaultPoller && !m_poller->empty()) {
137 // ...but it doesn't exist yet and we need it.
138 m_defaultPoller = new DefaultPollerServer();
139 }
140
141 if(m_defaultPoller)
142 m_poller->migrateTo(*m_defaultPoller);
143
144 m_poller = m_defaultPoller;
145 }
146}
147
148void Waiter::entry()
149{
150 zth_assert(&currentWorker() == &m_worker);
151 fiber()->setName(format("zth::Waiter of %s", m_worker.id_str()));
152
153 while(true) {
154 Timestamp t_now = Timestamp::now();
155 m_worker.load().stop(t_now);
156
157 while(!m_waiting.empty() && m_waiting.front().timeout() < t_now) {
158 TimedWaitable& w = m_waiting.front();
159 m_waiting.erase(w);
160 if(w.poll(t_now)) {
161 if(w.hasFiber()) {
162 w.fiber().wakeup();
163 m_worker.add(&w.fiber(), true);
164 }
165 } else {
166 // Reinsert, as the timeout() might have changed (and therefore the
167 // position in the list).
168 m_waiting.insert(w);
169
170 // Update administration, although that does not influence the
171 // actual sleep time. It is mostly handy for debugging while
172 // printing fiber information.
173 if(Config::Debug && w.hasFiber())
174 w.fiber().nap(w.timeout());
175 }
176 }
177
178 bool doRealSleep = false;
179
180 m_worker.load().start(t_now);
181
182 if(m_waiting.empty() && !polling()) {
183 // No fiber is waiting. suspend() till anyone is going to nap().
184 zth_dbg(waiter, "[%s] No sleeping fibers anymore; suspend", id_str());
185 m_worker.suspend(*fiber());
186 } else if(!m_worker.schedule()) {
187 // When true, we were not rescheduled, which means that we are the only
188 // runnable fiber. Do a real sleep, until something interesting happens in
189 // the system.
190 doRealSleep = true;
191 m_worker.load().stop(t_now);
192 }
193
194 Timestamp const* end = nullptr;
195 if(!m_waiting.empty())
196 end = &m_waiting.front().timeout();
197 if(!m_worker.runEnd().isNull() && (!end || *end > m_worker.runEnd()))
198 end = &m_worker.runEnd();
199
200 if(polling()) {
201 int timeout_ms = 0;
202 if(doRealSleep) {
203 if(!end) {
204 // Infinite sleep.
205 timeout_ms = -1;
206 zth_dbg(waiter, "[%s] Out of other work than doing poll()",
207 id_str());
208 } else {
209 TimeInterval dt = *end - Timestamp::now();
210 zth_dbg(waiter,
211 "[%s] Out of other work than doing poll(); timeout "
212 "is %s",
213 id_str(), dt.str().c_str());
214 timeout_ms =
215 std::max<int>(0, (int)(dt.s<float>() * 1000.0F));
216 }
217
219 perf_mark("blocking poll()");
221 }
222 }
223
224 int res = poller().poll(timeout_ms);
225
226 if(doRealSleep && Config::EnablePerfEvent) {
227 perf_fiber(*fiber());
228 perf_mark("wakeup");
229 }
230
231 if(res && res != EAGAIN)
232 zth_dbg(waiter, "[%s] poll() failed; %s", id_str(),
233 err(res).c_str());
234 } else if(doRealSleep) {
235 zth_dbg(waiter, "[%s] Out of work; suspend thread, while waiting for %s",
236 id_str(), m_waiting.front().str().c_str());
237
239 perf_mark("idle system; sleep");
241 }
242
243 realsleep(end->ts());
244
247 perf_mark("wakeup");
248 }
249 }
250
252 }
253}
254
255
256
257bool PeriodicWakeUp::nap(Timestamp const& reference, Timestamp const& now)
258{
259 m_t = reference + interval();
260 if(likely(m_t > now)) {
261 // Proper sleep till next deadline.
262 zth::nap(m_t);
263 return true;
264 } else if(m_t + interval() > now) {
265 // Deadline has just passed. Don't sleep and try to catch up.
266 yield();
267 return false;
268 } else {
269 // Way passed deadline. Don't sleep and skip a few cycles.
270 m_t = now;
271 yield();
272 return false;
273 }
274}
275
276} // namespace zth
The fiber.
Definition fiber.h:62
State state() const noexcept
Definition fiber.h:163
void wakeup() noexcept
Definition fiber.h:379
@ Waiting
Definition fiber.h:82
@ Running
Definition fiber.h:82
void nap(Timestamp const &sleepUntil=Timestamp::null()) noexcept
Definition fiber.h:346
void stop(Timestamp const &now=Timestamp::now()) noexcept
Definition perf.h:123
void start(Timestamp const &now=Timestamp::now()) noexcept
Definition perf.h:113
virtual char const * id_str() const noexcept override
Definition util.h:787
void setName(string const &name)
Definition util.h:766
TimeInterval const & interval() const noexcept
Definition waiter.h:371
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
Definition poller.h:270
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int migrateTo(PollerServerBase &p) noexcept=0
Move all registered Pollables to another server.
Fiber * fiber() const noexcept
Definition fiber.h:571
char const * id_str() const
Definition fiber.h:585
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:82
string str() const
Definition time.h:488
constexpr double s() const noexcept
Definition time.h:333
virtual bool poll(Timestamp const &now=Timestamp::now()) noexcept override
Definition waiter.h:81
Timestamp const & timeout() const noexcept
Definition waiter.h:76
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition time.h:629
static Timestamp now()
Definition time.h:656
constexpr struct timespec const & ts() const noexcept
Definition time.h:664
constexpr bool isNull() const noexcept
Definition time.h:780
void setFiber(Fiber &fiber) noexcept
Definition waiter.h:48
void resetFiber() noexcept
Definition waiter.h:53
bool hasFiber() const noexcept
Definition waiter.h:58
Fiber & fiber() const noexcept
Definition waiter.h:35
void wakeup(TimedWaitable &w)
Definition waiter.cpp:82
void scheduleTask(TimedWaitable &w)
Definition waiter.cpp:59
bool polling() const
Definition waiter.cpp:106
void wait(TimedWaitable &w)
Definition waiter.cpp:31
void unscheduleTask(TimedWaitable &w)
Definition waiter.cpp:71
PollerServerBase & poller()
Definition waiter.cpp:95
The class that manages the fibers within this thread.
Definition worker.h:35
void release(Fiber &fiber) noexcept
Definition worker.h:170
Load_type & load() noexcept
Definition worker.h:348
Waiter & waiter() noexcept
Definition worker.h:108
Fiber * currentFiber() const noexcept
Definition worker.h:103
void add(Fiber *fiber, bool front=false) noexcept
Definition worker.h:113
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition worker.h:184
void suspend(Fiber &fiber)
Definition worker.h:281
void resume(Fiber &fiber) noexcept
Definition worker.h:311
Timestamp const & runEnd() const noexcept
Definition worker.h:321
void nap(Timestamp const &sleepUntil)
Sleep until the given time stamp.
Definition waiter.h:274
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:417
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition worker.h:464
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
Definition waiter.cpp:25
void perf_mark(char const *marker, Timestamp const &t=Timestamp()) noexcept
Put a string marker into the perf output.
Definition perf.cpp:394
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
void perf_fiber_state(Fiber &f, int state=-1, Timestamp const &t=Timestamp()) noexcept
Record the current fiber state.
Definition perf.cpp:528
void now(struct timespec &ts)
Returns the current timestamp.
Definition time.h:59
void unscheduleTask(TimedWaitable &w)
Definition waiter.cpp:66
void scheduleTask(TimedWaitable &w)
Definition waiter.cpp:54
ZmqPoller DefaultPollerServer
Definition poller.h:667
int realsleep(struct timespec const &ts)
Sleeps the current thread.
Definition time.cpp:141
void sigchld_check()
Definition worker.cpp:199
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:701
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp()) noexcept
Put a syscall into the perf output.
Definition perf.h:71
string format(char const *fmt,...)
Format like sprintf(), but save the result in an zth::string.
Definition util.h:498
void perf_fiber(Fiber &f) noexcept
Write fiber ID/name to the perf buffer.
Definition perf.cpp:500
void wakeup(TimedWaitable &w)
Definition waiter.cpp:77
static bool const EnablePerfEvent
Enable (but not necessarily record) perf.
Definition config.h:265
static bool const Debug
This is a debug build when set to true.
Definition config.h:58
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:45
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:60