46 , m_disableContextSwitch()
54 zth_abort(
"Only one worker allowed per thread");
59 m_workerFiber.
setName(
"zth::Worker");
61 if((res = m_workerFiber.
init()))
82 while(!m_suspendedQueue.empty()) {
83 Fiber& f = m_suspendedQueue.front();
87 while(!m_runnableQueue.empty()) {
88 m_runnableQueue.front().kill();
89 cleanup(m_runnableQueue.front());
98 return m_currentFiber;
112 m_suspendedQueue.push_back(*
fiber);
115 m_runnableQueue.push_front(*
fiber);
132 m_disableContextSwitch--;
134 m_disableContextSwitch++;
144 return m_disableContextSwitch == 0;
150 m_suspendedQueue.erase(
fiber);
151 zth_dbg(worker,
"[%s] Removed %s from suspended queue",
id_str(),
154 if(&
fiber == m_currentFiber)
155 m_runnableQueue.rotate(*
fiber.listNext());
157 m_runnableQueue.erase(
fiber);
158 zth_dbg(worker,
"[%s] Removed %s from runnable queue",
id_str(),
171 zth_dbg(worker,
"[%s] Schedule to %s",
id_str(), preferFiber->id_str());
179 !preferFiber || preferFiber == &m_workerFiber
180 || m_runnableQueue.contains(*preferFiber));
185 preferFiber = &m_workerFiber;
188 Fiber* nextFiber = preferFiber;
189 bool didSchedule =
false;
192 if(
likely(!m_runnableQueue.empty()))
194 nextFiber = &m_runnableQueue.front();
197 nextFiber = &m_workerFiber;
205 Fiber* prevFiber = m_currentFiber;
206 m_currentFiber = nextFiber;
208 if(
unlikely(nextFiber != &m_workerFiber))
209 m_runnableQueue.rotate(*nextFiber->
listNext());
212 nextFiber->
run(
likely(prevFiber) ? *prevFiber : m_workerFiber, now);
214 m_currentFiber = prevFiber;
231 zth_abort(
"Unhandled Fiber::run() error: %s",
err(res).c_str());
246 zth_dbg(worker,
"[%s] Current fiber %s just died; switch to worker",
252 zth_abort(
"[Worker %p] Failed to switch to worker",
this);
257 m_runnableQueue.erase(
fiber);
267 switch(
fiber.state()) {
312 zth_dbg(worker,
"[%s] Run for %s",
id_str(), duration.str().c_str());
316 while(!m_runnableQueue.empty()
341 zth_abort(
"The worker fiber should not be executed.");
346 return m_currentFiber ==
nullptr || m_currentFiber == &m_workerFiber;
355 if(m_runnableQueue.empty())
358 for(decltype(m_runnableQueue.begin()) it = m_runnableQueue.begin();
359 it != m_runnableQueue.end(); ++it)
363 if(m_suspendedQueue.empty())
366 for(decltype(m_suspendedQueue.begin()) it = m_suspendedQueue.begin();
367 it != m_suspendedQueue.end(); ++it)
372 Fiber* m_currentFiber;
378 int m_disableContextSwitch;
416 *worker = &current_worker;
434 ZTH_EXPORT
inline void
435 yield(
Fiber* preferFiber =
nullptr,
bool alwaysYield =
false,
459 yield(
nullptr,
true);
477 int startWorkerThread(
void (*f)(),
size_t stack = 0,
char const* name =
nullptr);
478 int execlp(
char const* file,
char const* arg, ... );
479 int execvp(
char const* file,
char*
const arg[]);
518 }
catch(std::bad_alloc
const&) {
585 ZTH_EXPORT
int zth_execvp(
char const* file,
char*
const arg[]);
int init(Timestamp const &now=Timestamp::now())
bool allowYield(Timestamp const &now=Timestamp::now()) const noexcept
int setStackSize(size_t size) noexcept
int run(Fiber &from, Timestamp now=Timestamp::now())
type * listNext() const noexcept
type * listPrev() const noexcept
Measure the load of some activity.
Singleton pattern, but only per-thread.
static safe_ptr< singleton_type >::type instance() noexcept
Return the only instance of T within this thread.
Convenient wrapper around struct timespec that contains a time interval.
Convenient wrapper around struct timespec that contains an absolute timestamp.
static constexpr Timestamp null() noexcept
Keeps track of a process-wide unique ID within the type T.
virtual char const * id_str() const override
void setName(string const &name)
A single fiber per Worker that manages sleeping and blocked fibers.
The class that manages the fibers within this thread.
static void dummyWorkerEntry(void *)
void contextSwitchDisable() noexcept
friend void worker_global_init()
Load_type & load() noexcept
void run(TimeInterval const &duration=TimeInterval())
void release(Fiber &fiber) noexcept
Waiter & waiter() noexcept
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
void contextSwitchEnable(bool enable=true) noexcept
void cleanup(Fiber &fiber)
void suspend(Fiber &fiber)
void resume(Fiber &fiber) noexcept
bool contextSwitchEnabled() const noexcept
bool isInWorkerContext() const noexcept
Fiber * currentFiber() const noexcept
virtual ~Worker() override
void add(Fiber *fiber) noexcept
Timestamp const & runEnd() const noexcept
Load_type const & load() const noexcept
Worker & operator<<(Fiber *fiber) noexcept
void zth_outOfWork() noexcept
Force a context switch.
void zth_worker_destroy() noexcept
Cleanup the worker.
int zth_worker_create() noexcept
Create a Worker.
int zth_execvp(char const *file, char *const arg[]) noexcept
Start an external program.
int zth_startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr) noexcept
Start a new thread, create a Worker, with one fiber, which executes f.
void zth_worker_run(struct timespec const *ts=nullptr) noexcept
Run the worker for the given amount of time.
void zth_yield() noexcept
Allow a context switch.
void zth_abort(char const *fmt,...)
Aborts the process after printing the given printf() formatted message.
void outOfWork()
Force a context switch.
int startWorkerThread(void(*f)(), size_t stack=0, char const *name=nullptr)
Start a new thread, create a Worker, with one fiber, which executes f.
int execlp(char const *file, char const *arg,...)
Start an external program.
Worker & currentWorker() noexcept
Return the (thread-local) singleton Worker instance.
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Fiber & currentFiber() noexcept
Return the currently executing fiber.
fiber_type< F >::factory fiber(F f, char const *name=nullptr)
Create a new fiber.
int execvp(char const *file, char *const arg[])
Start an external program.
#define zth_perf_event(...)
Construct a zth::PerfEvent with provided parameters, and forward it to the perf buffer for later proc...
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
void zth_init()
Perform one-time global initialization of the Zth library.
void getContext(Worker **worker, Fiber **fiber) noexcept
void resume(Fiber &fiber)
int context_init() noexcept
One-time context mechanism initialization.
string err(int e)
Return a string like strerror() does, but as a zth::string.
UniqueID< Fiber > const & currentFiberID() noexcept
void context_deinit() noexcept
Final cleanup.
void perf_syscall(char const *syscall, Timestamp const &t=Timestamp())
Put a syscall into the perf output.
static int const Print_worker
static bool const SupportDebugPrint
Add support to enable debug output prints.
#define zth_assert(expr)
assert(), but better integrated in Zth.
#define likely(expr)
Marks the given expression to likely be evaluated to true.
#define unlikely(expr)
Marks the given expression to likely be evaluated to false.