Zth (libzth)
sync.h
Go to the documentation of this file.
1 #ifndef ZTH_SYNC_H
2 #define ZTH_SYNC_H
3 /*
4  * Zth (libzth), a cooperative userspace multitasking library.
5  * Copyright (C) 2019-2022 Jochem Rutgers
6  *
7  * This Source Code Form is subject to the terms of the Mozilla Public
8  * License, v. 2.0. If a copy of the MPL was not distributed with this
9  * file, You can obtain one at https://mozilla.org/MPL/2.0/.
10  */
11 
21 #ifdef __cplusplus
22 
23 # include <libzth/allocator.h>
24 # include <libzth/fiber.h>
25 # include <libzth/list.h>
26 # include <libzth/util.h>
27 # include <libzth/worker.h>
28 
29 # include <new>
30 
31 # ifdef ZTH_USE_VALGRIND
32 # include <valgrind/memcheck.h>
33 # endif
34 
35 namespace zth {
36 
37 class RefCounted {
39 public:
40  RefCounted() noexcept
41  : m_count()
42  {}
43 
45 
46  void used() noexcept
47  {
48  zth_assert(m_count < std::numeric_limits<size_t>::max());
49  m_count++;
50  }
51 
52  void unused()
53  {
54  zth_assert(m_count > 0);
55  if(--m_count == 0)
56  delete this;
57  }
58 
59 private:
60  size_t m_count;
61 };
62 
63 template <typename T>
65 public:
66  constexpr14 explicit SharedPointer(T* object = nullptr) noexcept
67  : m_object(object)
68  {
69  if(m_object)
70  m_object->used();
71  }
72 
74  : m_object(p.get())
75  {
76  if(m_object)
77  m_object->used();
78  }
79 
80  virtual ~SharedPointer()
81  {
82  reset();
83  }
84 
85  void reset(T* object = nullptr)
86  {
87  if(object)
88  object->used();
89  if(m_object)
90  m_object->unused();
91  m_object = object;
92  }
93 
95  {
96  reset(object);
97  return *this;
98  }
99 
101  {
102  reset(p.get());
103  return *this;
104  }
105 
106 # if __cplusplus >= 201103L
108  : m_object()
109  {
110  *this = std::move(p);
111  }
112 
114  {
115  m_object = p.release();
116  return *this;
117  }
118 # endif
119 
120  constexpr T* get() const noexcept
121  {
122  return m_object;
123  }
124 
125  constexpr operator T*() const noexcept
126  {
127  return get();
128  }
129 
130  constexpr14 T* operator*() const noexcept
131  {
132  zth_assert(get());
133  return get();
134  }
135 
136  constexpr14 T* operator->() const noexcept
137  {
138  zth_assert(get());
139  return get();
140  }
141 
142  constexpr14 T* release() noexcept
143  {
144  T* object = get();
145  m_object = nullptr;
146  return object;
147  }
148 
149 private:
150  T* m_object;
151 };
152 
154  : public RefCounted
155  , public UniqueID<Synchronizer> {
157 public:
158  explicit Synchronizer(cow_string const& name = "Synchronizer")
159  : RefCounted()
160  , UniqueID(Config::NamedSynchronizer ? name.str() : string())
161  {}
162 
163 # if __cplusplus >= 201103L
165  : RefCounted()
166  , UniqueID(Config::NamedSynchronizer ? std::move(name).str() : string())
167  {}
168 # endif
169 
170  virtual ~Synchronizer() override
171  {
172  zth_dbg(sync, "[%s] Destruct", id_str());
173  zth_assert(m_queue.empty());
174  }
175 
176 protected:
177  void block()
178  {
179  Worker* w = nullptr;
180  Fiber* f = nullptr;
181  getContext(&w, &f);
182 
183  zth_dbg(sync, "[%s] Block %s", id_str(), f->id_str());
184  w->release(*f);
185  m_queue.push_back(*f);
186  f->nap(Timestamp::null());
187  w->schedule();
188  }
189 
194  bool unblock(Fiber& f) noexcept
195  {
196  if(!m_queue.contains(f))
197  return false;
198 
199  Worker* w = nullptr;
200  getContext(&w, nullptr);
201 
202  zth_dbg(sync, "[%s] Unblock %s", id_str(), f.id_str());
203  m_queue.erase(f);
204  f.wakeup();
205  w->add(&f);
206  return true;
207  }
208 
209  bool unblockFirst() noexcept
210  {
211  if(m_queue.empty())
212  return false;
213 
214  Worker* w = nullptr;
215  getContext(&w, nullptr);
216 
217  Fiber& f = m_queue.front();
218  zth_dbg(sync, "[%s] Unblock %s", id_str(), f.id_str());
219  m_queue.pop_front();
220  f.wakeup();
221  w->add(&f);
222  return true;
223  }
224 
225  bool unblockAll() noexcept
226  {
227  if(m_queue.empty())
228  return false;
229 
230  Worker* w = nullptr;
231  getContext(&w, nullptr);
232 
233  zth_dbg(sync, "[%s] Unblock all", id_str());
234 
235  while(!m_queue.empty()) {
236  Fiber& f = m_queue.front();
237  m_queue.pop_front();
238  f.wakeup();
239  w->add(&f);
240  }
241  return true;
242  }
243 
244  class AlarmClock : public TimedWaitable {
246  public:
249  Synchronizer& synchronizer, Fiber& fiber, Timestamp const& timeout) noexcept
250  : base(timeout)
251  , m_synchronizer(synchronizer)
252  , m_fiber(fiber)
253  , m_rang()
254  {}
255 
256  virtual ~AlarmClock() override is_default
257 
258  virtual bool poll(Timestamp const& now = Timestamp::now()) noexcept override
259  {
260  if(!base::poll(now))
261  return false;
262 
263  zth_dbg(sync, "[%s] %s timed out", m_synchronizer.id_str(),
264  m_fiber.id_str());
265  m_rang = m_synchronizer.unblock(m_fiber);
266  return true;
267  }
268 
269  bool rang() const noexcept
270  {
271  return m_rang;
272  }
273 
274  private:
275  Synchronizer& m_synchronizer;
276  Fiber& m_fiber;
277  bool m_rang;
278  };
279 
280  friend class AlarmClock;
281 
286  bool block(Timestamp const& timeout, Timestamp const& now = Timestamp::now())
287  {
288  if(timeout <= now)
289  // Immediate timeout.
290  return true;
291 
292  return block_(timeout, now);
293  }
294 
299  bool block(TimeInterval const& timeout, Timestamp const& now = Timestamp::now())
300  {
301  if(timeout.isNegative() || timeout.isNull())
302  // Immediate timeout.
303  return true;
304 
305  return block_(now + timeout, now);
306  }
307 
308 private:
313  bool block_(Timestamp const& timeout, Timestamp const& now)
314  {
315  Worker* w;
316  Fiber* f;
317  getContext(&w, &f);
318 
319  zth_dbg(sync, "[%s] Block %s with timeout", id_str(), f->id_str());
320  w->release(*f);
321  m_queue.push_back(*f);
322  f->nap(Timestamp::null());
323 
324  AlarmClock a(*this, *f, timeout);
325  w->waiter().scheduleTask(a);
326  w->schedule(nullptr, now);
327 
328  w->waiter().unscheduleTask(a);
329  return !a.rang();
330  }
331 
332 private:
333  List<Fiber> m_queue;
334 };
335 
340 class Mutex : public Synchronizer {
342 public:
343  explicit Mutex(cow_string const& name = "Mutex")
344  : Synchronizer(name)
345  , m_locked()
346  {}
347 
348 # if __cplusplus >= 201103L
349  explicit Mutex(cow_string&& name)
350  : Synchronizer(std::move(name))
351  , m_locked()
352  {}
353 # endif
354 
355  virtual ~Mutex() override is_default
356 
357  void lock()
358  {
359  while(unlikely(m_locked))
360  block();
361  m_locked = true;
362  zth_dbg(sync, "[%s] Locked", id_str());
363  }
364 
365  bool trylock() noexcept
366  {
367  if(m_locked)
368  return false;
369  m_locked = true;
370  zth_dbg(sync, "[%s] Locked", id_str());
371  return true;
372  }
373 
374  void unlock() noexcept
375  {
376  zth_assert(m_locked);
377  zth_dbg(sync, "[%s] Unlocked", id_str());
378  m_locked = false;
379  unblockFirst();
380  }
381 
382 private:
383  bool m_locked;
384 };
385 
390 class Semaphore : public Synchronizer {
392 public:
393  explicit Semaphore(size_t init = 0, cow_string const& name = "Semaphore")
394  : Synchronizer(name)
395  , m_count(init)
396  {}
397 
398 # if __cplusplus >= 201103L
399  Semaphore(size_t init, cow_string&& name)
400  : Synchronizer(std::move(name))
401  , m_count(init)
402  {}
403 # endif
404 
405  virtual ~Semaphore() override is_default
406 
407  void acquire(size_t count = 1)
408  {
409  size_t remaining = count;
410  while(remaining > 0) {
411  if(remaining <= m_count) {
412  m_count -= remaining;
413  if(m_count > 0)
414  // There might be another one waiting.
415  unblockFirst();
416  return;
417  } else {
418  remaining -= m_count;
419  m_count = 0;
420  block();
421  }
422  }
423  zth_dbg(sync, "[%s] Acquired %zu", id_str(), count);
424  }
425 
426  void release(size_t count = 1) noexcept
427  {
428  zth_assert(m_count + count >= m_count); // ...otherwise it wrapped around, which is
429  // probably not want you wanted...
430 
431  if(unlikely(m_count + count < m_count))
432  // wrapped around, saturate
433  m_count = std::numeric_limits<size_t>::max();
434  else
435  m_count += count;
436 
437  zth_dbg(sync, "[%s] Released %zu", id_str(), count);
438 
439  if(likely(m_count > 0))
440  unblockFirst();
441  }
442 
443  size_t value() const noexcept
444  {
445  return m_count;
446  }
447 
448 private:
449  size_t m_count;
450 };
451 
456 class Signal : public Synchronizer {
458 public:
459  explicit Signal(cow_string const& name = "Signal")
460  : Synchronizer(name)
461  , m_signalled()
462  {}
463 
464 # if __cplusplus >= 201103L
465  explicit Signal(cow_string&& name)
466  : Synchronizer(std::move(name))
467  , m_signalled()
468  {}
469 # endif
470 
471  virtual ~Signal() override is_default
472 
473  void wait()
474  {
475  if(!m_signalled) {
476  block();
477  } else {
478  // Do a yield() here, as one might rely on the signal to block
479  // regularly when the signal is used in a loop (see daemon
480  // pattern).
481  yield();
482  }
483 
484  if(m_signalled > 0)
485  m_signalled--;
486  }
487 
488  bool wait(Timestamp const& timeout, Timestamp const& now = Timestamp::now())
489  {
490  if(!m_signalled) {
491  if(!block(timeout, now))
492  // Timeout.
493  return false;
494  } else
495  yield();
496 
497  if(m_signalled > 0)
498  m_signalled--;
499 
500  // Got signalled.
501  return true;
502  }
503 
504  bool wait(TimeInterval const& timeout, Timestamp const& now = Timestamp::now())
505  {
506  return wait(now + timeout);
507  }
508 
509  void signal(bool queue = true, bool queueEveryTime = false) noexcept
510  {
511  zth_dbg(sync, "[%s] Signal", id_str());
512  if(!unblockFirst() && queue && m_signalled >= 0) {
513  if(m_signalled == 0 || queueEveryTime)
514  m_signalled++;
515  zth_assert(m_signalled > 0); // Otherwise, it wrapped around, which is
516  // probably not what you want.
517  }
518  }
519 
520  void signalAll(bool queue = true) noexcept
521  {
522  zth_dbg(sync, "[%s] Signal all", id_str());
523  unblockAll();
524  if(queue)
525  m_signalled = -1;
526  }
527 
528  void reset() noexcept
529  {
530  m_signalled = 0;
531  }
532 
533 private:
534  int m_signalled;
535 };
536 
541 template <typename T = void>
542 class Future : public Synchronizer {
544 public:
545  typedef T type;
546 
547  // cppcheck-suppress uninitMemberVar
548  explicit Future(cow_string const& name = "Future")
549  : Synchronizer(name)
550  , m_valid()
551  {
552 # ifdef ZTH_USE_VALGRIND
553  VALGRIND_MAKE_MEM_NOACCESS(m_data, sizeof(m_data));
554 # endif
555  }
556 
557 # if __cplusplus >= 201103L
558  // cppcheck-suppress uninitMemberVar
559  explicit Future(cow_string&& name)
560  : Synchronizer(std::move(name))
561  , m_valid()
562  {
563 # ifdef ZTH_USE_VALGRIND
564  VALGRIND_MAKE_MEM_NOACCESS(m_data, sizeof(m_data));
565 # endif
566  }
567 # endif
568 
569  virtual ~Future() override
570  {
571  if(valid())
572  value().~type();
573 # ifdef ZTH_USE_VALGRIND
574  VALGRIND_MAKE_MEM_UNDEFINED(m_data, sizeof(m_data));
575 # endif
576  }
577 
578  bool valid() const noexcept
579  {
580  return m_valid;
581  }
582 
583  operator bool() const noexcept
584  {
585  return valid();
586  }
587 
588  void wait()
589  {
590  if(!valid())
591  block();
592  }
593 
594  void set()
595  {
596  if(!set_prepare())
597  return;
598 
599  new(m_data) type();
600  set_finalize();
601  }
602 
603  void set(type const& value)
604  {
605  if(!set_prepare())
606  return;
607 
608  new(m_data) type(value);
609  set_finalize();
610  }
611 
613  {
614  set(value);
615  return *this;
616  }
617 
618 # if __cplusplus >= 201103L
619  void set(type&& value)
620  {
621  if(!set_prepare())
622  return;
623 
624  new(m_data) type(std::move(value));
625  set_finalize();
626  }
627 
629  {
630  set(std::move(value));
631  return *this;
632  }
633 # endif
634 
636  {
637  wait();
638  void* p = m_data;
639  return *static_cast<type*>(p);
640  }
641 
642  type const& value() const LREF_QUALIFIED
643  {
644  wait();
645  void const* p = m_data;
646  return *static_cast<type const*>(p);
647  }
648 
649 # if __cplusplus >= 201103L
650  type value() &&
651  {
652  wait();
653  void* p = m_data;
654  return std::move(*static_cast<type*>(p));
655  }
656 # endif
657 
658  type const* operator*() const
659  {
660  return &value();
661  }
662 
664  {
665  return &value();
666  }
667 
668  type const* operator->() const
669  {
670  return &value();
671  }
672 
674  {
675  return &value();
676  }
677 
678 private:
679  bool set_prepare() noexcept
680  {
681  zth_assert(!valid());
682  if(valid())
683  return false;
684 # ifdef ZTH_USE_VALGRIND
685  VALGRIND_MAKE_MEM_UNDEFINED(m_data, sizeof(m_data));
686 # endif
687  return true;
688  }
689 
690  void set_finalize() noexcept
691  {
692  m_valid = true;
693  zth_dbg(sync, "[%s] Set", id_str());
694  unblockAll();
695  }
696 
697 private:
698  alignas(type) char m_data[sizeof(type)];
699  bool m_valid;
700 };
701 
702 template <>
703 // cppcheck-suppress noConstructor
704 class Future<void> : public Synchronizer {
706 public:
707  typedef void type;
708 
709  explicit Future(cow_string const& name = "Future")
710  : Synchronizer(name)
711  , m_valid()
712  {}
713 
714 # if __cplusplus >= 201103L
715  explicit Future(cow_string&& name)
716  : Synchronizer(std::move(name))
717  , m_valid()
718  {}
719 # endif
720 
721  virtual ~Future() override is_default
722 
723  bool valid() const noexcept
724  {
725  return m_valid;
726  }
727  operator bool() const noexcept
728  {
729  return valid();
730  }
731 
732  void wait()
733  {
734  if(!valid())
735  block();
736  }
737 
738  void set() noexcept
739  {
740  zth_assert(!valid());
741  if(valid())
742  return;
743 
744  m_valid = true;
745  zth_dbg(sync, "[%s] Set", id_str());
746  unblockAll();
747  }
748 
749 private:
750  bool m_valid;
751 };
752 
757 class Gate : public Synchronizer {
759 public:
760  explicit Gate(size_t count, cow_string const& name = "Gate")
761  : Synchronizer(name)
762  , m_count(count)
763  , m_current()
764  {}
765 
766 # if __cplusplus >= 201103L
768  : Synchronizer(std::move(name))
769  , m_count(count)
770  , m_current()
771  {}
772 # endif
773 
774  virtual ~Gate() override is_default
775 
776  bool pass() noexcept
777  {
778  zth_dbg(sync, "[%s] Pass", id_str());
779  if(++m_current >= count()) {
780  m_current -= count();
781  unblockAll();
782  return true;
783  } else {
784  return false;
785  }
786  }
787 
788  void wait()
789  {
790  if(!pass())
791  block();
792  }
793 
794  size_t count() const noexcept
795  {
796  return m_count;
797  }
798  size_t current() const noexcept
799  {
800  return m_current;
801  }
802 
803 private:
804  size_t const m_count;
805  size_t m_current;
806 };
807 
808 } // namespace zth
809 
810 struct zth_mutex_t {
811  void* p;
812 };
813 
819 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_mutex_init(zth_mutex_t* mutex) noexcept
820 {
821  if(unlikely(!mutex || mutex->p))
822  return EINVAL;
823 
824  try {
825  mutex->p = (void*)new zth::Mutex();
826  return 0;
827  } catch(std::bad_alloc const&) {
828  return ENOMEM;
829  } catch(...) {
830  }
831  return EAGAIN;
832 }
833 
839 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_mutex_destroy(zth_mutex_t* mutex) noexcept
840 {
841  if(unlikely(!mutex))
842  return EINVAL;
843  if(unlikely(!mutex->p))
844  // Already destroyed.
845  return 0;
846 
847  delete static_cast<zth::Mutex*>(mutex->p);
848  mutex->p = nullptr;
849  return 0;
850 }
851 
857 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_mutex_lock(zth_mutex_t* mutex) noexcept
858 {
859  if(unlikely(!mutex || !mutex->p))
860  return EINVAL;
861 
862  static_cast<zth::Mutex*>(mutex->p)->lock();
863  return 0;
864 }
865 
871 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_mutex_trylock(zth_mutex_t* mutex) noexcept
872 {
873  if(unlikely(!mutex || !mutex->p))
874  return EINVAL;
875 
876  return static_cast<zth::Mutex*>(mutex->p)->trylock() ? 0 : EBUSY;
877 }
878 
884 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_mutex_unlock(zth_mutex_t* mutex) noexcept
885 {
886  if(unlikely(!mutex || !mutex->p))
887  return EINVAL;
888 
889  static_cast<zth::Mutex*>(mutex->p)->unlock();
890  return 0;
891 }
892 
893 struct zth_sem_t {
894  void* p;
895 };
896 
902 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_sem_init(zth_sem_t* sem, size_t value) noexcept
903 {
904  if(unlikely(!sem || sem->p))
905  return EINVAL;
906 
907  try {
908  sem->p = (void*)new zth::Semaphore(value);
909  return 0;
910  } catch(std::bad_alloc const&) {
911  return ENOMEM;
912  } catch(...) {
913  }
914  return EAGAIN;
915 }
916 
922 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_sem_destroy(zth_sem_t* sem) noexcept
923 {
924  if(unlikely(!sem))
925  return EINVAL;
926  if(unlikely(!sem->p))
927  // Already destroyed.
928  return 0;
929 
930  delete static_cast<zth::Semaphore*>(sem->p);
931  sem->p = nullptr;
932  return 0;
933 }
934 
940 EXTERN_C ZTH_EXPORT ZTH_INLINE int
941 zth_sem_getvalue(zth_sem_t* __restrict__ sem, size_t* __restrict__ value) noexcept
942 {
943  if(unlikely(!sem || !sem->p || !value))
944  return EINVAL;
945 
946  *value = static_cast<zth::Semaphore*>(sem->p)->value();
947  return 0;
948 }
949 
950 # ifndef EOVERFLOW
951 # define EOVERFLOW EAGAIN
952 # endif
953 
959 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_sem_post(zth_sem_t* sem) noexcept
960 {
961  if(unlikely(!sem || !sem->p))
962  return EINVAL;
963 
964  zth::Semaphore* s = static_cast<zth::Semaphore*>(sem->p);
965  if(unlikely(s->value() == std::numeric_limits<size_t>::max()))
966  return EOVERFLOW;
967 
968  s->release();
969  return 0;
970 }
971 
977 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_sem_wait(zth_sem_t* sem) noexcept
978 {
979  if(unlikely(!sem || !sem->p))
980  return EINVAL;
981 
982  static_cast<zth::Semaphore*>(sem->p)->acquire();
983  return 0;
984 }
985 
991 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_sem_trywait(zth_sem_t* sem) noexcept
992 {
993  if(unlikely(!sem || !sem->p))
994  return EINVAL;
995 
996  zth::Semaphore* s = static_cast<zth::Semaphore*>(sem->p);
997  if(unlikely(s->value() == 0))
998  return EAGAIN;
999 
1000  s->acquire();
1001  return 0;
1002 }
1003 
1004 struct zth_cond_t {
1005  void* p;
1006 };
1007 
1013 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_cond_init(zth_cond_t* cond) noexcept
1014 {
1015  if(unlikely(!cond || cond->p))
1016  return EINVAL;
1017 
1018  try {
1019  cond->p = (void*)new zth::Signal();
1020  return 0;
1021  } catch(std::bad_alloc const&) {
1022  return ENOMEM;
1023  } catch(...) {
1024  }
1025  return EAGAIN;
1026 }
1027 
1033 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_cond_destroy(zth_cond_t* cond) noexcept
1034 {
1035  if(unlikely(!cond))
1036  return EINVAL;
1037  if(unlikely(!cond->p))
1038  // Already destroyed.
1039  return 0;
1040 
1041  delete static_cast<zth::Signal*>(cond->p);
1042  cond->p = nullptr;
1043  return 0;
1044 }
1045 
1051 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_cond_signal(zth_cond_t* cond) noexcept
1052 {
1053  if(unlikely(!cond || !cond->p))
1054  return EINVAL;
1055 
1056  static_cast<zth::Signal*>(cond->p)->signal();
1057  return 0;
1058 }
1059 
1065 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_cond_broadcast(zth_cond_t* cond) noexcept
1066 {
1067  if(unlikely(!cond || !cond->p))
1068  return EINVAL;
1069 
1070  static_cast<zth::Signal*>(cond->p)->signalAll();
1071  return 0;
1072 }
1073 
1079 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_cond_wait(zth_cond_t* cond) noexcept
1080 {
1081  if(unlikely(!cond || !cond->p))
1082  return EINVAL;
1083 
1084  static_cast<zth::Signal*>(cond->p)->wait();
1085  return 0;
1086 }
1087 
1089  void* p;
1090 };
1091 
1093 
1099 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_future_init(zth_future_t* future) noexcept
1100 {
1101  if(unlikely(!future || future->p))
1102  return EINVAL;
1103 
1104  try {
1105  future->p = (void*)new zth_future_t_type();
1106  return 0;
1107  } catch(std::bad_alloc const&) {
1108  return ENOMEM;
1109  } catch(...) {
1110  }
1111  return EAGAIN;
1112 }
1113 
1119 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_future_destroy(zth_future_t* future) noexcept
1120 {
1121  if(unlikely(!future))
1122  return EINVAL;
1123  if(unlikely(!future->p))
1124  // Already destroyed.
1125  return 0;
1126 
1127  delete static_cast<zth_future_t_type*>(future->p);
1128  future->p = nullptr;
1129  return 0;
1130 }
1131 
1137 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_future_valid(zth_future_t* future) noexcept
1138 {
1139  if(unlikely(!future || !future->p))
1140  return EINVAL;
1141 
1142  return static_cast<zth_future_t_type*>(future->p)->valid() ? 0 : EAGAIN;
1143 }
1144 
1150 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_future_set(zth_future_t* future, uintptr_t value) noexcept
1151 {
1152  if(unlikely(!future || !future->p))
1153  return EINVAL;
1154 
1155  zth_future_t_type* f = static_cast<zth_future_t_type*>(future->p);
1156  if(f->valid())
1157  return EAGAIN;
1158 
1159  f->set(value);
1160  return 0;
1161 }
1162 
1168 EXTERN_C ZTH_EXPORT ZTH_INLINE int
1169 zth_future_get(zth_future_t* __restrict__ future, uintptr_t* __restrict__ value) noexcept
1170 {
1171  if(unlikely(!future || !future->p || !value))
1172  return EINVAL;
1173 
1174  *value = static_cast<zth_future_t_type*>(future->p)->value();
1175  return 0;
1176 }
1177 
1183 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_future_wait(zth_future_t* future) noexcept
1184 {
1185  if(unlikely(!future || !future->p))
1186  return EINVAL;
1187 
1188  static_cast<zth_future_t_type*>(future->p)->wait();
1189  return 0;
1190 }
1191 
1192 struct zth_gate_t {
1193  void* p;
1194 };
1195 
1205 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_gate_init(zth_gate_t* gate, size_t count) noexcept
1206 {
1207  if(unlikely(!gate || gate->p))
1208  return EINVAL;
1209 
1210  try {
1211  gate->p = (void*)new zth::Gate(count);
1212  return 0;
1213  } catch(std::bad_alloc const&) {
1214  return ENOMEM;
1215  } catch(...) {
1216  }
1217  return EAGAIN;
1218 }
1219 
1225 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_gate_destroy(zth_gate_t* gate) noexcept
1226 {
1227  if(unlikely(!gate))
1228  return EINVAL;
1229  if(unlikely(!gate->p))
1230  // Already destroyed.
1231  return 0;
1232 
1233  delete static_cast<zth::Gate*>(gate->p);
1234  gate->p = nullptr;
1235  return 0;
1236 }
1237 
1243 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_gate_pass(zth_gate_t* gate) noexcept
1244 {
1245  if(unlikely(!gate || !gate->p))
1246  return EINVAL;
1247 
1248  return static_cast<zth::Gate*>(gate->p)->pass() ? 0 : EBUSY;
1249 }
1250 
1256 EXTERN_C ZTH_EXPORT ZTH_INLINE int zth_gate_wait(zth_gate_t* gate) noexcept
1257 {
1258  if(unlikely(!gate || !gate->p))
1259  return EINVAL;
1260 
1261  static_cast<zth::Gate*>(gate->p)->wait();
1262  return 0;
1263 }
1264 
1265 #else // !__cplusplus
1266 
1267 # include <stdint.h>
1268 
1269 typedef struct {
1270  void* p;
1271 } zth_mutex_t;
1272 
1273 ZTH_EXPORT int zth_mutex_init(zth_mutex_t* mutex);
1274 ZTH_EXPORT int zth_mutex_destroy(zth_mutex_t* mutex);
1275 ZTH_EXPORT int zth_mutex_lock(zth_mutex_t* mutex);
1276 ZTH_EXPORT int zth_mutex_trylock(zth_mutex_t* mutex);
1277 ZTH_EXPORT int zth_mutex_unlock(zth_mutex_t* mutex);
1278 
1279 typedef struct {
1280  void* p;
1281 } zth_sem_t;
1282 
1283 ZTH_EXPORT int zth_sem_init(zth_sem_t* sem, size_t value);
1284 ZTH_EXPORT int zth_sem_destroy(zth_sem_t* sem);
1285 ZTH_EXPORT int zth_sem_getvalue(zth_sem_t* __restrict__ sem, size_t* __restrict__ value);
1286 ZTH_EXPORT int zth_sem_post(zth_sem_t* sem);
1287 ZTH_EXPORT int zth_sem_wait(zth_sem_t* sem);
1288 ZTH_EXPORT int zth_sem_trywait(zth_sem_t* sem);
1289 
1290 typedef struct {
1291  void* p;
1292 } zth_cond_t;
1293 
1294 ZTH_EXPORT int zth_cond_init(zth_cond_t* cond);
1295 ZTH_EXPORT int zth_cond_destroy(zth_cond_t* cond);
1296 ZTH_EXPORT int zth_cond_signal(zth_cond_t* cond);
1297 ZTH_EXPORT int zth_cond_broadcast(zth_cond_t* cond);
1298 ZTH_EXPORT int zth_cond_wait(zth_cond_t* cond);
1299 
1300 typedef struct {
1301  void* p;
1302 } zth_future_t;
1303 
1304 ZTH_EXPORT int zth_future_init(zth_future_t* future);
1305 ZTH_EXPORT int zth_future_destroy(zth_future_t* future);
1306 ZTH_EXPORT int zth_future_valid(zth_future_t* future);
1307 ZTH_EXPORT int zth_future_set(zth_future_t* future, uintptr_t value);
1308 ZTH_EXPORT int zth_future_get(zth_future_t* __restrict__ future, uintptr_t* __restrict__ value);
1309 ZTH_EXPORT int zth_future_wait(zth_future_t* future);
1310 
1311 typedef struct {
1312  void* p;
1313 } zth_gate_t;
1314 
1315 ZTH_EXPORT int zth_gate_init(zth_gate_t* gate, size_t count);
1316 ZTH_EXPORT int zth_gate_destroy(zth_gate_t* gate);
1317 ZTH_EXPORT int zth_gate_pass(zth_gate_t* gate);
1318 ZTH_EXPORT int zth_gate_wait(zth_gate_t* gate);
1319 
1320 #endif // !__cplusplus
1321 #endif // ZTH_SYNC_H
The fiber.
Definition: fiber.h:52
void wakeup() noexcept
Definition: fiber.h:310
void nap(Timestamp const &sleepUntil=Timestamp::null()) noexcept
Definition: fiber.h:280
Future(cow_string const &name="Future")
Definition: sync.h:709
Future(cow_string &&name)
Definition: sync.h:715
void set() noexcept
Definition: sync.h:738
virtual ~Future() override=default
Fiber-aware future.
Definition: sync.h:542
type * operator*()
Definition: sync.h:663
type * operator->()
Definition: sync.h:673
void set()
Definition: sync.h:594
void set(type &&value)
Definition: sync.h:619
type value() &&
Definition: sync.h:650
void set(type const &value)
Definition: sync.h:603
Future & operator=(type &&value)
Definition: sync.h:628
type & value() &
Definition: sync.h:635
T type
Definition: sync.h:545
virtual ~Future() override
Definition: sync.h:569
type const * operator->() const
Definition: sync.h:668
Future & operator=(type const &value)
Definition: sync.h:612
Future(cow_string &&name)
Definition: sync.h:559
type const & value() const &
Definition: sync.h:642
bool valid() const noexcept
Definition: sync.h:578
type const * operator*() const
Definition: sync.h:658
Future(cow_string const &name="Future")
Definition: sync.h:548
void wait()
Definition: sync.h:588
Fiber-aware barrier/gate.
Definition: sync.h:757
Gate(size_t count, cow_string &&name)
Definition: sync.h:767
size_t current() const noexcept
Definition: sync.h:798
virtual ~Gate() override=default
size_t count() const noexcept
Definition: sync.h:794
void wait()
Definition: sync.h:788
Gate(size_t count, cow_string const &name="Gate")
Definition: sync.h:760
bool pass() noexcept
Definition: sync.h:776
Fiber-aware mutex.
Definition: sync.h:340
Mutex(cow_string const &name="Mutex")
Definition: sync.h:343
void lock()
Definition: sync.h:357
Mutex(cow_string &&name)
Definition: sync.h:349
bool trylock() noexcept
Definition: sync.h:365
virtual ~Mutex() override=default
void unlock() noexcept
Definition: sync.h:374
RefCounted() noexcept
Definition: sync.h:40
virtual ~RefCounted()=default
void unused()
Definition: sync.h:52
void used() noexcept
Definition: sync.h:46
Fiber-aware semaphore.
Definition: sync.h:390
void release(size_t count=1) noexcept
Definition: sync.h:426
virtual ~Semaphore() override=default
Semaphore(size_t init=0, cow_string const &name="Semaphore")
Definition: sync.h:393
size_t value() const noexcept
Definition: sync.h:443
Semaphore(size_t init, cow_string &&name)
Definition: sync.h:399
void acquire(size_t count=1)
Definition: sync.h:407
SharedPointer & operator=(SharedPointer const &p)
Definition: sync.h:100
constexpr SharedPointer(T *object=nullptr) noexcept
Definition: sync.h:66
constexpr T * operator->() const noexcept
Definition: sync.h:136
constexpr T * release() noexcept
Definition: sync.h:142
constexpr SharedPointer(SharedPointer const &p) noexcept
Definition: sync.h:73
void reset(T *object=nullptr)
Definition: sync.h:85
constexpr T * get() const noexcept
Definition: sync.h:120
constexpr SharedPointer & operator=(SharedPointer &&p) noexcept
Definition: sync.h:113
SharedPointer & operator=(T *object)
Definition: sync.h:94
constexpr SharedPointer(SharedPointer &&p) noexcept
Definition: sync.h:107
virtual ~SharedPointer()
Definition: sync.h:80
constexpr T * operator*() const noexcept
Definition: sync.h:130
Fiber-aware signal.
Definition: sync.h:456
void signal(bool queue=true, bool queueEveryTime=false) noexcept
Definition: sync.h:509
void signalAll(bool queue=true) noexcept
Definition: sync.h:520
Signal(cow_string &&name)
Definition: sync.h:465
virtual ~Signal() override=default
void wait()
Definition: sync.h:473
bool wait(TimeInterval const &timeout, Timestamp const &now=Timestamp::now())
Definition: sync.h:504
Signal(cow_string const &name="Signal")
Definition: sync.h:459
bool wait(Timestamp const &timeout, Timestamp const &now=Timestamp::now())
Definition: sync.h:488
void reset() noexcept
Definition: sync.h:528
virtual bool poll(Timestamp const &now=Timestamp::now()) noexcept override
Definition: sync.h:258
TimedWaitable base
Definition: sync.h:247
virtual ~AlarmClock() override=default
bool rang() const noexcept
Definition: sync.h:269
AlarmClock(Synchronizer &synchronizer, Fiber &fiber, Timestamp const &timeout) noexcept
Definition: sync.h:248
virtual ~Synchronizer() override
Definition: sync.h:170
void block()
Definition: sync.h:177
bool unblockFirst() noexcept
Definition: sync.h:209
Synchronizer(cow_string const &name="Synchronizer")
Definition: sync.h:158
bool unblockAll() noexcept
Definition: sync.h:225
bool block(TimeInterval const &timeout, Timestamp const &now=Timestamp::now())
Block, with timeout.
Definition: sync.h:299
Synchronizer(cow_string &&name)
Definition: sync.h:164
bool block(Timestamp const &timeout, Timestamp const &now=Timestamp::now())
Block, with timeout.
Definition: sync.h:286
bool unblock(Fiber &f) noexcept
Unblock the specific fiber.
Definition: sync.h:194
friend class AlarmClock
Definition: sync.h:280
Convenient wrapper around struct timespec that contains a time interval.
Definition: time.h:52
constexpr bool isNegative() const noexcept
Definition: time.h:219
constexpr bool isNull() const noexcept
Definition: time.h:229
virtual bool poll(Timestamp const &now=Timestamp::now()) noexcept override
Definition: waiter.h:84
Timestamp const & timeout() const noexcept
Definition: waiter.h:79
Convenient wrapper around struct timespec that contains an absolute timestamp.
Definition: time.h:527
static Timestamp now()
Definition: time.h:554
static constexpr Timestamp null() noexcept
Definition: time.h:674
friend cow_string str(UniqueIDBase const &)
Keeps track of a process-wide unique ID within the type T.
Definition: util.h:657
virtual char const * id_str() const override
Definition: util.h:751
string const & name() const noexcept
Definition: util.h:725
Fiber & fiber() const noexcept
Definition: waiter.h:36
void scheduleTask(TimedWaitable &w)
Definition: waiter.cpp:62
void unscheduleTask(TimedWaitable &w)
Definition: waiter.cpp:74
The class that manages the fibers within this thread.
Definition: worker.h:38
void release(Fiber &fiber) noexcept
Definition: worker.h:147
Waiter & waiter() noexcept
Definition: worker.h:101
bool schedule(Fiber *preferFiber=nullptr, Timestamp const &now=Timestamp::now())
Definition: worker.h:164
void add(Fiber *fiber) noexcept
Definition: worker.h:106
Copy-on-write string.
Definition: util.h:344
int zth_cond_broadcast(zth_cond_t *cond) noexcept
Signals all fibers waiting for the condition.
Definition: sync.h:1065
int zth_future_valid(zth_future_t *future) noexcept
Checks if a future was already set.
Definition: sync.h:1137
int zth_mutex_trylock(zth_mutex_t *mutex) noexcept
Try to lock a mutex.
Definition: sync.h:871
int zth_gate_pass(zth_gate_t *gate) noexcept
Passes a gate.
Definition: sync.h:1243
int zth_mutex_init(zth_mutex_t *mutex) noexcept
Initializes a mutex.
Definition: sync.h:819
int zth_cond_signal(zth_cond_t *cond) noexcept
Signals one fiber waiting for the condition.
Definition: sync.h:1051
int zth_gate_init(zth_gate_t *gate, size_t count) noexcept
Initializes a gate.
Definition: sync.h:1205
int zth_cond_wait(zth_cond_t *cond) noexcept
Wait for a condition.
Definition: sync.h:1079
int zth_cond_destroy(zth_cond_t *cond) noexcept
Destroys a condition.
Definition: sync.h:1033
int zth_future_get(zth_future_t *__restrict__ future, uintptr_t *__restrict__ value) noexcept
Wait for and return a future's value.
Definition: sync.h:1169
int zth_gate_wait(zth_gate_t *gate) noexcept
Wait for a gate.
Definition: sync.h:1256
int zth_cond_init(zth_cond_t *cond) noexcept
Initializes a condition.
Definition: sync.h:1013
int zth_sem_post(zth_sem_t *sem) noexcept
Increments a semaphore.
Definition: sync.h:959
int zth_mutex_lock(zth_mutex_t *mutex) noexcept
Locks a mutex.
Definition: sync.h:857
int zth_mutex_destroy(zth_mutex_t *mutex) noexcept
Destroys a mutex.
Definition: sync.h:839
int zth_future_destroy(zth_future_t *future) noexcept
Destroys a future.
Definition: sync.h:1119
int zth_future_set(zth_future_t *future, uintptr_t value) noexcept
Sets a future and signals all waiting fibers.
Definition: sync.h:1150
int zth_sem_init(zth_sem_t *sem, size_t value) noexcept
Initializes a semaphore.
Definition: sync.h:902
int zth_sem_destroy(zth_sem_t *sem) noexcept
Destroys a semaphore.
Definition: sync.h:922
int zth_sem_trywait(zth_sem_t *sem) noexcept
Try to decrement a semaphore.
Definition: sync.h:991
int zth_future_wait(zth_future_t *future) noexcept
Wait for a future.
Definition: sync.h:1183
int zth_gate_destroy(zth_gate_t *gate) noexcept
Destroys a gate.
Definition: sync.h:1225
int zth_sem_getvalue(zth_sem_t *__restrict__ sem, size_t *__restrict__ value) noexcept
Returns the value of a semaphore.
Definition: sync.h:941
int zth_future_init(zth_future_t *future) noexcept
Initializes a future.
Definition: sync.h:1099
int zth_mutex_unlock(zth_mutex_t *mutex) noexcept
Unlock a mutex.
Definition: sync.h:884
int zth_sem_wait(zth_sem_t *sem) noexcept
Decrements (or wait for) a semaphore.
Definition: sync.h:977
void yield(Fiber *preferFiber=nullptr, bool alwaysYield=false, Timestamp const &now=Timestamp::now())
Allow a context switch.
Definition: worker.h:435
TimeInterval lock(Fsm &fsm)
A wrapper for zth::Fsm::guardLock().
Definition: fsm.h:138
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition: util.h:210
std::basic_string< char, std::char_traits< char >, Config::Allocator< char >::type > string
std::string type using Config::Allocator::type.
Definition: util.h:335
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition: allocator.h:114
#define constexpr14
Definition: macros.h:201
#define is_default
Definition: macros.h:205
#define LREF_QUALIFIED
Definition: macros.h:208
#define ZTH_INLINE
Definition: macros.h:130
Definition: allocator.h:23
void getContext(Worker **worker, Fiber **fiber) noexcept
Definition: worker.h:412
The configuration of Zth.
Definition: zth_config.h:22
void * p
Definition: sync.h:1005
void * p
Definition: sync.h:1089
void * p
Definition: sync.h:1193
void * p
Definition: sync.h:811
void * p
Definition: sync.h:894
zth::Future< uintptr_t > zth_future_t_type
Definition: sync.h:1092
#define EOVERFLOW
Definition: sync.h:951
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition: util.h:236
#define likely(expr)
Marks the given expression to likely be evaluated to true.
Definition: util.h:42
#define ZTH_CLASS_NOCOPY(Class)
Definition: util.h:254
#define unlikely(expr)
Marks the given expression to likely be evaluated to true.
Definition: util.h:56