Zth (libzth)
waiter.cpp
Go to the documentation of this file.
1 /*
2  * Zth (libzth), a cooperative userspace multitasking library.
3  * Copyright (C) 2019-2022 Jochem Rutgers
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
8  */
9 
10 #include <libzth/waiter.h>
11 
12 #include <libzth/poller.h>
13 #include <libzth/worker.h>
14 
15 namespace zth {
16 
// Constructor Waiter::Waiter(Worker& worker). The signature line (listing line 17)
// is absent from this extract; only the member-initializer list and the empty
// body are visible.
// Binds the waiter to the given worker and value-initializes both poller
// members; no DefaultPollerServer is allocated here.
18  : m_worker(worker)
19  , m_poller()
20  , m_defaultPoller()
21 {}
22 
// Destructor Waiter::~Waiter(). The signature line (listing line 23) is absent
// from this extract.
// Only m_defaultPoller is deleted; m_poller is left untouched here
// (presumably it is owned by whoever passed it to setPoller() -- TODO confirm).
24 {
25  delete m_defaultPoller;
26 }
27 
// Free function waitUntil(TimedWaitable& w). The signature line (listing line 28)
// is absent from this extract.
// Emits a "waitUntil()" perf syscall record and forwards w to the Waiter of the
// worker returned by currentWorker().
29 {
30  perf_syscall("waitUntil()");
31  currentWorker().waiter().wait(w);
32 }
33 
// Waiter::wait(TimedWaitable& w). The signature line (listing line 34) is absent
// from this extract.
// Parks the worker's current fiber on w and registers w in m_waiting.
35 {
36  Fiber* fiber = m_worker.currentFiber();
    // Without a current fiber in the Running state, return without waiting.
37  if(unlikely(!fiber || fiber->state() != Fiber::Running))
38  return;
39 
40  Timestamp now = Timestamp::now();
    // If w is already satisfied, only offer a context switch and return.
41  if(unlikely(w.poll(now))) {
42  yield(NULL, false, now);
43  return;
44  }
45 
    // Attach the fiber to w, pass w's timeout to the fiber's nap(), and release
    // the fiber from the worker (presumably so it is no longer scheduled until
    // woken -- TODO confirm against Worker::release()).
46  w.setFiber(*fiber);
47  fiber->nap(w.timeout());
48  m_worker.release(*fiber);
49 
    // Register w and, if the waiter's own fiber exists, resume it.
50  m_waiting.insert(w);
51  if(this->fiber())
52  m_worker.resume(*this->fiber());
53 
    // Hand control to the worker's scheduler.
54  m_worker.schedule();
55 }
56 
// Free function scheduleTask(TimedWaitable& w); the index at the end of this
// listing places its definition at listing line 57. Both the signature line and
// the single body statement (listing line 59) are absent from this extract, so
// only the braces remain.
58 {
60 }
61 
// Waiter::scheduleTask(TimedWaitable& w). The signature line (listing line 62)
// is absent from this extract.
// Registers w in m_waiting without attaching a fiber to it, then resumes the
// waiter's own fiber if it exists.
63 {
64  m_waiting.insert(w);
65  if(fiber())
66  m_worker.resume(*fiber());
67 }
68 
// Free function unscheduleTask(TimedWaitable& w); the index at the end of this
// listing places its definition at listing line 69. Both the signature line and
// the single body statement (listing line 71) are absent from this extract, so
// only the braces remain.
70 {
72 }
73 
// Waiter::unscheduleTask(TimedWaitable& w). The signature line (listing line 74)
// is absent from this extract.
// Removes w from m_waiting if it is registered; otherwise does nothing.
75 {
76  if(m_waiting.contains(w))
77  m_waiting.erase(w);
78 }
79 
// Free function wakeup(TimedWaitable& w); the index at the end of this listing
// places its definition at listing line 80. Both the signature line and the
// single body statement (listing line 82) are absent from this extract, so only
// the braces remain.
81 {
83 }
84 
// Waiter::wakeup(TimedWaitable& w). The signature line (listing line 85) is
// absent from this extract.
// Ends the wait on w early. Does nothing when w is not registered in m_waiting.
86 {
87  if(m_waiting.contains(w)) {
88  m_waiting.erase(w);
89 
    // If a fiber is attached to w: wake it, add it to the worker again, and
    // detach it from w.
90  if(w.hasFiber()) {
91  w.fiber().wakeup();
92  m_worker.add(&w.fiber());
93  w.resetFiber();
94  }
95  }
96 }
97 
// Waiter::poller(), returning PollerServerBase&. The signature line (listing
// line 98) is absent from this extract.
// Returns the poller set via m_poller when there is one, otherwise the default
// poller. The DefaultPollerServer is allocated on first need and is deleted in
// the destructor.
99 {
100  if(m_poller)
101  return *m_poller;
102  if(m_defaultPoller)
103  return *m_defaultPoller;
104 
105  m_defaultPoller = new DefaultPollerServer();
106  return *m_defaultPoller;
107 }
108 
109 bool Waiter::polling() const
110 {
111  PollerServerBase* p = m_poller;
112 
113  if(!p)
114  p = m_defaultPoller;
115 
116  if(!p)
117  return false;
118 
119  return !p->empty();
120 }
121 
// Waiter::wakeup() (overload without arguments). The signature line (listing
// line 122) is absent from this extract.
// Resumes the waiter's own fiber on the worker, if that fiber exists.
123 {
124  if(fiber())
125  m_worker.resume(*fiber());
126 }
127 
// Waiter::setPoller(PollerServerBase* p = nullptr). The signature line (listing
// line 128) is absent from this extract.
// Switches the poller in use. When a poller was already set, its contents are
// first moved to the replacement via migrateTo(). The previously set poller is
// not deleted here.
// NOTE(review): the return value of migrateTo() is ignored, and passing the
// currently set poller again calls m_poller->migrateTo(*m_poller) -- confirm
// both are harmless.
129 {
130  if(!m_poller) {
131  // No poller set, just use the given one (which may be nullptr).
132  m_poller = p;
133  } else if(p) {
134  // Replace poller by another one.
135  m_poller->migrateTo(*p);
136  m_poller = p;
137  } else {
138  // Replace poller by default one.
139  if(!m_defaultPoller && !m_poller->empty()) {
140  // ...but it doesn't exist yet and we need it.
141  m_defaultPoller = new DefaultPollerServer();
142  }
143 
    // m_defaultPoller can still be null here, namely when the old poller was
    // empty; in that case there is nothing to migrate and m_poller becomes null.
144  if(m_defaultPoller)
145  m_poller->migrateTo(*m_defaultPoller);
146 
147  m_poller = m_defaultPoller;
148  }
149 }
150 
// Waiter::entry() (virtual, override). The signature line (listing line 151) is
// absent from this extract, as are listing lines 222 and 239 inside the body.
// Main loop of the waiter's fiber; it never returns. Each iteration:
//  1. handles every waitable in m_waiting whose timeout lies before 'now';
//  2. suspends itself when there is nothing to wait for, or otherwise lets the
//     worker schedule other fibers;
//  3. polls the poller and/or sleeps the thread until the next deadline;
//  4. calls sigchld_check().
152 {
    // Must run on the worker this waiter belongs to.
153  zth_assert(&currentWorker() == &m_worker);
154  fiber()->setName(format("zth::Waiter of %s", m_worker.id_str()));
155 
156  while(true) {
157  Timestamp now = Timestamp::now();
    // Load measurement is stopped while processing the waiting list
    // (presumably so waiter bookkeeping is not counted as load -- TODO confirm).
158  m_worker.load().stop(now);
159 
    // The front of m_waiting is used as the entry with the earliest timeout.
160  while(!m_waiting.empty() && m_waiting.front().timeout() < now) {
161  TimedWaitable& w = m_waiting.front();
162  m_waiting.erase(w);
163  if(w.poll(now)) {
    // Wait completed: wake the attached fiber (if any) and add it to the worker.
164  if(w.hasFiber()) {
165  w.fiber().wakeup();
166  m_worker.add(&w.fiber());
167  }
168  } else {
169  // Reinsert, as the timeout() might have changed (and therefore the
170  // position in the list).
171  m_waiting.insert(w);
172 
173  // Update administration, although that does not influence the
174  // actual sleep time. It is mostly handy for debugging while
175  // printing fiber information.
176  if(Config::Debug && w.hasFiber())
177  w.fiber().nap(w.timeout());
178  }
179  }
180 
181  bool doRealSleep = false;
182 
183  m_worker.load().start(now);
184 
185  if(m_waiting.empty() && !polling()) {
186  // No fiber is waiting. suspend() till anyone is going to nap().
187  zth_dbg(waiter, "[%s] No sleeping fibers anymore; suspend", id_str());
188  m_worker.suspend(*fiber());
189  } else if(!m_worker.schedule()) {
190  // When true, we were not rescheduled, which means that we are the only
191  // runnable fiber. Do a real sleep, until something interesting happens in
192  // the system.
193  doRealSleep = true;
194  m_worker.load().stop(now);
195  }
196 
    // 'end' is the earliest moment at which this loop must run again: the
    // first waiting timeout or the worker's runEnd(), whichever comes first.
    // It stays null when neither applies.
197  Timestamp const* end = nullptr;
198  if(!m_waiting.empty())
199  end = &m_waiting.front().timeout();
200  if(!m_worker.runEnd().isNull() && (!end || *end > m_worker.runEnd()))
201  end = &m_worker.runEnd();
202 
203  if(polling()) {
    // timeout_ms stays 0 unless this is a real sleep; -1 is used for the
    // "infinite sleep" case below.
204  int timeout_ms = 0;
205  if(doRealSleep) {
206  if(!end) {
207  // Infinite sleep.
208  timeout_ms = -1;
209  zth_dbg(waiter, "[%s] Out of other work than doing poll()",
210  id_str());
211  } else {
212  TimeInterval dt = *end - Timestamp::now();
213  zth_dbg(waiter,
214  "[%s] Out of other work than doing poll(); timeout "
215  "is %s",
216  id_str(), dt.str().c_str());
    // Clamp to 0 when the deadline has already passed.
    // NOTE(review): the float-to-int conversion is not range-checked for very
    // large intervals -- confirm this cannot occur.
217  timeout_ms =
218  std::max<int>(0, (int)(dt.s<float>() * 1000.0f));
219  }
220 
221  perf_mark("blocking poll()");
    // (listing line 222 is absent from this extract)
223  }
224 
225  int res = poller().poll(timeout_ms);
226 
227  if(doRealSleep) {
228  zth_perf_event(*fiber(), fiber()->state());
229  perf_mark("wakeup");
230  }
231 
    // A result of 0 or EAGAIN is not logged as a failure.
232  if(res && res != EAGAIN)
233  zth_dbg(waiter, "[%s] poll() failed; %s", id_str(),
234  err(res).c_str());
235  } else if(doRealSleep) {
    // Nothing to poll: sleep the thread until the absolute time *end.
    // NOTE(review): m_waiting.front() and *end are used here without a check;
    // this relies on m_waiting being non-empty on this path -- confirm.
236  zth_dbg(waiter, "[%s] Out of work; suspend thread, while waiting for %s",
237  id_str(), m_waiting.front().str().c_str());
238  perf_mark("idle system; sleep");
    // (listing line 239 is absent from this extract)
240  clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &end->ts(), NULL);
241  zth_perf_event(*fiber(), fiber()->state());
242  perf_mark("wakeup");
243  }
244 
245  sigchld_check();
246  }
247 }
248 
249 
250 
251 bool PeriodicWakeUp::nap(Timestamp const& reference, Timestamp const& now)
252 {
253  m_t = reference + interval();
254  if(likely(m_t > now)) {
255  // Proper sleep till next deadline.
256  zth::nap(m_t);
257  return true;
258  } else if(m_t + interval() > now) {
259  // Deadline has just passed. Don't sleep and try to catch up.
260  yield();
261  return false;
262  } else {
263  // Way passed deadline. Don't sleep and skip a few cycles.
264  m_t = now;
265  yield();
266  return false;
267  }
268 }
269 
270 } // namespace zth
The fiber.
Definition: fiber.h:52
State state() const noexcept
Definition: fiber.h:143
void wakeup() noexcept
Definition: fiber.h:310
@ Waiting
Definition: fiber.h:71
@ Running
Definition: fiber.h:71
void nap(Timestamp const &sleepUntil=Timestamp::null()) noexcept
Definition: fiber.h:280
void stop(Timestamp const &now=Timestamp::now()) noexcept
Definition: perf.h:408
void start(Timestamp const &now=Timestamp::now()) noexcept
Definition: perf.h:398
TimeInterval const & interval() const noexcept
Definition: waiter.h:374
virtual bool empty() const noexcept=0
Checks if there is any pollable registered.
Abstract base class of a Poller server.
Definition: poller.h:273
virtual int poll(int timeout_ms) noexcept=0
Poll.
virtual int migrateTo(PollerServerBase &p) noexcept=0
Move all registered Pollables to another server.
char const * id_str() const
Definition: fiber.h:490
Fiber * fiber() const noexcept
Definition: fiber.h:479
Convenient wrapper around struct timespec that contains a time interval.
Definition: time.h:52
string str() const
Definition: time.h:404
constexpr double s() const noexcept
Definition: time.h:249
virtual bool poll(Timestamp const &now=Timestamp::now()) noexcept override
Definition: waiter.h:84
Timestamp const & timeout() const noexcept
Definition: waiter.h:79
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition: time.h:527
static Timestamp now()
Definition: time.h:554
constexpr bool isNull() const noexcept
Definition: time.h:679
virtual char const * id_str() const override
Definition: util.h:751
void setName(string const &name)
Definition: util.h:730
void setFiber(Fiber &fiber) noexcept
Definition: waiter.h:49
Fiber & fiber() const noexcept
Definition: waiter.h:36
void resetFiber() noexcept
Definition: waiter.h:54
bool hasFiber() const noexcept
Definition: waiter.h:59
virtual ~Waiter() override
Definition: waiter.cpp:23
void wakeup(TimedWaitable &w)
Definition: waiter.cpp:85
void scheduleTask(TimedWaitable &w)
Definition: waiter.cpp:62
bool polling() const
Definition: waiter.cpp:109
void wait(TimedWaitable &w)
Definition: waiter.cpp:34
virtual void entry() override
Definition: waiter.cpp:151
void unscheduleTask(TimedWaitable &w)
Definition: waiter.cpp:74
PollerServerBase & poller()
Definition: waiter.cpp:98
Waiter(Worker &worker)
Definition: waiter.cpp:17
void setPoller(PollerServerBase *p=nullptr)
Definition: waiter.cpp:128
void wakeup()
Definition: waiter.cpp:122
The class that manages the fibers within this thread.
Definition: worker.h:38
Load_type & load() noexcept
Definition: worker.h:328
void release(Fiber &fiber) noexcept
Definition: worker.h:147
Waiter & waiter() noexcept
Definition: worker.h:101
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition: worker.h:164
void suspend(Fiber &fiber)
Definition: worker.h:265
void resume(Fiber &fiber) noexcept
Definition: worker.h:291
Fiber * currentFiber() const noexcept
Definition: worker.h:96
void add(Fiber *fiber) noexcept
Definition: worker.h:106
Timestamp const & runEnd() const noexcept
Definition: worker.h:301
void nap(Timestamp const &sleepUntil)
Sleep until the given time stamp.
Definition: waiter.h:277
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition: worker.h:388
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition: worker.h:435
void waitUntil(TimedWaitable &w)
Wait until the given Waitable has passed.
Definition: waiter.cpp:28
constexpr auto timeout_ms
A guard that becomes enabled a given number of milliseconds (ms) after entering the current state.
Definition: fsm14.h:2069
@ end
End-of-guard-list marker.
Definition: fsm.h:45
void perf_mark(char const *marker)
Put a string marker into the perf output.
Definition: perf.h:324
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
Definition: perf.h:292
ZmqPoller DefaultPollerServer
The poller server, by default instantiated by the zth::Waiter.
Definition: poller.h:666
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
Definition: allocator.h:23
void unscheduleTask(TimedWaitable &w)
Definition: waiter.cpp:69
void scheduleTask(TimedWaitable &w)
Definition: waiter.cpp:57
void sigchld_check()
Definition: worker.cpp:190
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition: util.h:617
string format(char const *fmt,...)
Format like sprintf(), but save the result in a zth::string.
Definition: util.h:503
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition: perf.h:359
void wakeup(TimedWaitable &w)
Definition: waiter.cpp:80
static bool const Debug
This is a debug build when set to true.
Definition: config.h:50
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition: util.h:42
#define unlikely(expr)
Marks the given expression as unlikely to be evaluated to true.
Definition: util.h:56