Zth (libzth)
worker.h
Go to the documentation of this file.
1 #ifndef ZTH_WORKER_H
2 #define ZTH_WORKER_H
3 /*
4  * Zth (libzth), a cooperative userspace multitasking library.
5  * Copyright (C) 2019-2022 Jochem Rutgers
6  *
7  * This Source Code Form is subject to the terms of the Mozilla Public
8  * License, v. 2.0. If a copy of the MPL was not distributed with this
9  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
10  */
11 
12 #ifdef __cplusplus
13 # include <libzth/allocator.h>
14 # include <libzth/config.h>
15 # include <libzth/context.h>
16 # include <libzth/fiber.h>
17 # include <libzth/init.h>
18 # include <libzth/list.h>
19 # include <libzth/perf.h>
20 # include <libzth/waiter.h>
21 
22 # include <cstring>
23 # include <limits>
24 # include <time.h>
25 
26 namespace zth {
27 
28 void sigchld_check();
29 
30 class Worker;
31 
36 class Worker
37  : public UniqueID<Worker>
38  , public ThreadLocalSingleton<Worker> {
40 public:
42  : UniqueID("Worker")
43  , m_currentFiber()
44  , m_workerFiber(&dummyWorkerEntry)
45  , m_waiter(*this)
46  , m_disableContextSwitch()
47  {
48  zth_init();
49 
50  int res = 0;
51  zth_dbg(worker, "[%s] Created", id_str());
52 
53  if(instance() != this)
54  zth_abort("Only one worker allowed per thread");
55 
56  if((res = context_init()))
57  goto error;
58 
59  m_workerFiber.setName("zth::Worker");
60  m_workerFiber.setStackSize(0); // no stack
61  if((res = m_workerFiber.init()))
62  goto error;
63 
64  if((res = perf_init()))
65  goto error;
66 
67  zth_perf_event(m_workerFiber);
68 
69  if((res = waiter().run()))
70  goto error;
71 
72  return;
73 
74 error:
75  zth_abort("Cannot create Worker; %s", err(res).c_str());
76  }
77 
78  virtual ~Worker() override
79  {
80  zth_dbg(worker, "[%s] Destruct", id_str());
81 
82  while(!m_suspendedQueue.empty()) {
83  Fiber& f = m_suspendedQueue.front();
84  resume(f);
85  f.kill();
86  }
87  while(!m_runnableQueue.empty()) {
88  m_runnableQueue.front().kill();
89  cleanup(m_runnableQueue.front());
90  }
91 
92  perf_deinit();
94  }
95 
96  Fiber* currentFiber() const noexcept
97  {
98  return m_currentFiber;
99  }
100 
101  Waiter& waiter() noexcept
102  {
103  return m_waiter;
104  }
105 
106  void add(Fiber* fiber) noexcept
107  {
108  zth_assert(fiber);
109  zth_assert(fiber->state() != Fiber::Waiting); // We don't manage 'Waiting' here.
110 
111  if(unlikely(fiber->state() == Fiber::Suspended)) {
112  m_suspendedQueue.push_back(*fiber);
113  zth_dbg(worker, "[%s] Added suspended %s", id_str(), fiber->id_str());
114  } else {
115  m_runnableQueue.push_front(*fiber);
116  zth_dbg(worker, "[%s] Added runnable %s", id_str(), fiber->id_str());
117  }
118 
119  dbgStats();
120  }
121 
123  {
124  add(fiber);
125  return *this;
126  }
127 
128  void contextSwitchEnable(bool enable = true) noexcept
129  {
130  if(enable) {
131  zth_assert(m_disableContextSwitch > 0);
132  m_disableContextSwitch--;
133  } else
134  m_disableContextSwitch++;
135  }
136 
137  void contextSwitchDisable() noexcept
138  {
139  contextSwitchEnable(false);
140  }
141 
142  bool contextSwitchEnabled() const noexcept
143  {
144  return m_disableContextSwitch == 0;
145  }
146 
147  void release(Fiber& fiber) noexcept
148  {
149  if(unlikely(fiber.state() == Fiber::Suspended)) {
150  m_suspendedQueue.erase(fiber);
151  zth_dbg(worker, "[%s] Removed %s from suspended queue", id_str(),
152  fiber.id_str());
153  } else {
154  if(&fiber == m_currentFiber)
155  m_runnableQueue.rotate(*fiber.listNext());
156 
157  m_runnableQueue.erase(fiber);
158  zth_dbg(worker, "[%s] Removed %s from runnable queue", id_str(),
159  fiber.id_str());
160  }
161  dbgStats();
162  }
163 
164  bool schedule(Fiber* preferFiber = nullptr, Timestamp const& now = Timestamp::now())
165  {
167  // Don't switch, immediately continue.
168  return true;
169 
170  if(preferFiber)
171  zth_dbg(worker, "[%s] Schedule to %s", id_str(), preferFiber->id_str());
172  else
173  zth_dbg(worker, "[%s] Schedule", id_str());
174 
175  dbgStats();
176 
177  // Check if fiber is within the runnable queue.
178  zth_assert(
179  !preferFiber || preferFiber == &m_workerFiber
180  || m_runnableQueue.contains(*preferFiber));
181 
182  if(unlikely(!runEnd().isNull() && runEnd().isBefore(now))) {
183  // Stop worker and return to its run1() call.
184  zth_dbg(worker, "[%s] Time is up", id_str());
185  preferFiber = &m_workerFiber;
186  }
187 
188  Fiber* nextFiber = preferFiber;
189  bool didSchedule = false;
190 reschedule:
191  if(likely(!nextFiber)) {
192  if(likely(!m_runnableQueue.empty()))
193  // Use first of the queue.
194  nextFiber = &m_runnableQueue.front();
195  else
196  // No fiber to switch to.
197  nextFiber = &m_workerFiber;
198  }
199 
200  zth_assert(nextFiber == &m_workerFiber || nextFiber->listPrev());
201  zth_assert(nextFiber == &m_workerFiber || nextFiber->listNext());
202 
203  {
204  // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
205  Fiber* prevFiber = m_currentFiber;
206  m_currentFiber = nextFiber;
207 
208  if(unlikely(nextFiber != &m_workerFiber))
209  m_runnableQueue.rotate(*nextFiber->listNext());
210 
211  int res =
212  nextFiber->run(likely(prevFiber) ? *prevFiber : m_workerFiber, now);
213  // Warning! When res == 0, fiber might already have been deleted.
214  m_currentFiber = prevFiber;
215 
216  switch(res) {
217  case 0:
218  // Ok, just returned to this fiber. Continue execution.
219  return true;
220  case EAGAIN:
221  // Switching to the same fiber.
222  return didSchedule;
223  case EPERM:
224  // fiber just died.
225  cleanup(*nextFiber);
226  // Retry to find a fiber.
227  nextFiber = nullptr;
228  didSchedule = true;
229  goto reschedule;
230  default:
231  zth_abort("Unhandled Fiber::run() error: %s", err(res).c_str());
232  }
233  }
234  }
235 
237  {
238  zth_assert(fiber.state() == Fiber::Dead);
239 
240  if(unlikely(m_currentFiber == &fiber)) {
241  // It seems that the current fiber is dead.
242  // In this case, we cannot delete the fiber and its context,
243  // as that is the context we are currently using.
244  // Return to the worker's context and sort it out from there.
245 
246  zth_dbg(worker, "[%s] Current fiber %s just died; switch to worker",
247  id_str(), fiber.id_str());
249  schedule(&m_workerFiber);
250 
251  // We should not get here, as we are dead.
252  zth_abort("[Worker %p] Failed to switch to worker", this);
253  }
254 
255  zth_dbg(worker, "[%s] Fiber %s is dead; cleanup", id_str(), fiber.id_str());
256  // Remove from runnable queue
257  m_runnableQueue.erase(fiber);
258  zth_assert(&fiber != &m_workerFiber);
259  // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
260  delete &fiber;
261 
262  sigchld_check();
263  }
264 
266  {
267  switch(fiber.state()) {
268  case Fiber::New:
269  case Fiber::Ready:
270  release(fiber);
271  fiber.suspend();
272  add(&fiber);
273  break;
274  case Fiber::Running:
275  release(fiber);
276  fiber.suspend();
277  add(&fiber);
280  // Current fiber suspended, immediate reschedule.
281  schedule();
282  break;
283  case Fiber::Waiting:
284  // Not on my queue...
285  fiber.suspend();
286  break;
287  default:; // Ignore.
288  }
289  }
290 
291  void resume(Fiber& fiber) noexcept
292  {
293  if(fiber.state() != Fiber::Suspended)
294  return;
295 
296  release(fiber);
297  fiber.resume();
298  add(&fiber);
299  }
300 
301  Timestamp const& runEnd() const noexcept
302  {
303  return m_end;
304  }
305 
306  void run(TimeInterval const& duration = TimeInterval())
307  {
308  if(duration <= 0) {
309  zth_dbg(worker, "[%s] Run", id_str());
310  m_end = Timestamp::null();
311  } else {
312  zth_dbg(worker, "[%s] Run for %s", id_str(), duration.str().c_str());
313  m_end = Timestamp::now() + duration;
314  }
315 
316  while(!m_runnableQueue.empty()
317  && (runEnd().isNull() || Timestamp::now() < runEnd())) {
318  schedule();
320  }
321 
322  sigchld_check();
323  zth_dbg(worker, "[%s] Stopped", id_str());
324  }
325 
326  typedef Load<> Load_type;
327 
328  Load_type& load() noexcept
329  {
330  return m_load;
331  }
332 
333  Load_type const& load() const noexcept
334  {
335  return m_load;
336  }
337 
338 protected:
339  static void dummyWorkerEntry(void*)
340  {
341  zth_abort("The worker fiber should not be executed.");
342  }
343 
344  bool isInWorkerContext() const noexcept
345  {
346  return m_currentFiber == nullptr || m_currentFiber == &m_workerFiber;
347  }
348 
349  void dbgStats() noexcept
350  {
352  return;
353 
354  zth_dbg(list, "[%s] Run queue:", id_str());
355  if(m_runnableQueue.empty())
356  zth_dbg(list, "[%s] <empty>", id_str());
357  else
358  for(decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
359  it != m_runnableQueue.end(); ++it)
360  zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
361 
362  zth_dbg(list, "[%s] Suspended queue:", id_str());
363  if(m_suspendedQueue.empty())
364  zth_dbg(list, "[%s] <empty>", id_str());
365  else
366  for(decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
367  it != m_suspendedQueue.end(); ++it)
368  zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
369  }
370 
371 private:
372  Fiber* m_currentFiber;
373  List<Fiber> m_runnableQueue;
374  List<Fiber> m_suspendedQueue;
375  Fiber m_workerFiber;
376  Waiter m_waiter;
377  Timestamp m_end;
378  int m_disableContextSwitch;
379  Load_type m_load;
380 
381  friend void worker_global_init();
382 };
383 
388 ZTH_EXPORT __attribute__((pure)) inline Worker& currentWorker() noexcept
389 {
390  // Dereference is guarded by the safe_ptr.
391  return *Worker::instance();
392 }
393 
398 ZTH_EXPORT __attribute__((pure)) inline Fiber& currentFiber() noexcept
399 {
400  Worker const& w = currentWorker();
401  Fiber* f = w.currentFiber();
402  zth_assert(f);
403  // cppcheck-suppress nullPointerRedundantCheck
404  return *f;
405 }
406 
407 __attribute__((pure)) inline UniqueID<Fiber> const& currentFiberID() noexcept
408 {
409  return currentFiber();
410 }
411 
412 inline void getContext(Worker** worker, Fiber** fiber) noexcept
413 {
414  Worker& current_worker = currentWorker();
415  if(likely(worker))
416  *worker = &current_worker;
417 
418  if(likely(fiber)) {
419  Fiber* currentFiber_ = *fiber = current_worker.currentFiber();
420  if(unlikely(!currentFiber_))
421  zth_abort("Not within fiber context");
422  }
423 }
424 
434 ZTH_EXPORT inline void
435 yield(Fiber* preferFiber = nullptr, bool alwaysYield = false,
436  Timestamp const& now = Timestamp::now())
437 {
438  Fiber const& f = currentFiber();
439 
440  perf_syscall("yield()", now);
441  if(unlikely(!alwaysYield && !f.allowYield(now)))
442  return;
443 
444  currentWorker().schedule(preferFiber, now);
445 }
446 
457 ZTH_EXPORT inline void outOfWork()
458 {
459  yield(nullptr, true);
460 }
461 
462 inline void suspend()
463 {
464  Worker* worker = nullptr;
465  Fiber* f = nullptr;
466  getContext(&worker, &f);
467  worker->suspend(*f);
468 }
469 
470 inline void resume(Fiber& fiber)
471 {
472  Worker* worker = nullptr;
473  getContext(&worker, nullptr);
474  worker->resume(fiber);
475 }
476 
477 int startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr);
478 int execlp(char const* file, char const* arg, ... /*, nullptr */);
479 int execvp(char const* file, char* const arg[]);
480 
481 } // namespace zth
482 
488 EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_yield() noexcept
489 {
490  zth::yield();
491 }
492 
498 EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_outOfWork() noexcept
499 {
500  zth::outOfWork();
501 }
502 
510 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_worker_create() noexcept
511 {
512  if(!zth::Worker::instance())
513  return EINVAL;
514 
515  try {
516  new zth::Worker();
517  return 0;
518  } catch(std::bad_alloc const&) {
519  return ENOMEM;
520  } catch(...) {
521  }
522  return EAGAIN;
523 }
524 
530 EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_run(struct timespec const* ts = nullptr) noexcept
531 {
533  if(unlikely(!w))
534  return;
535 
536  w->run(ts ? zth::TimeInterval(ts->tv_sec, ts->tv_nsec) : zth::TimeInterval());
537 }
538 
544 EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_destroy() noexcept
545 {
547  if(unlikely(!w))
548  return;
549  delete w;
550 }
551 
557 EXTERN_C ZTH_EXPORT ZTH_INLINE int
558 zth_startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr) noexcept
559 {
560  return zth::startWorkerThread(f, stack, name);
561 }
562 
568 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_execvp(char const* file, char* const arg[]) noexcept
569 {
570  return zth::execvp(file, arg);
571 }
572 
573 #else // !__cplusplus
574 
575 # include <time.h>
576 
577 ZTH_EXPORT void zth_yield();
578 ZTH_EXPORT void zth_outOfWork();
579 
580 ZTH_EXPORT int zth_worker_create();
581 ZTH_EXPORT void zth_worker_run(struct timespec const* ts);
582 ZTH_EXPORT int zth_worker_destroy();
583 
584 ZTH_EXPORT int zth_startWorkerThread(void (*f)(), size_t stack, char const* name);
585 ZTH_EXPORT int zth_execvp(char const* file, char* const arg[]);
586 
587 #endif // __cplusplus
588 #endif // ZTH_WORKER_H
The fiber.
Definition: fiber.h:52
int init(Timestamp const &now=Timestamp::now())
Definition: fiber.h:178
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
Definition: fiber.h:266
int setStackSize(size_t size) noexcept
Definition: fiber.h:119
@ Suspended
Definition: fiber.h:71
@ Dead
Definition: fiber.h:71
@ Waiting
Definition: fiber.h:71
@ New
Definition: fiber.h:71
@ Ready
Definition: fiber.h:71
@ Running
Definition: fiber.h:71
int run(Fiber &from, Timestamp now=Timestamp::now())
Definition: fiber.h:200
void kill() noexcept
Definition: fiber.h:271
Definition: list.h:97
type * listNext() const noexcept
Definition: list.h:69
type * listPrev() const noexcept
Definition: list.h:74
Measure the load of some activity.
Definition: perf.h:376
Singleton pattern, but only per-thread.
Definition: util.h:954
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Definition: util.h:989
Convenient wrapper around struct timespec that contains a time interval.
Definition: time.h:52
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition: time.h:527
static Timestamp now()
Definition: time.h:554
static constexpr Timestamp null() noexcept
Definition: time.h:674
Keeps track of a process-wide unique ID within the type T.
Definition: util.h:657
virtual char const * id_str() const override
Definition: util.h:751
void setName(string const &name)
Definition: util.h:730
A single fiber per Worker that manages sleeping and blocked fibers.
Definition: waiter.h:197
The class that manages the fibers within this thread.
Definition: worker.h:38
static void dummyWorkerEntry(void *)
Definition: worker.h:339
void contextSwitchDisable() noexcept
Definition: worker.h:137
void dbgStats() noexcept
Definition: worker.h:349
friend void worker_global_init()
Definition: worker.cpp:34
Load_type & load() noexcept
Definition: worker.h:328
Load Load_type
Definition: worker.h:326
void run(TimeInterval const &duration=TimeInterval())
Definition: worker.h:306
void release(Fiber &fiber) noexcept
Definition: worker.h:147
Waiter & waiter() noexcept
Definition: worker.h:101
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition: worker.h:164
void contextSwitchEnable(bool enable=true) noexcept
Definition: worker.h:128
void cleanup(Fiber &fiber)
Definition: worker.h:236
void suspend(Fiber &fiber)
Definition: worker.h:265
void resume(Fiber &fiber) noexcept
Definition: worker.h:291
bool contextSwitchEnabled() const noexcept
Definition: worker.h:142
Worker()
Definition: worker.h:41
bool isInWorkerContext() const noexcept
Definition: worker.h:344
Fiber * currentFiber() const noexcept
Definition: worker.h:96
virtual ~Worker() override
Definition: worker.h:78
void add(Fiber *fiber) noexcept
Definition: worker.h:106
Timestamp const & runEnd() const noexcept
Definition: worker.h:301
Load_type const & load() const noexcept
Definition: worker.h:333
Worker & operator<<(Fiber *fiber) noexcept
Definition: worker.h:122
void zth_outOfWork() noexcept
Force a context switch.
Definition: worker.h:498
void zth_worker_destroy() noexcept
Cleanup the worker.
Definition: worker.h:544
int zth_worker_create() noexcept
Create a Worker.
Definition: worker.h:510
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
Definition: worker.h:568
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
Definition: worker.h:558
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
Definition: worker.h:530
void zth_yield() noexcept
Allow a context switch.
Definition: worker.h:488
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition: util.cpp:280
void outOfWork()
Force a context switch.
Definition: worker.h:457
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Definition: worker.cpp:65
int execlp(char const *file, char const *arg,...)
Start an external program.
Definition: worker.cpp:96
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition: worker.h:388
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition: worker.h:435
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition: worker.h:398
fiber_type< F >::factory fiber(F f, char const *name=nullptr)
Create a new fiber.
Definition: async.h:713
int execvp(char const *file, char *const arg[])
Start an external program.
Definition: worker.cpp:155
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
Definition: perf.h:292
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition: allocator.h:114
void zth_init()
Perform one-time global initialization of the Zth library.
Definition: init.cpp:25
#define ZTH_INLINE
Definition: macros.h:130
Definition: allocator.h:23
void getContext(Worker **worker, Fiber **fiber) noexcept
Definition: worker.h:412
void perf_deinit()
Definition: perf.cpp:479
int perf_init()
Definition: perf.cpp:465
void resume(Fiber &fiber)
Definition: worker.h:470
int context_init() noexcept
One-time context mechanism initialization.
Definition: context.cpp:32
void sigchld_check()
Definition: worker.cpp:190
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition: util.h:617
void suspend()
Definition: worker.h:462
UniqueID< Fiber > const & currentFiberID() noexcept
Definition: worker.h:407
void context_deinit() noexcept
Final cleanup.
Definition: context.cpp:43
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition: perf.h:359
static int const Print_worker
Definition: config.h:120
static bool const SupportDebugPrint
Add support to enable debug output prints.
Definition: config.h:103
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition: util.h:42
#define unlikely(expr)
Marks the given expression to likely be evaluated to false.
Definition: util.h:56