43 , m_disableContextSwitch()
51 zth_abort(
"Only one worker allowed per thread");
57 m_workerFiber.
setName(
"zth::Worker");
59 if((res = m_workerFiber.
init()))
80 while(!m_suspendedQueue.empty()) {
81 Fiber& f = m_suspendedQueue.front();
85 while(!m_runnableQueue.empty()) {
86 m_runnableQueue.front().kill();
87 cleanup(m_runnableQueue.front());
96 return m_currentFiber;
110 m_suspendedQueue.push_back(*
fiber);
113 m_runnableQueue.push_front(*
fiber);
130 m_disableContextSwitch--;
132 m_disableContextSwitch++;
142 return m_disableContextSwitch == 0;
148 m_suspendedQueue.erase(
fiber);
149 zth_dbg(worker,
"[%s] Removed %s from suspended queue",
id_str(),
152 if(&
fiber == m_currentFiber)
153 m_runnableQueue.rotate(*
fiber.listNext());
155 m_runnableQueue.erase(
fiber);
156 zth_dbg(worker,
"[%s] Removed %s from runnable queue",
id_str(),
169 zth_dbg(worker,
"[%s] Schedule to %s",
id_str(), preferFiber->id_str());
177 !preferFiber || preferFiber == &m_workerFiber
178 || m_runnableQueue.contains(*preferFiber));
183 preferFiber = &m_workerFiber;
186 Fiber* nextFiber = preferFiber;
187 bool didSchedule =
false;
190 if(
likely(!m_runnableQueue.empty()))
192 nextFiber = &m_runnableQueue.front();
195 nextFiber = &m_workerFiber;
203 Fiber* prevFiber = m_currentFiber;
204 m_currentFiber = nextFiber;
206 if(
unlikely(nextFiber != &m_workerFiber))
207 m_runnableQueue.rotate(*nextFiber->
listNext());
210 nextFiber->
run(
likely(prevFiber) ? *prevFiber : m_workerFiber, now);
212 m_currentFiber = prevFiber;
229 zth_abort(
"Unhandled Fiber::run() error: %s",
err(res).c_str());
244 zth_dbg(worker,
"[%s] Current fiber %s just died; switch to worker",
250 zth_abort(
"[Worker %p] Failed to switch to worker",
this);
255 m_runnableQueue.erase(
fiber);
265 switch(
fiber.state()) {
313 zth_dbg(worker,
"[%s] Run for %s",
id_str(), duration.str().c_str());
317 while(!m_runnableQueue.empty()
342 zth_abort(
"The worker fiber should not be executed.");
347 return m_currentFiber ==
nullptr || m_currentFiber == &m_workerFiber;
356 if(m_runnableQueue.empty())
359 for(
decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
360 it != m_runnableQueue.end(); ++it)
364 if(m_suspendedQueue.empty())
367 for(
decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
368 it != m_suspendedQueue.end(); ++it)
373 Fiber* m_currentFiber;
379 int m_disableContextSwitch;
392 return *Worker::instance();
417 *worker = ¤t_worker;
435ZTH_EXPORT
inline void
460 yield(
nullptr,
true);
478int startWorkerThread(
void (*f)(),
size_t stack = 0,
char const* name =
nullptr);
479int execlp(
char const* file,
char const* arg, ... );
480int execvp(
char const* file,
char*
const arg[]);
519 }
catch(std::bad_alloc
const&) {
586ZTH_EXPORT
int zth_execvp(
char const* file,
char*
const arg[]);
int init(Timestamp const &now=Timestamp::now())
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
int setStackSize(size_t size) noexcept
int run(Fiber &from, Timestamp now=Timestamp::now())
type * listPrev() const noexcept
type * listNext() const noexcept
Measure the load of some activity.
Singleton pattern, but only per-thread.
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Convenient wrapper around struct timespec that contains a time interval.
Convenient wrapper around struct timespec that contains an absolute timestamp.
static constexpr Timestamp null() noexcept
Keeps track of a process-wide unique ID within the type T.
virtual char const * id_str() const override
void setName(string const &name)
A single fiber per Worker that manages sleeping and blocked fibers.
The class that manages the fibers within this thread.
static void dummyWorkerEntry(void *)
void contextSwitchDisable() noexcept
friend void worker_global_init()
Load_type const & load() const noexcept
void run(TimeInterval const &duration=TimeInterval())
void release(Fiber &fiber) noexcept
Load_type & load() noexcept
Waiter & waiter() noexcept
Fiber * currentFiber() const noexcept
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
void contextSwitchEnable(bool enable=true) noexcept
void cleanup(Fiber &fiber)
void suspend(Fiber &fiber)
void resume(Fiber &fiber) noexcept
bool contextSwitchEnabled() const noexcept
bool isInWorkerContext() const noexcept
virtual ~Worker() override
void add(Fiber *fiber) noexcept
Worker & operator<<(Fiber *fiber) noexcept
Timestamp const & runEnd() const noexcept
void zth_outOfWork() noexcept
Force a context switch.
void zth_worker_destroy() noexcept
Cleanup the worker.
int zth_worker_create() noexcept
Create a Worker.
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
void zth_yield() noexcept
Allow a context switch.
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
void outOfWork()
Force a context switch.
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
int execvp(char const *file, char *const arg[])
Start an external program.
fiber_type< F >::factory fiber(F f, char const *name=nullptr)
Create a new fiber.
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
void zth_init()
Perform one-time global initialization of the Zth library.
void getContext(Worker **worker, Fiber **fiber) noexcept
void resume(Fiber &fiber)
int context_init() noexcept
One-time context mechanism initialization.
string err(int e)
Return a string like strerror() does, but as a zth::string.
UniqueID< Fiber > const & currentFiberID() noexcept
void context_deinit() noexcept
Final cleanup.
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
static int const Print_worker
static bool const SupportDebugPrint
Add support to enable debug output prints.
#define zth_assert(expr)
assert(), but better integrated in Zth.
#define likely(expr)
Marks the given expression to likely be evaluated to true.
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.