47 , m_disableContextSwitch()
55 zth_abort(
"Only one worker allowed per thread");
61 m_workerFiber.
setName(
"zth::Worker");
63 if((res = m_workerFiber.
init()))
87 while(!m_suspendedQueue.empty()) {
88 Fiber& f = m_suspendedQueue.front();
92 while(!m_runnableQueue.empty()) {
93 m_runnableQueue.front().kill();
94 cleanup(m_runnableQueue.front());
105 return m_currentFiber;
119 m_suspendedQueue.push_back(*
fiber);
123 m_runnableQueue.push_front(*
fiber);
125 m_runnableQueue.push_back(*
fiber);
155 m_disableContextSwitch--;
157 m_disableContextSwitch++;
167 return m_disableContextSwitch == 0;
173 m_suspendedQueue.erase(
fiber);
174 zth_dbg(worker,
"[%s] Removed %s from suspended queue",
id_str(),
177 m_runnableQueue.erase(
fiber);
178 zth_dbg(worker,
"[%s] Removed %s from runnable queue",
id_str(),
191 zth_dbg(worker,
"[%s] Schedule to %s",
id_str(), preferFiber->id_str());
199 !preferFiber || preferFiber == &m_workerFiber
200 || m_runnableQueue.contains(*preferFiber));
205 preferFiber = &m_workerFiber;
208 Fiber* nextFiber = preferFiber;
209 bool didSchedule =
false;
212 if(
likely(!m_runnableQueue.empty()))
214 nextFiber = &m_runnableQueue.front();
217 nextFiber = &m_workerFiber;
222 Fiber* prevFiber = m_currentFiber;
223 m_currentFiber = nextFiber;
225 if(
unlikely(nextFiber != &m_workerFiber))
226 m_runnableQueue.rotate(*++m_runnableQueue.cyclic(*nextFiber));
229 nextFiber->
run(
likely(prevFiber) ? *prevFiber : m_workerFiber,
now);
231 m_currentFiber = prevFiber;
248 zth_abort(
"Unhandled Fiber::run() error: %s",
err(res).c_str());
263 zth_dbg(worker,
"[%s] Current fiber %s just died; switch to worker",
269 zth_abort(
"[Worker %p] Failed to switch to worker",
this);
274 m_runnableQueue.erase(
fiber);
283 switch(
fiber.state()) {
332 zth_dbg(worker,
"[%s] Run for %s",
id_str(), duration.str().c_str());
336 while(!m_runnableQueue.empty()
361 zth_abort(
"The worker fiber should not be executed.");
366 return m_currentFiber ==
nullptr || m_currentFiber == &m_workerFiber;
375 if(m_runnableQueue.empty())
378 for(
decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
379 it != m_runnableQueue.end(); ++it)
383 if(m_suspendedQueue.empty())
386 for(
decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
387 it != m_suspendedQueue.end(); ++it)
400 Fiber* m_currentFiber;
406 int m_disableContextSwitch;
420 return *Worker::instance();
445 *worker = ¤t_worker;
463ZTH_EXPORT
inline void
488 yield(
nullptr,
true);
506int startWorkerThread(
void (*f)(),
size_t stack = 0,
char const* name =
nullptr);
507int execlp(
char const* file,
char const* arg, ... );
508int execvp(
char const* file,
char*
const arg[]);
547 }
catch(std::bad_alloc
const&) {
633ZTH_EXPORT
int zth_execvp(
char const* file,
char*
const arg[]);
int init(Timestamp const &now=Timestamp::now())
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
zth_fiber_t handle() const noexcept
int setStackSize(size_t size) noexcept
int run(Fiber &from, Timestamp now=Timestamp::now())
Measure the load of some activity.
virtual char const * id_str() const noexcept override
void setName(string const &name)
Singleton pattern, but only per-thread.
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Convenient wrapper around struct timespec that contains a time interval.
Convenient wrapper around struct timespec that contains an absolute timestamp.
static constexpr Timestamp null() noexcept
Keeps track of a process-wide unique ID within the type T.
A single fiber per Worker that manages sleeping and blocked fibers.
The class that manages the fibers within this thread.
static void dummyWorkerEntry(void *)
void contextSwitchDisable() noexcept
friend void worker_global_init()
Load_type const & load() const noexcept
void hatch(Fiber &fiber) noexcept
void run(TimeInterval const &duration=TimeInterval())
void release(Fiber &fiber) noexcept
Stack & workerStack() noexcept
Load_type & load() noexcept
Waiter & waiter() noexcept
Fiber * currentFiber() const noexcept
void add(Fiber *fiber, bool front=false) noexcept
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
void contextSwitchEnable(bool enable=true) noexcept
void cleanup(Fiber &fiber)
void suspend(Fiber &fiber)
void resume(Fiber &fiber) noexcept
bool contextSwitchEnabled() const noexcept
bool isInWorkerContext() const noexcept
virtual ~Worker() noexcept override
Worker & operator<<(Fiber *fiber) noexcept
Timestamp const & runEnd() const noexcept
Wrapper for a pointer, which checks validity of the pointer upon dereference.
void zth_outOfWork() noexcept
Force a context switch.
void zth_worker_destroy() noexcept
Cleanup the worker.
int zth_worker_create() noexcept
Create a Worker.
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
zth_fiber_t zth_current_fiber() noexcept
Return the currently executing fiber.
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
void zth_yield() noexcept
Allow a context switch.
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
void outOfWork()
Force a context switch.
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
fiber_type< F >::fiber fiber(F &&f, Args &&... args)
Create and start a new fiber.
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
int execvp(char const *file, char *const arg[])
Start an external program.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
void zth_init()
Perform one-time global initialization of the Zth library.
void now(struct timespec &ts)
Returns the current timestamp.
void getContext(Worker **worker, Fiber **fiber) noexcept
int perf_init()
Initializes the per-thread perf event buffer.
void resume(Fiber &fiber)
int context_init() noexcept
One-time context mechanism initialization.
string err(int e)
Return a string like strerror() does, but as a zth::string.
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp()) noexcept
Put a syscall into the perf output.
UniqueID< Fiber > const & currentFiberID() noexcept
void context_deinit() noexcept
Final cleanup.
void perf_fiber(Fiber &f) noexcept
Write fiber ID/name to the perf buffer.
static bool const EnablePerfEvent
Enable (but not necessarily record) perf.
static int const Print_worker
static bool const SupportDebugPrint
Add support to enable debug output prints.
Opaque fiber handle type.
#define zth_assert(expr)
assert(), but better integrated in Zth.
#define likely(expr)
Marks the given expression to likely be evaluated to true.
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.