Zth (libzth)
Loading...
Searching...
No Matches
worker.h
Go to the documentation of this file.
1#ifndef ZTH_WORKER_H
2#define ZTH_WORKER_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
9#include <libzth/macros.h>
10
11#ifdef __cplusplus
12# include <libzth/allocator.h>
13# include <libzth/config.h>
14# include <libzth/context.h>
15# include <libzth/fiber.h>
16# include <libzth/init.h>
17# include <libzth/list.h>
18# include <libzth/perf.h>
19# include <libzth/waiter.h>
20
21# include <cstring>
22# include <limits>
23# include <time.h>
24
25namespace zth {
26
27void sigchld_check();
28
29class Worker;
30
35class Worker : public UniqueID<Worker>, public ThreadLocalSingleton<Worker> {
37public:
39 : UniqueID("Worker")
40 , m_currentFiber()
41 , m_workerFiber(&dummyWorkerEntry)
42 , m_waiter(*this)
43 , m_disableContextSwitch()
44 {
45 zth_init();
46
47 int res = 0;
48 zth_dbg(worker, "[%s] Created", id_str());
49
50 if(instance() != this)
51 zth_abort("Only one worker allowed per thread");
52
53 // cppcheck-suppress knownConditionTrueFalse
54 if((res = context_init()))
55 goto error;
56
57 m_workerFiber.setName("zth::Worker");
58 m_workerFiber.setStackSize(0); // no stack
59 if((res = m_workerFiber.init()))
60 goto error;
61
62 if((res = perf_init()))
63 goto error;
64
65 zth_perf_event(m_workerFiber);
66
67 if((res = waiter().run()))
68 goto error;
69
70 return;
71
72error:
73 zth_abort("Cannot create Worker; %s", err(res).c_str());
74 }
75
76 virtual ~Worker() override
77 {
78 zth_dbg(worker, "[%s] Destruct", id_str());
79
80 while(!m_suspendedQueue.empty()) {
81 Fiber& f = m_suspendedQueue.front();
82 resume(f);
83 f.kill();
84 }
85 while(!m_runnableQueue.empty()) {
86 m_runnableQueue.front().kill();
87 cleanup(m_runnableQueue.front());
88 }
89
92 }
93
94 Fiber* currentFiber() const noexcept
95 {
96 return m_currentFiber;
97 }
98
99 Waiter& waiter() noexcept
100 {
101 return m_waiter;
102 }
103
104 void add(Fiber* fiber) noexcept
105 {
107 zth_assert(fiber->state() != Fiber::Waiting); // We don't manage 'Waiting' here.
108
109 if(unlikely(fiber->state() == Fiber::Suspended)) {
110 m_suspendedQueue.push_back(*fiber);
111 zth_dbg(worker, "[%s] Added suspended %s", id_str(), fiber->id_str());
112 } else {
113 m_runnableQueue.push_front(*fiber);
114 zth_dbg(worker, "[%s] Added runnable %s", id_str(), fiber->id_str());
115 }
116
117 dbgStats();
118 }
119
121 {
122 add(fiber);
123 return *this;
124 }
125
126 void contextSwitchEnable(bool enable = true) noexcept
127 {
128 if(enable) {
129 zth_assert(m_disableContextSwitch > 0);
130 m_disableContextSwitch--;
131 } else
132 m_disableContextSwitch++;
133 }
134
135 void contextSwitchDisable() noexcept
136 {
137 contextSwitchEnable(false);
138 }
139
140 bool contextSwitchEnabled() const noexcept
141 {
142 return m_disableContextSwitch == 0;
143 }
144
145 void release(Fiber& fiber) noexcept
146 {
147 if(unlikely(fiber.state() == Fiber::Suspended)) {
148 m_suspendedQueue.erase(fiber);
149 zth_dbg(worker, "[%s] Removed %s from suspended queue", id_str(),
150 fiber.id_str());
151 } else {
152 if(&fiber == m_currentFiber)
153 m_runnableQueue.rotate(*fiber.listNext());
154
155 m_runnableQueue.erase(fiber);
156 zth_dbg(worker, "[%s] Removed %s from runnable queue", id_str(),
157 fiber.id_str());
158 }
159 dbgStats();
160 }
161
162 bool schedule(Fiber* preferFiber = nullptr, Timestamp const& now = Timestamp::now())
163 {
165 // Don't switch, immediately continue.
166 return true;
167
168 if(preferFiber)
169 zth_dbg(worker, "[%s] Schedule to %s", id_str(), preferFiber->id_str());
170 else
171 zth_dbg(worker, "[%s] Schedule", id_str());
172
173 dbgStats();
174
175 // Check if fiber is within the runnable queue.
177 !preferFiber || preferFiber == &m_workerFiber
178 || m_runnableQueue.contains(*preferFiber));
179
180 if(unlikely(!runEnd().isNull() && runEnd().isBefore(now))) {
181 // Stop worker and return to its run1() call.
182 zth_dbg(worker, "[%s] Time is up", id_str());
183 preferFiber = &m_workerFiber;
184 }
185
186 Fiber* nextFiber = preferFiber;
187 bool didSchedule = false;
188reschedule:
189 if(likely(!nextFiber)) {
190 if(likely(!m_runnableQueue.empty()))
191 // Use first of the queue.
192 nextFiber = &m_runnableQueue.front();
193 else
194 // No fiber to switch to.
195 nextFiber = &m_workerFiber;
196 }
197
198 zth_assert(nextFiber == &m_workerFiber || nextFiber->listPrev());
199 zth_assert(nextFiber == &m_workerFiber || nextFiber->listNext());
200
201 {
202 // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
203 Fiber* prevFiber = m_currentFiber;
204 m_currentFiber = nextFiber;
205
206 if(unlikely(nextFiber != &m_workerFiber))
207 m_runnableQueue.rotate(*nextFiber->listNext());
208
209 int res =
210 nextFiber->run(likely(prevFiber) ? *prevFiber : m_workerFiber, now);
211 // Warning! When res == 0, fiber might already have been deleted.
212 m_currentFiber = prevFiber;
213
214 switch(res) {
215 case 0:
216 // Ok, just returned to this fiber. Continue execution.
217 return true;
218 case EAGAIN:
219 // Switching to the same fiber.
220 return didSchedule;
221 case EPERM:
222 // fiber just died.
223 cleanup(*nextFiber);
224 // Retry to find a fiber.
225 nextFiber = nullptr;
226 didSchedule = true;
227 goto reschedule;
228 default:
229 zth_abort("Unhandled Fiber::run() error: %s", err(res).c_str());
230 }
231 }
232 }
233
235 {
236 zth_assert(fiber.state() == Fiber::Dead);
237
238 if(unlikely(m_currentFiber == &fiber)) {
239 // It seems that the current fiber is dead.
240 // In this case, we cannot delete the fiber and its context,
241 // as that is the context we are currently using.
242 // Return to the worker's context and sort it out from there.
243
244 zth_dbg(worker, "[%s] Current fiber %s just died; switch to worker",
245 id_str(), fiber.id_str());
247 schedule(&m_workerFiber);
248
249 // We should not get here, as we are dead.
250 zth_abort("[Worker %p] Failed to switch to worker", this);
251 }
252
253 zth_dbg(worker, "[%s] Fiber %s is dead; cleanup", id_str(), fiber.id_str());
254 // Remove from runnable queue
255 m_runnableQueue.erase(fiber);
256 zth_assert(&fiber != &m_workerFiber);
257 // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
258 delete &fiber;
259
261 }
262
264 {
265 switch(fiber.state()) {
266 case Fiber::New:
267 case Fiber::Ready:
268 release(fiber);
269 fiber.suspend();
270 add(&fiber);
271 break;
272 case Fiber::Running:
273 release(fiber);
274 fiber.suspend();
275 add(&fiber);
278 // Current fiber suspended, immediate reschedule.
279 schedule();
280 break;
281 case Fiber::Waiting:
282 // Not on my queue...
283 fiber.suspend();
284 break;
286 case Fiber::Dead:
287 case Fiber::Suspended:
288 default:; // Ignore.
289 }
290 }
291
292 void resume(Fiber& fiber) noexcept
293 {
294 if(fiber.state() != Fiber::Suspended)
295 return;
296
297 release(fiber);
298 fiber.resume();
299 add(&fiber);
300 }
301
302 Timestamp const& runEnd() const noexcept
303 {
304 return m_end;
305 }
306
307 void run(TimeInterval const& duration = TimeInterval())
308 {
309 if(duration <= 0) {
310 zth_dbg(worker, "[%s] Run", id_str());
311 m_end = Timestamp::null();
312 } else {
313 zth_dbg(worker, "[%s] Run for %s", id_str(), duration.str().c_str());
314 m_end = Timestamp::now() + duration;
315 }
316
317 while(!m_runnableQueue.empty()
318 && (runEnd().isNull() || Timestamp::now() < runEnd())) {
319 schedule();
321 }
322
324 zth_dbg(worker, "[%s] Stopped", id_str());
325 }
326
328
329 Load_type& load() noexcept
330 {
331 return m_load;
332 }
333
334 Load_type const& load() const noexcept
335 {
336 return m_load;
337 }
338
339protected:
340 static void dummyWorkerEntry(void*)
341 {
342 zth_abort("The worker fiber should not be executed.");
343 }
344
345 bool isInWorkerContext() const noexcept
346 {
347 return m_currentFiber == nullptr || m_currentFiber == &m_workerFiber;
348 }
349
350 void dbgStats() noexcept
351 {
353 return;
354
355 zth_dbg(list, "[%s] Run queue:", id_str());
356 if(m_runnableQueue.empty())
357 zth_dbg(list, "[%s] <empty>", id_str());
358 else
359 for(decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
360 it != m_runnableQueue.end(); ++it)
361 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
362
363 zth_dbg(list, "[%s] Suspended queue:", id_str());
364 if(m_suspendedQueue.empty())
365 zth_dbg(list, "[%s] <empty>", id_str());
366 else
367 for(decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
368 it != m_suspendedQueue.end(); ++it)
369 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
370 }
371
372private:
373 Fiber* m_currentFiber;
374 List<Fiber> m_runnableQueue;
375 List<Fiber> m_suspendedQueue;
376 Fiber m_workerFiber;
377 Waiter m_waiter;
378 Timestamp m_end;
379 int m_disableContextSwitch;
380 Load_type m_load;
381
382 friend void worker_global_init();
383};
384
389ZTH_EXPORT __attribute__((pure)) inline Worker& currentWorker() noexcept
390{
391 // Dereference is guarded by the safe_ptr.
392 return *Worker::instance();
393}
394
399ZTH_EXPORT __attribute__((pure)) inline Fiber& currentFiber() noexcept
400{
401 Worker const& w = currentWorker();
402 Fiber* f = w.currentFiber();
403 zth_assert(f);
404 // cppcheck-suppress nullPointerRedundantCheck
405 return *f;
406}
407
408__attribute__((pure)) inline UniqueID<Fiber> const& currentFiberID() noexcept
409{
410 return currentFiber();
411}
412
413inline void getContext(Worker** worker, Fiber** fiber) noexcept
414{
415 Worker& current_worker = currentWorker();
416 if(likely(worker))
417 *worker = &current_worker;
418
419 if(likely(fiber)) {
420 Fiber const* const currentFiber_ = *fiber = current_worker.currentFiber();
421 if(unlikely(!currentFiber_))
422 zth_abort("Not within fiber context");
423 }
424}
425
435ZTH_EXPORT inline void
436yield(Fiber* preferFiber = nullptr, bool alwaysYield = false,
437 Timestamp const& now = Timestamp::now())
438{
439 Fiber const& f = currentFiber();
440
441 perf_syscall("yield()", now);
442 if(unlikely(!alwaysYield && !f.allowYield(now)))
443 return;
444
445 currentWorker().schedule(preferFiber, now);
446}
447
458ZTH_EXPORT inline void outOfWork()
459{
460 yield(nullptr, true);
461}
462
463inline void suspend()
464{
465 Worker* worker = nullptr;
466 Fiber* f = nullptr;
467 getContext(&worker, &f);
468 worker->suspend(*f);
469}
470
471inline void resume(Fiber& fiber)
472{
473 Worker* worker = nullptr;
474 getContext(&worker, nullptr);
475 worker->resume(fiber);
476}
477
478int startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr);
479int execlp(char const* file, char const* arg, ... /*, nullptr */);
480int execvp(char const* file, char* const arg[]);
481
482} // namespace zth
483
489EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_yield() noexcept
490{
491 zth::yield();
492}
493
499EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_outOfWork() noexcept
500{
502}
503
511EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_worker_create() noexcept
512{
514 return EINVAL;
515
516 try {
517 new zth::Worker();
518 return 0;
519 } catch(std::bad_alloc const&) {
520 return ENOMEM;
521 } catch(...) {
522 }
523 return EAGAIN;
524}
525
531EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_run(struct timespec const* ts = nullptr) noexcept
532{
534 if(unlikely(!w))
535 return;
536
537 w->run(ts ? zth::TimeInterval(ts->tv_sec, ts->tv_nsec) : zth::TimeInterval());
538}
539
545EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_destroy() noexcept
546{
548 if(unlikely(!w))
549 return;
550 delete w;
551}
552
558EXTERN_C ZTH_EXPORT ZTH_INLINE int
559zth_startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr) noexcept
560{
561 return zth::startWorkerThread(f, stack, name);
562}
563
569EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_execvp(char const* file, char* const arg[]) noexcept
570{
571 return zth::execvp(file, arg);
572}
573
574#else // !__cplusplus
575
576# include <time.h>
577
578ZTH_EXPORT void zth_yield();
579ZTH_EXPORT void zth_outOfWork();
580
581ZTH_EXPORT int zth_worker_create();
582ZTH_EXPORT void zth_worker_run(struct timespec const* ts);
583ZTH_EXPORT int zth_worker_destroy();
584
585ZTH_EXPORT int zth_startWorkerThread(void (*f)(), size_t stack, char const* name);
586ZTH_EXPORT int zth_execvp(char const* file, char* const arg[]);
587
588#endif // __cplusplus
589#endif // ZTH_WORKER_H
The fiber.
Definition fiber.h:49
int init(Timestamp const &now=Timestamp::now())
Definition fiber.h:174
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
Definition fiber.h:265
int setStackSize(size_t size) noexcept
Definition fiber.h:115
@ Suspended
Definition fiber.h:68
@ Waiting
Definition fiber.h:68
@ Ready
Definition fiber.h:68
@ Running
Definition fiber.h:68
@ Uninitialized
Definition fiber.h:68
int run(Fiber &from, Timestamp now=Timestamp::now())
Definition fiber.h:196
void kill() noexcept
Definition fiber.h:270
type * listPrev() const noexcept
Definition list.h:73
type * listNext() const noexcept
Definition list.h:68
Measure the load of some activity.
Definition perf.h:378
Singleton pattern, but only per-thread.
Definition util.h:1012
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Definition util.h:1047
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:55
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition time.h:568
static Timestamp now()
Definition time.h:595
static constexpr Timestamp null() noexcept
Definition time.h:715
Keeps track of a process-wide unique ID within the type T.
Definition util.h:715
virtual char const * id_str() const override
Definition util.h:809
void setName(string const &name)
Definition util.h:788
A single fiber per Worker that manages sleeping and blocked fibers.
Definition waiter.h:194
The class that manages the fibers within this thread.
Definition worker.h:35
static void dummyWorkerEntry(void *)
Definition worker.h:340
void contextSwitchDisable() noexcept
Definition worker.h:135
void dbgStats() noexcept
Definition worker.h:350
friend void worker_global_init()
Definition worker.cpp:31
Load Load_type
Definition worker.h:327
Load_type const & load() const noexcept
Definition worker.h:334
void run(TimeInterval const &duration=TimeInterval())
Definition worker.h:307
void release(Fiber &fiber) noexcept
Definition worker.h:145
Load_type & load() noexcept
Definition worker.h:329
Waiter & waiter() noexcept
Definition worker.h:99
Fiber * currentFiber() const noexcept
Definition worker.h:94
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition worker.h:162
void contextSwitchEnable(bool enable=true) noexcept
Definition worker.h:126
void cleanup(Fiber &fiber)
Definition worker.h:234
void suspend(Fiber &fiber)
Definition worker.h:263
void resume(Fiber &fiber) noexcept
Definition worker.h:292
bool contextSwitchEnabled() const noexcept
Definition worker.h:140
bool isInWorkerContext() const noexcept
Definition worker.h:345
virtual ~Worker() override
Definition worker.h:76
void add(Fiber *fiber) noexcept
Definition worker.h:104
Worker & operator<<(Fiber *fiber) noexcept
Definition worker.h:120
Timestamp const & runEnd() const noexcept
Definition worker.h:302
void zth_outOfWork() noexcept
Force a context switch.
Definition worker.h:499
void zth_worker_destroy() noexcept
Cleanup the worker.
Definition worker.h:545
int zth_worker_create() noexcept
Create a Worker.
Definition worker.h:511
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
Definition worker.h:569
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.h:559
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
Definition worker.h:531
void zth_yield() noexcept
Allow a context switch.
Definition worker.h:489
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition util.cpp:334
void outOfWork()
Force a context switch.
Definition worker.h:458
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.cpp:62
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:389
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition worker.h:436
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:399
int execvp(char const *file, char *const arg[])
Start an external program.
Definition worker.cpp:153
fiber_type< F >::factory fiber(F f, char const *name=nullptr)
Create a new fiber.
Definition async.h:754
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
Definition perf.h:294
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:189
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:143
void zth_init()
Perform one-time global initialization of the Zth library.
Definition init.cpp:25
#define ZTH_INLINE
Definition macros.h:129
void getContext(Worker **worker, Fiber **fiber) noexcept
Definition worker.h:413
void perf_deinit()
Definition perf.cpp:492
int perf_init()
Definition perf.cpp:478
void resume(Fiber &fiber)
Definition worker.h:471
int context_init() noexcept
One-time context mechanism initialization.
Definition context.cpp:29
void sigchld_check()
Definition worker.cpp:189
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:675
void suspend()
Definition worker.h:463
UniqueID< Fiber > const & currentFiberID() noexcept
Definition worker.h:408
void context_deinit() noexcept
Final cleanup.
Definition context.cpp:40
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition perf.h:361
static int const Print_worker
Definition config.h:126
static bool const SupportDebugPrint
Add support to enable debug output prints.
Definition config.h:109
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:212
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:40
#define unlikely(expr)
Marks the given expression to likely be evaluated to false.
Definition util.h:55