Zth (libzth)
Loading...
Searching...
No Matches
worker.h
Go to the documentation of this file.
1#ifndef ZTH_WORKER_H
2#define ZTH_WORKER_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
9#include <libzth/macros.h>
10
11#ifdef __cplusplus
12# include <libzth/allocator.h>
13# include <libzth/config.h>
14# include <libzth/context.h>
15# include <libzth/fiber.h>
16# include <libzth/init.h>
17# include <libzth/list.h>
18# include <libzth/perf.h>
19# include <libzth/waiter.h>
20
21# include <cstring>
22# include <limits>
23# include <time.h>
24
25namespace zth {
26
27void sigchld_check();
28
29class Worker;
30
35class Worker : public UniqueID<Worker>, public ThreadLocalSingleton<Worker> {
37public:
39 : UniqueID("Worker")
40 , m_currentFiber()
41 , m_workerFiber(&dummyWorkerEntry)
42 , m_waiter(*this)
43 , m_disableContextSwitch()
44 {
45 zth_init();
46
47 int res = 0;
48 zth_dbg(worker, "[%s] Created", id_str());
49
50 if(instance() != this)
51 zth_abort("Only one worker allowed per thread");
52
53 // cppcheck-suppress knownConditionTrueFalse
54 if((res = context_init()))
55 goto error;
56
57 m_workerFiber.setName("zth::Worker");
58 m_workerFiber.setStackSize(0); // no stack
59 if((res = m_workerFiber.init()))
60 goto error;
61
62 if((res = perf_init()))
63 goto error;
64
65 zth_perf_event(m_workerFiber);
66
67 if((res = waiter().run()))
68 goto error;
69
70 return;
71
72error:
73 zth_abort("Cannot create Worker; %s", err(res).c_str());
74 }
75
76 virtual ~Worker() noexcept override
77 {
78 zth_dbg(worker, "[%s] Destruct", id_str());
79
80 while(!m_suspendedQueue.empty()) {
81 Fiber& f = m_suspendedQueue.front();
82 resume(f);
83 f.kill();
84 }
85 while(!m_runnableQueue.empty()) {
86 m_runnableQueue.front().kill();
87 cleanup(m_runnableQueue.front());
88 }
89
92 }
93
94 Fiber* currentFiber() const noexcept
95 {
96 return m_currentFiber;
97 }
98
99 Waiter& waiter() noexcept
100 {
101 return m_waiter;
102 }
103
104 void add(Fiber* fiber, bool front = false) noexcept
105 {
107 zth_assert(fiber->state() != Fiber::Waiting); // We don't manage 'Waiting' here.
108
109 if(unlikely(fiber->state() == Fiber::Suspended)) {
110 m_suspendedQueue.push_back(*fiber);
111 zth_dbg(worker, "[%s] Added suspended %s", id_str(), fiber->id_str());
112 } else {
113 if(unlikely(front))
114 m_runnableQueue.push_front(*fiber);
115 else
116 m_runnableQueue.push_back(*fiber);
117 zth_dbg(worker, "[%s] Added runnable %s", id_str(), fiber->id_str());
118 }
119
120 dbgStats();
121 }
122
123 void hatch(Fiber& fiber) noexcept
124 {
125 zth_assert(fiber.state() == Fiber::New);
126 fiber.used();
127 add(&fiber);
128 }
129
131 {
133 // cppcheck-suppress nullPointerRedundantCheck
134 if(fiber->state() == Fiber::New)
135 hatch(*fiber);
136 else
137 add(fiber);
138
139 return *this;
140 }
141
142 void contextSwitchEnable(bool enable = true) noexcept
143 {
144 if(enable) {
145 zth_assert(m_disableContextSwitch > 0);
146 m_disableContextSwitch--;
147 } else
148 m_disableContextSwitch++;
149 }
150
151 void contextSwitchDisable() noexcept
152 {
153 contextSwitchEnable(false);
154 }
155
156 bool contextSwitchEnabled() const noexcept
157 {
158 return m_disableContextSwitch == 0;
159 }
160
161 void release(Fiber& fiber) noexcept
162 {
163 if(unlikely(fiber.state() == Fiber::Suspended)) {
164 m_suspendedQueue.erase(fiber);
165 zth_dbg(worker, "[%s] Removed %s from suspended queue", id_str(),
166 fiber.id_str());
167 } else {
168 m_runnableQueue.erase(fiber);
169 zth_dbg(worker, "[%s] Removed %s from runnable queue", id_str(),
170 fiber.id_str());
171 }
172 dbgStats();
173 }
174
175 bool schedule(Fiber* preferFiber = nullptr, Timestamp const& now = Timestamp::now())
176 {
178 // Don't switch, immediately continue.
179 return true;
180
181 if(preferFiber)
182 zth_dbg(worker, "[%s] Schedule to %s", id_str(), preferFiber->id_str());
183 else
184 zth_dbg(worker, "[%s] Schedule", id_str());
185
186 dbgStats();
187
188 // Check if fiber is within the runnable queue.
190 !preferFiber || preferFiber == &m_workerFiber
191 || m_runnableQueue.contains(*preferFiber));
192
193 if(unlikely(!runEnd().isNull() && runEnd().isBefore(now))) {
194 // Stop worker and return to its run1() call.
195 zth_dbg(worker, "[%s] Time is up", id_str());
196 preferFiber = &m_workerFiber;
197 }
198
199 Fiber* nextFiber = preferFiber;
200 bool didSchedule = false;
201reschedule:
202 if(likely(!nextFiber)) {
203 if(likely(!m_runnableQueue.empty()))
204 // Use first of the queue.
205 nextFiber = &m_runnableQueue.front();
206 else
207 // No fiber to switch to.
208 nextFiber = &m_workerFiber;
209 }
210
211 {
212 // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDelete)
213 Fiber* prevFiber = m_currentFiber;
214 m_currentFiber = nextFiber;
215
216 if(unlikely(nextFiber != &m_workerFiber))
217 m_runnableQueue.rotate(*++m_runnableQueue.cyclic(*nextFiber));
218
219 int res =
220 nextFiber->run(likely(prevFiber) ? *prevFiber : m_workerFiber, now);
221 // Warning! When res == 0, fiber might already have been deleted.
222 m_currentFiber = prevFiber;
223
224 switch(res) {
225 case 0:
226 // Ok, just returned to this fiber. Continue execution.
227 return true;
228 case EAGAIN:
229 // Switching to the same fiber.
230 return didSchedule;
231 case EPERM:
232 // fiber just died.
233 cleanup(*nextFiber);
234 // Retry to find a fiber.
235 nextFiber = nullptr;
236 didSchedule = true;
237 goto reschedule;
238 default:
239 zth_abort("Unhandled Fiber::run() error: %s", err(res).c_str());
240 }
241 }
242 }
243
245 {
246 zth_assert(fiber.state() == Fiber::Dead);
247
248 if(unlikely(m_currentFiber == &fiber)) {
249 // It seems that the current fiber is dead.
250 // In this case, we cannot delete the fiber and its context,
251 // as that is the context we are currently using.
252 // Return to the worker's context and sort it out from there.
253
254 zth_dbg(worker, "[%s] Current fiber %s just died; switch to worker",
255 id_str(), fiber.id_str());
257 schedule(&m_workerFiber);
258
259 // We should not get here, as we are dead.
260 zth_abort("[Worker %p] Failed to switch to worker", this);
261 }
262
263 zth_dbg(worker, "[%s] Fiber %s is dead; cleanup", id_str(), fiber.id_str());
264 // Remove from runnable queue
265 m_runnableQueue.erase(fiber);
266 zth_assert(&fiber != &m_workerFiber);
267 fiber.unused();
268
270 }
271
273 {
274 switch(fiber.state()) {
275 case Fiber::New:
276 case Fiber::Ready:
277 release(fiber);
278 fiber.suspend();
279 add(&fiber);
280 break;
281 case Fiber::Running:
282 release(fiber);
283 fiber.suspend();
284 add(&fiber);
287 // Current fiber suspended, immediate reschedule.
288 schedule();
289 break;
290 case Fiber::Waiting:
291 // Not on my queue...
292 fiber.suspend();
293 break;
295 case Fiber::Dead:
296 case Fiber::Suspended:
297 default:; // Ignore.
298 }
299 }
300
301 void resume(Fiber& fiber) noexcept
302 {
303 if(fiber.state() != Fiber::Suspended)
304 return;
305
306 release(fiber);
307 fiber.resume();
308 add(&fiber);
309 }
310
311 Timestamp const& runEnd() const noexcept
312 {
313 return m_end;
314 }
315
316 void run(TimeInterval const& duration = TimeInterval())
317 {
318 if(duration <= 0) {
319 zth_dbg(worker, "[%s] Run", id_str());
320 m_end = Timestamp::null();
321 } else {
322 zth_dbg(worker, "[%s] Run for %s", id_str(), duration.str().c_str());
323 m_end = Timestamp::now() + duration;
324 }
325
326 while(!m_runnableQueue.empty()
327 && (runEnd().isNull() || Timestamp::now() < runEnd())) {
328 schedule();
330 }
331
333 zth_dbg(worker, "[%s] Stopped", id_str());
334 }
335
337
338 Load_type& load() noexcept
339 {
340 return m_load;
341 }
342
343 Load_type const& load() const noexcept
344 {
345 return m_load;
346 }
347
348protected:
349 static void dummyWorkerEntry(void*)
350 {
351 zth_abort("The worker fiber should not be executed.");
352 }
353
354 bool isInWorkerContext() const noexcept
355 {
356 return m_currentFiber == nullptr || m_currentFiber == &m_workerFiber;
357 }
358
359 void dbgStats() noexcept
360 {
362 return;
363
364 zth_dbg(list, "[%s] Run queue:", id_str());
365 if(m_runnableQueue.empty())
366 zth_dbg(list, "[%s] <empty>", id_str());
367 else
368 for(decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
369 it != m_runnableQueue.end(); ++it)
370 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
371
372 zth_dbg(list, "[%s] Suspended queue:", id_str());
373 if(m_suspendedQueue.empty())
374 zth_dbg(list, "[%s] <empty>", id_str());
375 else
376 for(decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
377 it != m_suspendedQueue.end(); ++it)
378 zth_dbg(list, "[%s] %s", id_str(), it->str().c_str());
379 }
380
381protected:
382 Stack& workerStack() noexcept
383 {
384 return m_stack;
385 }
386
387 friend class Context;
388
389private:
390 Fiber* m_currentFiber;
391 List<Fiber> m_runnableQueue;
392 List<Fiber> m_suspendedQueue;
393 Fiber m_workerFiber;
394 Waiter m_waiter;
395 Timestamp m_end;
396 int m_disableContextSwitch;
397 Load_type m_load;
398 Stack m_stack;
399
400 friend void worker_global_init();
401};
402
407ZTH_EXPORT __attribute__((pure)) inline Worker& currentWorker() noexcept
408{
409 // Dereference is guarded by the safe_ptr.
410 return *Worker::instance();
411}
412
417ZTH_EXPORT __attribute__((pure)) inline Fiber& currentFiber() noexcept
418{
419 Worker const& w = currentWorker();
420 Fiber* f = w.currentFiber();
421 zth_assert(f);
422 // cppcheck-suppress nullPointerRedundantCheck
423 return *f;
424}
425
426__attribute__((pure)) inline UniqueID<Fiber> const& currentFiberID() noexcept
427{
428 return currentFiber();
429}
430
431inline void getContext(Worker** worker, Fiber** fiber) noexcept
432{
433 Worker& current_worker = currentWorker();
434 if(likely(worker))
435 *worker = &current_worker;
436
437 if(likely(fiber)) {
438 Fiber const* const currentFiber_ = *fiber = current_worker.currentFiber();
439 if(unlikely(!currentFiber_))
440 zth_abort("Not within fiber context");
441 }
442}
443
453ZTH_EXPORT inline void
454yield(Fiber* preferFiber = nullptr, bool alwaysYield = false,
455 Timestamp const& now = Timestamp::now())
456{
457 Fiber const& f = currentFiber();
458
459 perf_syscall("yield()", now);
460 if(unlikely(!alwaysYield && !f.allowYield(now)))
461 return;
462
463 currentWorker().schedule(preferFiber, now);
464}
465
476ZTH_EXPORT inline void outOfWork()
477{
478 yield(nullptr, true);
479}
480
481inline void suspend()
482{
483 Worker* worker = nullptr;
484 Fiber* f = nullptr;
485 getContext(&worker, &f);
486 worker->suspend(*f);
487}
488
489inline void resume(Fiber& fiber)
490{
491 Worker* worker = nullptr;
492 getContext(&worker, nullptr);
493 worker->resume(fiber);
494}
495
496int startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr);
497int execlp(char const* file, char const* arg, ... /*, nullptr */);
498int execvp(char const* file, char* const arg[]);
499
500} // namespace zth
501
507EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_yield() noexcept
508{
509 zth::yield();
510}
511
517EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_outOfWork() noexcept
518{
520}
521
529EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_worker_create() noexcept
530{
532 return EINVAL;
533
534 try {
535 new zth::Worker();
536 return 0;
537 } catch(std::bad_alloc const&) {
538 return ENOMEM;
539 } catch(...) {
540 }
541 return EAGAIN;
542}
543
549EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_run(struct timespec const* ts = nullptr) noexcept
550{
552 if(unlikely(!w))
553 return;
554
555 w->run(ts ? zth::TimeInterval(ts->tv_sec, ts->tv_nsec) : zth::TimeInterval());
556}
557
563EXTERN_C ZTH_EXPORT ZTH_INLINE void zth_worker_destroy() noexcept
564{
566 if(unlikely(!w))
567 return;
568 delete w;
569}
570
576EXTERN_C ZTH_EXPORT ZTH_INLINE int
577zth_startWorkerThread(void (*f)(), size_t stack = 0, char const* name = nullptr) noexcept
578{
579 return zth::startWorkerThread(f, stack, name);
580}
581
587EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_execvp(char const* file, char* const arg[]) noexcept
588{
589 return zth::execvp(file, arg);
590}
591
592#else // !__cplusplus
593
594# include <time.h>
595
596ZTH_EXPORT void zth_yield();
597ZTH_EXPORT void zth_outOfWork();
598
599ZTH_EXPORT int zth_worker_create();
600ZTH_EXPORT void zth_worker_run(struct timespec const* ts);
601ZTH_EXPORT int zth_worker_destroy();
602
603ZTH_EXPORT int zth_startWorkerThread(void (*f)(), size_t stack, char const* name);
604ZTH_EXPORT int zth_execvp(char const* file, char* const arg[]);
605
606#endif // __cplusplus
607#endif // ZTH_WORKER_H
The fiber.
Definition fiber.h:49
int init(Timestamp const &now=Timestamp::now())
Definition fiber.h:182
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
Definition fiber.h:275
int setStackSize(size_t size) noexcept
Definition fiber.h:116
@ Suspended
Definition fiber.h:69
@ Waiting
Definition fiber.h:69
@ Ready
Definition fiber.h:69
@ Running
Definition fiber.h:69
@ Uninitialized
Definition fiber.h:69
int run(Fiber &from, Timestamp now=Timestamp::now())
Definition fiber.h:206
void kill() noexcept
Definition fiber.h:280
Measure the load of some activity.
Definition perf.h:378
virtual char const * id_str() const noexcept override
Definition util.h:772
void setName(string const &name)
Definition util.h:751
Singleton pattern, but only per-thread.
Definition util.h:1146
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Definition util.h:1181
Convenient wrapper around struct timespec that contains a time interval.
Definition time.h:55
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition time.h:568
static Timestamp now()
Definition time.h:595
static constexpr Timestamp null() noexcept
Definition time.h:715
Keeps track of a process-wide unique ID within the type T.
Definition util.h:860
A single fiber per Worker that manages sleeping and blocked fibers.
Definition waiter.h:194
The class that manages the fibers within this thread.
Definition worker.h:35
static void dummyWorkerEntry(void *)
Definition worker.h:349
void contextSwitchDisable() noexcept
Definition worker.h:151
void dbgStats() noexcept
Definition worker.h:359
friend void worker_global_init()
Definition worker.cpp:31
Load Load_type
Definition worker.h:336
Load_type const & load() const noexcept
Definition worker.h:343
void hatch(Fiber &fiber) noexcept
Definition worker.h:123
void run(TimeInterval const &duration=TimeInterval())
Definition worker.h:316
void release(Fiber &fiber) noexcept
Definition worker.h:161
Stack & workerStack() noexcept
Definition worker.h:382
Load_type & load() noexcept
Definition worker.h:338
Waiter & waiter() noexcept
Definition worker.h:99
Fiber * currentFiber() const noexcept
Definition worker.h:94
void add(Fiber *fiber, bool front=false) noexcept
Definition worker.h:104
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition worker.h:175
void contextSwitchEnable(bool enable=true) noexcept
Definition worker.h:142
void cleanup(Fiber &fiber)
Definition worker.h:244
void suspend(Fiber &fiber)
Definition worker.h:272
void resume(Fiber &fiber) noexcept
Definition worker.h:301
bool contextSwitchEnabled() const noexcept
Definition worker.h:156
bool isInWorkerContext() const noexcept
Definition worker.h:354
virtual ~Worker() noexcept override
Definition worker.h:76
Worker & operator<<(Fiber *fiber) noexcept
Definition worker.h:130
Timestamp const & runEnd() const noexcept
Definition worker.h:311
void zth_outOfWork() noexcept
Force a context switch.
Definition worker.h:517
void zth_worker_destroy() noexcept
Cleanup the worker.
Definition worker.h:563
int zth_worker_create() noexcept
Create a Worker.
Definition worker.h:529
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
Definition worker.h:587
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.h:577
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
Definition worker.h:549
void zth_yield() noexcept
Allow a context switch.
Definition worker.h:507
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
Definition util.cpp:337
void outOfWork()
Force a context switch.
Definition worker.h:476
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Definition worker.cpp:62
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
Definition worker.h:407
fiber_type< F >::fiber fiber(F &&f, Args &&... args)
Create and start a new fiber.
Definition async.h:1192
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition worker.h:454
Fiber & currentFiber() noexcept
Return the currently executing fiber.
Definition worker.h:417
int execvp(char const *file, char *const arg[])
Start an external program.
Definition worker.cpp:153
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
Definition perf.h:294
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:159
void zth_init()
Perform one-time global initialization of the Zth library.
Definition init.cpp:25
#define ZTH_INLINE
Definition macros.h:129
void getContext(Worker **worker, Fiber **fiber) noexcept
Definition worker.h:431
void perf_deinit()
Definition perf.cpp:492
int perf_init()
Definition perf.cpp:478
void resume(Fiber &fiber)
Definition worker.h:489
int context_init() noexcept
One-time context mechanism initialization.
Definition context.cpp:29
void sigchld_check()
Definition worker.cpp:189
string err(int e)
Return a string like strerror() does, but as a zth::string.
Definition util.h:686
void suspend()
Definition worker.h:481
UniqueID< Fiber > const & currentFiberID() noexcept
Definition worker.h:426
void context_deinit() noexcept
Final cleanup.
Definition context.cpp:40
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
Definition perf.h:361
static int const Print_worker
Definition config.h:126
static bool const SupportDebugPrint
Add support to enable debug output prints.
Definition config.h:109
Stack information.
Definition context.h:89
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:45
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.
Definition util.h:60