Zth (libzth)
Loading...
Searching...
No Matches
worker.h
Go to the documentation of this file.
1#ifndef ZTH_WORKER_H
2#define ZTH_WORKER_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
9#include <libzth/macros.h>
10
11#ifdef __cplusplus
12# include <libzth/allocator.h>
13# include <libzth/config.h>
14# include <libzth/context.h>
15# include <libzth/fiber.h>
16# include <libzth/init.h>
17# include <libzth/list.h>
18# include <libzth/perf.h>
19# include <libzth/waiter.h>
20
21# include <cstring>
22# include <limits>
23# include <time.h>
24
25namespace zth {
26
27void sigchld_check();
28
29class Worker;
30
35class Worker : public UniqueID<Worker>, public ThreadLocalSingleton<Worker> {
37public:
38# if ZTH_SHARED_LIB
39 static safe_ptr<Worker>::type instance() noexcept;
40# endif // ZTH_SHARED_LIB
41
43 : UniqueID("Worker")
44 , m_currentFiber()
45 , m_workerFiber(&dummyWorkerEntry)
46 , m_waiter(*this)
47 , m_disableContextSwitch()
48 {
49 zth_init();
50
51 int res = 0;
52 zth_dbg(worker, "[%s] Created", id_str());
53
54 if(instance() != this)
55 zth_abort("Only one worker allowed per thread");
56
57 // cppcheck-suppress knownConditionTrueFalse
58 if((res = context_init()))
59 goto error;
60
61 m_workerFiber.setName("zth::Worker");
62 m_workerFiber.setStackSize(0); // no stack
63 if((res = m_workerFiber.init()))
64 goto error;
65
67 // cppcheck-suppress knownConditionTrueFalse
68 if((res = perf_init()))
69 goto error;
70
71 perf_fiber(m_workerFiber);
72 }
73
74 if((res = waiter().run()))
75 goto error;
76
77 return;
78
79error:
80 zth_abort("Cannot create Worker; %s", err(res).c_str());
81 }
82
83 virtual ~Worker() noexcept override
84 {
85 zth_dbg(worker, "[%s] Destruct", id_str());
86
87 while(!m_suspendedQueue.empty()) {
88 Fiber& f = m_suspendedQueue.front();
89 resume(f);
90 f.kill();
91 }
92 while(!m_runnableQueue.empty()) {
93 m_runnableQueue.front().kill();
94 cleanup(m_runnableQueue.front());
95 }
96
99
101 }
102
103 Fiber* currentFiber() const noexcept
104 {
105 return m_currentFiber;
106 }
107
108 Waiter& waiter() noexcept
109 {
110 return m_waiter;
111 }
112
113 void add(Fiber* fiber, bool front = false) noexcept
114 {
116 zth_assert(fiber->state() != Fiber::Waiting); // We don't manage 'Waiting' here.
117
118 if(unlikely(fiber->state() == Fiber::Suspended)) {
119 m_suspendedQueue.push_back(*fiber);
120 zth_dbg(worker, "[%s] Added suspended %s", id_str(), fiber->id_str());
121 } else {
122 if(unlikely(front))
123 m_runnableQueue.push_front(*fiber);
124 else
125 m_runnableQueue.push_back(*fiber);
126 zth_dbg(worker, "[%s] Added runnable %s", id_str(), fiber->id_str());
127 }
128
129 dbgStats();
130 }
131
132 void hatch(Fiber& fiber) noexcept
133 {
134 zth_assert(fiber.state() == Fiber::New);
135 fiber.used();
136 add(&fiber);
137 }
138
140 {
142 // cppcheck-suppress nullPointerRedundantCheck
143 if(fiber->state() == Fiber::New)
144 hatch(*fiber);
145 else
146 add(fiber);
147
148 return *this;
149 }
150
151 void contextSwitchEnable(bool enable = true) noexcept
152 {
153 if(enable) {
154 zth_assert(m_disableContextSwitch > 0);
155 m_disableContextSwitch--;
156 } else
157 m_disableContextSwitch++;
158 }
159
160 void contextSwitchDisable() noexcept
161 {
162 contextSwitchEnable(false);
163 }
164
165 bool contextSwitchEnabled() const noexcept
166 {
167 return m_disableContextSwitch == 0;
168 }
169
170 void release(Fiber& fiber) noexcept
171 {
172 if(unlikely(fiber.state() == Fiber::Suspended)) {
173 m_suspendedQueue.erase(fiber);
174 zth_dbg(worker, "[%s] Removed %s from suspended queue", id_str(),
175 fiber.id_str());
176 } else {
177 m_runnableQueue.erase(fiber);
178 zth_dbg(worker, "[%s] Removed %s from runnable queue", id_str(),
179 fiber.id_str());
180 }
181 dbgStats();
182 }
183
184 bool schedule(Fiber* preferFiber = nullptr, Timestamp const& now = Timestamp::now())
185 {
187 // Don't switch, immediately continue.
188 return true;
189
190 if(preferFiber)
191 zth_dbg(worker, "[%s] Schedule to %s", id_str(), preferFiber->id_str());
192 else
193 zth_dbg(worker, "[%s] Schedule", id_str());
194
195 dbgStats();
196
197 // Check if fiber is within the runnable queue.
199 !preferFiber || preferFiber == &m_workerFiber
200 || m_runnableQueue.contains(*preferFiber));
201
202 if(unlikely(!runEnd().isNull() && runEnd().isBefore(now))) {
203 // Stop worker and return to its run1() call.
204 zth_dbg(worker, "[%s] Time is up", id_str());
205 preferFiber = &m_workerFiber;
206 }
207
208 Fiber* nextFiber = preferFiber;
209 bool didSchedule = false;
210reschedule:
211 if(likely(!nextFiber)) {
212 if(likely(!m_runnableQueue.empty()))
213 // Use first of the queue.
214 nextFiber = &m_runnableQueue.front();
215 else
216 // No fiber to switch to.
217 nextFiber = &m_workerFiber;
218 }
219
220 {
221 // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
222 Fiber* prevFiber = m_currentFiber;
223 m_currentFiber = nextFiber;
224
225 if(unlikely(nextFiber != &m_workerFiber))
226 m_runnableQueue.rotate(*++m_runnableQueue.cyclic(*nextFiber));
227
228 int res =
229 nextFiber->run(likely(prevFiber) ? *prevFiber : m_workerFiber, now);
230 // Warning! When res == 0, fiber might already have been deleted.
231 m_currentFiber = prevFiber;
232
233 switch(res) {
234 case 0:
235 // Ok, just returned to this fiber. Continue execution.
236 return true;
237 case EAGAIN:
238 // Switching to the same fiber.
239 return didSchedule;
240 case EPERM:
241 // fiber just died.
242 cleanup(*nextFiber);
243 // Retry to find a fiber.
244 nextFiber = nullptr;
245 didSchedule = true;
246 goto reschedule;
247 default:
248 zth_abort("Unhandled Fiber::run() error: %s", err(res).c_str());
249 }
250 }
251 }
252
254 {
255 zth_assert(fiber.state() == Fiber::Dead);
256
257 if(unlikely(m_currentFiber == &fiber)) {
258 // It seems that the current fiber is dead.
259 // In this case, we cannot delete the fiber and its context,
260 // as that is the context we are currently using.
261 // Return to the worker's context and sort it out from there.
262
263 zth_dbg(worker, "[%s] Current fiber %s just died; switch to worker",
264 id_str(), fiber.id_str());
266 schedule(&m_workerFiber);
267
268 // We should not get here, as we are dead.
269 zth_abort("[Worker %p] Failed to switch to worker", this);
270 }
271
272 zth_dbg(worker, "[%s] Fiber %s is dead; cleanup", id_str(), fiber.id_str());
273 // Remove from runnable queue
274 m_runnableQueue.erase(fiber);
275 zth_assert(&fiber != &m_workerFiber);
276 fiber.unused();
277
279 }
280
282 {
283 switch(fiber.state()) {
284 case Fiber::New:
285 case Fiber::Ready:
286 release(fiber);
287 fiber.suspend();
288 add(&fiber);
289 break;
290 case Fiber::Running:
291 release(fiber);
292 fiber.suspend();
293 add(&fiber);
296 // Current fiber suspended, immediate reschedule.
297 schedule();
298 break;
299 case Fiber::Waiting:
300 // Not on my queue...
301 fiber.suspend();
302 break;
304 case Fiber::Cancel:
305 case Fiber::Dead:
306 case Fiber::Suspended:
307 default:; // Ignore.
308 }
309 }
310
311 void resume(Fiber& fiber) noexcept
312 {
313 if(fiber.state() != Fiber::Suspended)
314 return;
315
316 release(fiber);
317 fiber.resume();
318 add(&fiber);
319 }
320
321 Timestamp const& runEnd() const noexcept
322 {
323 return m_end;
324 }
325
326 void run(TimeInterval const& duration = TimeInterval())
327 {
328 if(duration <= 0) {
329 zth_dbg(worker, "[%s] Run", id_str());
330 m_end = Timestamp::null();
331 } else {
332 zth_dbg(worker, "[%s] Run for %s", id_str(), duration.str().c_str());
333 m_end = Timestamp::now() + duration;
334 }
335
336 while(!m_runnableQueue.empty()
337 && (runEnd().isNull() || Timestamp::now() < runEnd())) {
338 schedule();
340 }
341
343 zth_dbg(worker, "[%s] Stopped", id_str());
344 }
345
347
348 Load_type& load() noexcept
349 {
350 return m_load;
351 }
352
353 Load_type const& load() const noexcept
354 {
355 return m_load;
356 }
357
358protected:
359 static void dummyWorkerEntry(void*)
360 {
361 zth_abort("The worker fiber should not be executed.");
362 }
363
364 bool isInWorkerContext() const noexcept
365 {
366 return m_currentFiber == nullptr || m_currentFiber == &m_workerFiber;
367 }
368
369 void dbgStats() noexcept
370 {
372 return;
373
374 zth_dbg(list, "[%s] Run queue:", id_str());
375 if(m_runnableQueue.empty())
376 zth_dbg(list, "[%s] <empty>", id_str());
377 else
378 for(decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
379 it != m_runnableQueue.end(); ++it)
380 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
381
382 zth_dbg(list, "[%s] Suspended queue:", id_str());
383 if(m_suspendedQueue.empty())
384 zth_dbg(list, "[%s] <empty>", id_str());
385 else
386 for(decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
387 it != m_suspendedQueue.end(); ++it)
388 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
389 }
390
391protected:
392 Stack& workerStack() noexcept
393 {
394 return m_stack;
395 }
396
397 friend class Context;
398
399private:
400 Fiber* m_currentFiber;
401 List<Fiber> m_runnableQueue;
402 List<Fiber> m_suspendedQueue;
403 Fiber m_workerFiber;
404 Waiter m_waiter;
405 Timestamp m_end;
406 int m_disableContextSwitch;
407 Load_type m_load;
408 Stack m_stack;
409
410 friend void worker_global_init();
411};
412
417ZTH_EXPORT __attribute__((pure)) inline Worker& currentWorker() noexcept
418{
419 // Dereference is guarded by the safe_ptr.
420 return *Worker::instance();
421}
422
427ZTH_EXPORT __attribute__((pure)) inline Fiber& currentFiber() noexcept
428{
429 Worker const& w = currentWorker();
430 Fiber* f = w.currentFiber();
431 zth_assert(f);
432 // cppcheck-suppress nullPointerRedundantCheck
433 return *f;
434}
435
436__attribute__((pure)) inline UniqueID<Fiber> const& currentFiberID() noexcept
437{
438 return currentFiber();
439}
440
441inline void getContext(Worker** worker, Fiber** fiber) noexcept
442{
443 Worker& current_worker = currentWorker();
444 if(likely(worker))
445 *worker = &current_worker;
446
447 if(likely(fiber)) {
448 Fiber const* const currentFiber_ = *fiber = current_worker.currentFiber();
449 if(unlikely(!currentFiber_))
450 zth_abort("Not within fiber context");
451 }
452}
453
463ZTH_EXPORT inline void
464yield(Fiber* preferFiber = nullptr, bool alwaysYield = false,
465 Timestamp const& now = Timestamp::now())
466{
467 Fiber const& f = currentFiber();
468
469 perf_syscall("yield()", now);
470 if(unlikely(!alwaysYield && !f.allowYield(now)))
471 return;
472
473 currentWorker().schedule(preferFiber, now);
474}
475
486ZTH_EXPORT inline void outOfWork()
487{
488 yield(nullptr, true);
489}
490
491inline void suspend()
492{
493 Worker* worker = nullptr;
494 Fiber* f = nullptr;
495 getContext(&worker, &f);
496 worker->suspend(*f);
497}
498
499inline void resume(Fiber& fiber)
500{
501 Worker* worker = nullptr;
502 getContext(&worker, nullptr);
503 worker->resume(fiber);
504}
505
506int startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr);
507int execlp(char const* file, char const* arg, ... /*, nullptr */);
508int execvp(char const* file, char* const arg[]);
509
510} // namespace zth
511
517EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_yield() noexcept
518{
519 zth::yield();
520}
521
527EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_outOfWork() noexcept
528{
530}
531
539EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_worker_create() noexcept
540{
542 return EINVAL;
543
544 try {
545 new zth::Worker();
546 return 0;
547 } catch(std::bad_alloc const&) {
548 return ENOMEM;
549 } catch(...) {
550 }
551 return EAGAIN;
552}
553
559EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_run(struct timespec const* ts = nullptr) noexcept
560{
562 if(unlikely(!w))
563 return;
564
565 w->run(ts ? zth::TimeInterval(ts->tv_sec, ts->tv_nsec) : zth::TimeInterval());
566}
567
573EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_destroy() noexcept
574{
576 if(unlikely(!w))
577 return;
578 delete w;
579}
580
586EXTERN_C ZTH_EXPORT ZTH_INLINE int
587zth_startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr) noexcept
588{
589 return zth::startWorkerThread(f, stack, name);
590}
591
597EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_execvp(char const* file, char* const arg[]) noexcept
598{
599 return zth::execvp(file, arg);
600}
601
607EXTERN_C ZTH_EXPORT ZTH_INLINE zth_fiber_t zth_current_fiber() noexcept
608{
609 zth_fiber_t res = {};
611 if(!w)
612 return res;
613
614 zth::Fiber const* f = w->currentFiber();
615 if(!f)
616 return res;
617
618 return f->handle();
619}
620
621#else // !__cplusplus
622
623# include <time.h>
624
625ZTH_EXPORT void zth_yield();
626ZTH_EXPORT void zth_outOfWork();
627
628ZTH_EXPORT int zth_worker_create();
629ZTH_EXPORT void zth_worker_run(struct timespec const* ts);
630ZTH_EXPORT int zth_worker_destroy();
631
632ZTH_EXPORT int zth_startWorkerThread(void (*f)(), size_t stack, char const* name);
633ZTH_EXPORT int zth_execvp(char const* file, char* const arg[]);
634
635ZTH_EXPORT zth_fiber_t zth_current_fiber();
636
637#endif // __cplusplus
638#endif // ZTH_WORKER_H
The fiber.
Definition fiber.h:62
int init(Timestamp const &now=Timestamp::now())
Definition fiber.h:205
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
Definition fiber.h:304
zth_fiber_t handle() const noexcept
Definition fiber.h:127
int setStackSize(size_t size) noexcept
Definition fiber.h:139
@ Suspended
Definition fiber.h:82
@ Waiting
Definition fiber.h:82
@ Cancel
Definition fiber.h:82
@ Ready
Definition fiber.h:82
@ Running
Definition fiber.h:82
@ Uninitialized
Definition fiber.h:82
int run(Fiber &from, Timestamp now=Timestamp::now())
Definition fiber.h:229
void kill() noexcept
Definition fiber.h:309
Measure the load of some activity.
Definition perf.h:91
virtual char const * id_str() const noexcept override
Definition util.h:787
void setName(string const &name)
Definition util.h:766
Singleton pattern, but only per-thread.
Definition util.h:1161
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Definition util.h:1196
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:82
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition time.h:629
static Timestamp now()
Definition time.h:656
static constexpr Timestamp null() noexcept
Definition time.h:775
Keeps track of a process-wide unique ID within the type T.
Definition util.h:875
A single fiber per Worker that manages sleeping and blocked fibers.
Definition waiter.h:194
The class that manages the fibers within this thread.
Definition worker.h:35
static void dummyWorkerEntry(void *)
Definition worker.h:359
void contextSwitchDisable() noexcept
Definition worker.h:160
void dbgStats() noexcept
Definition worker.h:369
friend void worker_global_init()
Definition worker.cpp:41
Load Load_type
Definition worker.h:346
Load_type const & load() const noexcept
Definition worker.h:353
void hatch(Fiber &fiber) noexcept
Definition worker.h:132
void run(TimeInterval const &duration=TimeInterval())
Definition worker.h:326
void release(Fiber &fiber) noexcept
Definition worker.h:170
Stack & workerStack() noexcept
Definition worker.h:392
Load_type & load() noexcept
Definition worker.h:348
Waiter & waiter() noexcept
Definition worker.h:108
Fiber * currentFiber() const noexcept
Definition worker.h:103
void add(Fiber *fiber, bool front=false) noexcept
Definition worker.h:113
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition worker.h:184
void contextSwitchEnable(bool enable=true) noexcept
Definition worker.h:151
void cleanup(Fiber &fiber)
Definition worker.h:253
void suspend(Fiber &fiber)
Definition worker.h:281
void resume(Fiber &fiber) noexcept
Definition worker.h:311
bool contextSwitchEnabled() const noexcept
Definition worker.h:165
bool isInWorkerContext() const noexcept
Definition worker.h:364
virtual ~Worker() noexcept override
Definition worker.h:83
Worker & operator<<(Fiber *fiber) noexcept
Definition worker.h:139
Timestamp const & runEnd() const noexcept
Definition worker.h:321
Wrapper for a pointer, which checks validity of the pointer upon dereference.
Definition util.h:1028
void zth_outOfWork() noexcept
Force a context switch.
Definition worker.h:527
void zth_worker_destroy() noexcept
Cleanup the worker.
Definition worker.h:573
int zth_worker_create() noexcept
Create a Worker.
Definition worker.h:539
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
Definition worker.h:597
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.h:587
zth_fiber_t zth_current_fiber() noexcept
Return the currently executing fiber.
Definition worker.h:607
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
Definition worker.h:559
void zth_yield() noexcept
Allow a context switch.
Definition worker.h:517
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition util.cpp:342
void outOfWork()
Force a context switch.
Definition worker.h:486
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.cpp:72
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:417
fiber_type< F >::fiber fiber(F &&f, Args &&... args)
Create and start a new fiber.
Definition async.h:1221
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition worker.h:464
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:427
int execvp(char const *file, char *const arg[])
Start an external program.
Definition worker.cpp:163
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:160
void zth_init()
Perform one-time global initialization of the Zth library.
Definition init.cpp:38
#define ZTH_INLINE
Definition macros.h:139
void now(struct timespec &ts)
Returns the current timestamp.
Definition time.h:59
void getContext(Worker **worker, Fiber **fiber) noexcept
Definition worker.h:441
void perf_deinit()
Definition perf.cpp:803
int perf_init()
Initializes the per-thread perf event buffer.
Definition perf.cpp:782
void resume(Fiber &fiber)
Definition worker.h:499
int context_init() noexcept
One-time context mechanism initialization.
Definition context.cpp:29
void sigchld_check()
Definition worker.cpp:199
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:701
void suspend()
Definition worker.h:491
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp()) noexcept
Put a syscall into the perf output.
Definition perf.h:71
UniqueID< Fiber > const & currentFiberID() noexcept
Definition worker.h:436
void context_deinit() noexcept
Final cleanup.
Definition context.cpp:40
void perf_fiber(Fiber &f) noexcept
Write fiber ID/name to the perf buffer.
Definition perf.cpp:500
static bool const EnablePerfEvent
Enable (but not necessarily record) perf.
Definition config.h:265
static int const Print_worker
Definition config.h:155
static bool const SupportDebugPrint
Add support to enable debug output prints.
Definition config.h:131
Stack information.
Definition context.h:89
Opaque fiber handle type.
Definition fiber.h:24
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:45
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:60