Zth (libzth)
Loading...
Searching...
No Matches
coro.h
Go to the documentation of this file.
1#ifndef ZTH_CORO_H
2#define ZTH_CORO_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
15#if defined(__cplusplus) && __cplusplus >= 202002L
16# define ZTH_HAVE_CORO
17#endif
18
19#ifdef ZTH_HAVE_CORO
20# include <libzth/macros.h>
21
22# include <libzth/exception.h>
23# include <libzth/sync.h>
24
25# include <coroutine>
26
27namespace zth {
28namespace coro {
29
30
31
33// Awaitable types
34//
35
/*!
 * \brief Fallback overload: use the given object as awaitable as-is.
 *
 * The argument is perfectly forwarded; an lvalue comes back as an lvalue
 * reference, an rvalue as an rvalue reference. No copy is made here.
 */
template <typename T>
static inline decltype(auto) awaitable(T&& awaitable) noexcept
{
	// T&& collapses to the caller's reference type, which is exactly the
	// cast that std::forward<T>() performs.
	return static_cast<T&&>(awaitable);
}
41
42static inline decltype(auto) awaitable(zth::Mutex& mutex) noexcept
43{
44 struct impl {
45 zth::Mutex& mutex;
46
47 char const* id_str() const noexcept
48 {
49 return mutex.id_str();
50 }
51
52 bool await_ready() const noexcept
53 {
54 return true;
55 }
56
57 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
58
59 zth::Locked await_resume() const
60 {
61 return zth::Locked{mutex};
62 }
63 };
64
65 return impl{mutex};
66}
67
68static inline decltype(auto) awaitable(zth::Semaphore& sem) noexcept
69{
70 struct impl {
71 zth::Semaphore& sem;
72
73 char const* id_str() const noexcept
74 {
75 return sem.id_str();
76 }
77
78 bool await_ready() const noexcept
79 {
80 return true;
81 }
82
83 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
84
85 decltype(auto) await_resume() const
86 {
87 return sem.acquire();
88 }
89 };
90
91 return impl{sem};
92}
93
94static inline decltype(auto) awaitable(zth::Signal& signal) noexcept
95{
96 struct impl {
97 zth::Signal& signal;
98
99 char const* id_str() const noexcept
100 {
101 return signal.id_str();
102 }
103
104 bool await_ready() const noexcept
105 {
106 return true;
107 }
108
109 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
110
111 decltype(auto) await_resume() const
112 {
113 return signal.wait();
114 }
115 };
116
117 return impl{signal};
118}
119
120template <typename T>
121static inline decltype(auto) awaitable(zth::Future<T>& future) noexcept
122{
123 struct impl {
125
126 char const* id_str() const noexcept
127 {
128 return future.id_str();
129 }
130
131 bool await_ready() const noexcept
132 {
133 return true;
134 }
135
136 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
137
138 decltype(auto) await_resume() const
139 {
140 return *future;
141 }
142 };
143
144 return impl{future};
145}
146
147template <typename T>
148static inline decltype(auto) awaitable(zth::Mailbox<T>& mailbox) noexcept
149{
150 struct impl {
151 zth::Mailbox<T>& mailbox;
152
153 char const* id_str() const noexcept
154 {
155 return mailbox.id_str();
156 }
157
158 bool await_ready() const noexcept
159 {
160 return true;
161 }
162
163 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
164
165 decltype(auto) await_resume() const
166 {
167 return mailbox.take();
168 }
169 };
170
171 return impl{mailbox};
172}
173
174static inline decltype(auto) awaitable(zth::Gate& gate) noexcept
175{
176 struct impl {
177 zth::Gate& gate;
178
179 char const* id_str() const noexcept
180 {
181 return gate.id_str();
182 }
183
184 bool await_ready() const noexcept
185 {
186 return true;
187 }
188
189 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
190
191 void await_resume() const
192 {
193 gate.wait();
194 }
195 };
196
197 return impl{gate};
198}
199
200
201
203// Promise base classes
204//
205
206template <typename Promise, typename Awaitable>
208 Promise& promise;
209 Awaitable awaitable;
210 bool suspended = false;
211
212 char const* id_str() const noexcept requires(requires(Awaitable a) { a.id_str(); })
213 {
214 return awaitable.id_str();
215 }
216
217 decltype(auto) await_ready() noexcept
218 {
219 return awaitable.await_ready();
220 }
221
222 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
223 {
224 zth_assert(promise.typedHandle().address() == h.address());
225
226 if constexpr(requires(Awaitable a) { a.id_str(); })
227 zth_dbg(coro, "[%s] Await suspend by %s", promise.id_str(),
228 awaitable.id_str());
229 else
230 zth_dbg(coro, "[%s] Await suspend", promise.id_str());
231
232 suspended = true;
233 promise.running(false);
234
235 if constexpr(std::is_void_v<decltype(awaitable.await_suspend(h))>) {
236 awaitable.await_suspend(h);
237 return h;
238 } else if constexpr(std::is_same_v<bool, decltype(awaitable.await_suspend(h))>) {
239 awaitable.await_suspend(h);
240 return h;
241 } else if constexpr(std::is_convertible_v<
242 decltype(awaitable.await_suspend(h)),
243 std::coroutine_handle<>>) {
244 auto c = awaitable.await_suspend(h);
245 return c ? c : h;
246 } else {
247 static_assert(
248 (sizeof(Awaitable), false),
249 "await_suspend must return void, bool, or std::coroutine_handle<>");
250 }
251 }
252
253 decltype(auto) await_resume()
254 {
255 if(suspended) {
256 zth_dbg(coro, "[%s] Await resume", promise.id_str());
257 promise.running(true);
258 }
259 return awaitable.await_resume();
260 }
261};
262
263template <typename Promise, typename Awaitable>
265
266class promise_base : public RefCounted, public UniqueID<promise_base> {
267public:
268 static ZTH_MALLOC_INLINE void operator delete(void* ptr, std::size_t n) noexcept
269 {
270 ::zth::deallocate<char>(static_cast<char*>(ptr), n);
271 }
272
273 ZTH_MALLOC_ATTR((malloc, alloc_size(1)))
274 __attribute__((returns_nonnull, warn_unused_result)) static void*
275 operator new(std::size_t n)
276 {
277 return ::zth::allocate<char>(n);
278 }
279
280 virtual ~promise_base() noexcept override = default;
281
282 virtual bool running() const noexcept = 0;
283 virtual std::coroutine_handle<> handle() noexcept = 0;
284
285 std::coroutine_handle<> continuation() noexcept
286 {
287 auto c = m_continuation;
289 return c ? c : std::noop_coroutine();
290 }
291
292 bool has_continuation() const noexcept
293 {
294 return static_cast<bool>(m_continuation);
295 }
296
297 void set_continuation(std::coroutine_handle<> cont = {}) noexcept
298 {
299 zth_assert(!m_continuation || !cont || m_continuation == cont);
300 m_continuation = cont;
301 }
302
303protected:
304 explicit promise_base(cow_string const& name)
305 : UniqueID{name}
306 {}
307
308private:
309 std::coroutine_handle<> m_continuation;
310};
311
312template <typename Promise>
313class promise : public promise_base {
314public:
315 using promise_type = Promise;
316
317protected:
318 explicit promise(cow_string const& name)
320 {}
321
322public:
323 virtual ~promise() noexcept override
324 {
326 }
327
328 __attribute__((pure)) auto typedHandle() noexcept
329 {
330 return std::coroutine_handle<promise_type>::from_promise(self());
331 }
332
333 virtual std::coroutine_handle<> handle() noexcept final
334 {
335 return typedHandle();
336 }
337
338 decltype(auto) initial_suspend() noexcept
339 {
340 struct awaiter {
341 promise& p;
342
343 bool await_ready() noexcept
344 {
345 return false;
346 }
347
348 void await_suspend(std::coroutine_handle<>) noexcept {}
349
350 void await_resume() noexcept
351 {
352 p.running(true);
353 zth_dbg(coro, "[%s] Initial resume", p.id_str());
354 }
355 };
356
357 return awaiter{*this};
358 }
359
360 decltype(auto) final_suspend() noexcept
361 {
362 struct impl {
363 promise_type& p;
364
365 bool await_ready() noexcept
366 {
367 return false;
368 }
369
370 std::coroutine_handle<> await_suspend(std::coroutine_handle<>) noexcept
371 {
372 p.running(false);
373 return p.continuation();
374 }
375
376 void await_resume() noexcept {}
377 };
378
379 return impl{self()};
380 }
381
382 template <typename A>
383 decltype(auto) await_transform(A&& a) noexcept
384 {
385 return promise_awaitable{self(), awaitable(std::forward<A>(a))};
386 }
387
388 template <typename P, typename A>
389 friend class promise_awaitable;
390
391 virtual bool running() const noexcept final
392 {
393 return m_running || this->has_continuation();
394 }
395
396 void run()
397 {
398 if(running())
399 // Runs in another fiber.
401
402 auto h = handle();
403 zth_assert(h);
404
405 zth_dbg(coro, "[%s] Run", id_str());
406
407 while(!self().completed())
408 h();
409 }
410
411protected:
412 void running(bool set) noexcept
413 {
414 zth_assert(m_running != set);
415 m_running = set;
416 }
417
418 promise_type const& self() const noexcept
419 {
420 return static_cast<promise_type const&>(*this);
421 }
422
423 promise_type& self() noexcept
424 {
425 return static_cast<promise_type&>(*this);
426 }
427
428private:
429 // Invoked when RefCounted count reaches zero.
430 virtual void cleanup() noexcept final
431 {
432 zth_dbg(coro, "[%s] Cleanup", id_str());
433 handle().destroy();
434 }
435
436private:
437 bool m_running = false;
438};
439
440
441
443// Task
444//
445// A task is a coroutine that produces a single result value (or void).
446//
447
448template <typename T>
449class task_promise;
450
451template <typename Task, typename Fiber>
453 Task task;
455
456 decltype(auto) operator*()
457 {
458 return *task.future();
459 }
460
461 decltype(auto) operator->()
462 {
463 return &*task.future();
464 }
465
466 template <typename M>
467 requires(std::is_base_of_v<FiberManipulator, M>) task_fiber& operator<<(M const& m) &
468 {
470 fiber << m;
471 return *this;
472 }
473
474 template <typename M>
475 requires(std::is_base_of_v<FiberManipulator, M>) task_fiber&& operator<<(M const& m) &&
476 {
477 *this << m;
478 return std::move(*this);
479 }
480
481 operator zth::Fiber&() const noexcept
482 {
484 return fiber;
485 }
486
487 operator typename Task::Future_type &() noexcept
488 {
489 return task.future();
490 }
491
492 decltype(auto) operator<<(asFuture const&)
493 {
495 return task.future();
496 }
497};
498
499template <typename Task, typename Fiber>
501
502template <typename T, typename F>
503static inline void joinable(task_fiber<T, F> const& tf, Gate& g, Hook<Gate&>& join) noexcept
504{
505 (void)join;
506 static_cast<zth::Fiber&>(tf) << passOnExit(g);
507}
508
514template <typename T = void>
515class task {
516public:
517 using return_type = T;
520
521 explicit task(std::coroutine_handle<promise_type> h) noexcept
522 : m_promise{h.promise()}
523 {}
524
525 explicit task(zth::SharedPointer<promise_type> const& p) noexcept
526 : m_promise{p}
527 {}
528
530 : m_promise{std::move(p)}
531 {}
532
533 explicit task(promise_type* p = nullptr) noexcept
534 : m_promise{p}
535 {}
536
537 promise_type* promise() const noexcept
538 {
539 return m_promise.get();
540 }
541
542 char const* id_str() const noexcept
543 {
544 auto* p = promise();
545 return p ? p->id_str() : "coro::task";
546 }
547
548 void setName(string name) noexcept
549 {
550 auto* p = promise();
551 if(p)
552 p->setName(std::move(name));
553 }
554
555 bool completed() const noexcept
556 {
557 auto* p = promise();
558 return !p || p->completed();
559 }
560
562 {
563 auto* p = promise();
564 if(!p)
566
567 // cppcheck-suppress nullPointerRedundantCheck
568 return p->future();
569 }
570
571 decltype(auto) result()
572 {
573 return *future();
574 }
575
576 decltype(auto) run()
577 {
578 auto* p = promise();
579 if(!p)
581
582 // cppcheck-suppress nullPointerRedundantCheck
583 p->run();
584 return result();
585 }
586
587 decltype(auto) operator()()
588 {
589 return run();
590 }
591
592 decltype(auto) fiber(char const* name = nullptr)
593 {
594 if(!name) {
595 name = "coro::fiber";
596 } else {
597 auto* p = promise();
598 if(p)
599 p->setName(name);
600 }
601
602 return task_fiber{*this, zth::factory([](task t) { t.run(); }, name)(*this)};
603 }
604
605 auto& operator co_await() noexcept
606 {
607 return *this;
608 }
609
610 bool await_ready() const noexcept
611 {
612 return completed() || promise()->running();
613 }
614
615 std::coroutine_handle<promise_type> await_suspend(std::coroutine_handle<> h) noexcept
616 {
617 auto* p = promise();
618 zth_assert(p && !p->running());
619 p->set_continuation(h);
620 return p->typedHandle();
621 }
622
623 decltype(auto) await_resume()
624 {
625 return result();
626 }
627
628private:
630};
631
632template <typename T>
633class task_promise_base : public promise<task_promise<T>> {
634public:
635 using return_type = T;
637 using typename base::promise_type;
640
642 : base{"coro::task"}
643 {}
644
645 virtual ~task_promise_base() noexcept override = default;
646
647 Future_type& future() noexcept
648 {
649 return m_future;
650 }
651
652 bool completed() const noexcept
653 {
654 return m_future.valid();
655 }
656
657 auto get_return_object() noexcept
658 {
659 zth_dbg(coro, "[%s] New %p", this->id_str(), this->handle().address());
660 return task{static_cast<promise_type*>(this)};
661 }
662
663 void unhandled_exception() noexcept
664 {
665 zth_dbg(coro, "[%s] Exit with exception", this->id_str());
666 m_future.set(std::current_exception());
667 }
668
669private:
670 Future_type m_future;
671};
672
673template <typename T>
674class task_promise final : public task_promise_base<T> {
675public:
677 using typename base::Future_type;
678 using typename base::return_type;
679
680 virtual ~task_promise() noexcept override = default;
681
682 template <typename T_>
683 void return_value(T_&& v) noexcept
684 {
685 zth_dbg(coro, "[%s] Returned", this->id_str());
686 this->future().set(std::forward<T_>(v));
687 }
688};
689
690template <>
691class task_promise<void> final : public task_promise_base<void> {
692public:
694 using return_type = void;
695 using typename base::Future_type;
696
697 virtual ~task_promise() noexcept override = default;
698
699 void return_void() noexcept
700 {
701 zth_dbg(coro, "[%s] Returned", this->id_str());
702 this->future().set();
703 }
704};
705
706
707
709// Generators
710//
711// A generator is a coroutine that produces a sequence of values. It does not produce a
712// return value.
713//
714
715template <typename T>
716class Mailbox;
717
718template <typename T>
719class Mailbox : public zth::Mailbox<T> {
721public:
723 using type = typename base::type;
724
725 explicit Mailbox(promise_base& owner, cow_string const& name = "coro::Mailbox")
726 : base{name}
727 , m_owner{&owner}
728 {}
729
730 virtual ~Mailbox() noexcept override = default;
731
732 promise_base& owner() const noexcept
733 {
735 return *m_owner;
736 }
737
738 std::coroutine_handle<> waiter() const noexcept
739 {
740 return m_waiter;
741 }
742
743 bool valid(std::coroutine_handle<> waiter) const noexcept
744 {
745 return base::valid(waiter ? waiter.address() : typename base::assigned_type{});
746 }
747
748 using base::valid;
749
750 void wait(Listable& item, std::coroutine_handle<> waiter) noexcept
751 {
752 this->enqueue(item, this->Queue_Take).user() = waiter.address();
753 zth_dbg(coro, "[%s] Block coro %p", this->id_str(), waiter.address());
754 }
755
756 using base::wait;
757
758 type take(std::coroutine_handle<> waiter)
759 {
760 if(m_waiter == waiter)
761 m_waiter = std::coroutine_handle<>{};
762
763 return base::take(waiter.address());
764 }
765
766 using base::take;
767
768 void abandon()
769 {
771 m_owner = nullptr;
772 }
773
774 bool abandoned() const noexcept
775 {
776 return !m_owner;
777 }
778
779protected:
780 virtual void wait(typename base::assigned_type assigned) override
781 {
782 while(!valid(assigned)) {
783 if(abandoned())
785 this->block((size_t)base::Queue_Take);
786 }
787 }
788
789 virtual typename base::assigned_type
790 wakeup(Listable& item, typename base::queue_type::user_type user,
791 bool prio) noexcept override
792 {
793 if(!user) {
794 // A fiber woke up. Pass through, we are not waiting for fibers
795 // here.
796 return base::wakeup(item, user, prio);
797 }
798
799 zth_dbg(coro, "[%s] Unblock coro %s", this->id_str(), str(user).c_str());
800 m_waiter = std::coroutine_handle<>::from_address(user);
801 owner().set_continuation(m_waiter);
802 return user;
803 }
804
805private:
806 promise_base* m_owner = nullptr;
807 std::coroutine_handle<> m_waiter;
808};
809
810template <typename T>
811static inline decltype(auto) awaitable(zth::coro::Mailbox<T>& mb) noexcept
812{
813 struct impl {
814 Mailbox<T>& mailbox;
815 Listable item{};
816 std::coroutine_handle<> waiter{};
817
818 char const* id_str() const noexcept
819 {
820 return mailbox.owner().id_str();
821 }
822
823 bool await_ready() noexcept
824 {
825 return mailbox.abandoned()
826 || mailbox.valid(typename Mailbox<T>::assigned_type{})
827 || mailbox.owner().running();
828 }
829
830 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
831 {
832 waiter = h;
833 mailbox.wait(item, waiter);
834 return mailbox.owner().handle();
835 }
836
837 decltype(auto) await_resume()
838 {
839 if(mailbox.valid(waiter)) {
840 // Got it.
841 return mailbox.take(waiter);
842 } else if(mailbox.abandoned()) {
843 // The generator has completed. Can't resume.
844 zth_throw(coro_already_completed{});
845 } else {
846 // The generator runs in another fiber. Just suspend our
847 // fiber while waiting.
848 return mailbox.take();
849 }
850 }
851 };
852
853 return impl{mb};
854}
855
856template <typename Generator, typename Fiber>
858 Generator generator;
860
861 template <typename M>
862 requires(std::is_base_of_v<FiberManipulator, M>) generator_fiber& operator<<(M const& m) &
863 {
865 fiber << m;
866 return *this;
867 }
868
869 template <typename M>
870 requires(std::is_base_of_v<FiberManipulator, M>) generator_fiber&& operator<<(M const& m) &&
871 {
872 *this << m;
873 return std::move(*this);
874 }
875
876 operator zth::Fiber&() const noexcept
877 {
879 return fiber;
880 }
881
882private:
883 void operator<<(asFuture const&);
884};
885
886template <typename Generator, typename Fiber>
888
889template <typename G, typename F>
890static inline void joinable(generator_fiber<G, F> const& gf, Gate& g, Hook<Gate&>& join) noexcept
891{
892 (void)join;
893 static_cast<zth::Fiber&>(gf) << passOnExit(g);
894}
895
896template <typename... T>
897class generator_promise;
898
905template <typename... T>
906requires(sizeof...(T) >= 1, (!std::is_void_v<T> && ...)) class generator {
907public:
910
911 explicit generator(std::coroutine_handle<promise_type> h) noexcept
912 : m_promise{h.promise()}
913 {}
914
915 explicit generator(zth::SharedPointer<promise_type> const& p) noexcept
916 : m_promise{p}
917 {}
918
920 : m_promise{std::move(p)}
921 {}
922
923 explicit generator(promise_type* p = nullptr) noexcept
924 : m_promise{p}
925 {}
926
927 promise_type* promise() const noexcept
928 {
929 return m_promise.get();
930 }
931
932 char const* id_str() const noexcept
933 {
934 auto* p = promise();
935 return p ? p->id_str() : "coro::generator";
936 }
937
938 void setName(string name) noexcept
939 {
940 auto* p = promise();
941 if(p)
942 p->setName(std::move(name));
943 }
944
945 bool completed() const noexcept
946 {
947 auto* p = m_promise.get();
948 return !p || p->completed();
949 }
950
951 template <typename U = typename promise_type::first_yield_type>
952 auto& mailbox() const
953 {
954 auto* p = m_promise.get();
955 if(!p)
957
958 // cppcheck-suppress nullPointerRedundantCheck
959 return p->template mailbox<U>();
960 }
961
962 template <typename U>
963 decltype(auto) as() const
964 {
965 return mailbox<U>();
966 }
967
968 template <typename U = typename promise_type::first_yield_type>
969 bool valid() const noexcept
970 {
971 auto* p = m_promise.get();
972 return p && p->template mailbox<U>().valid();
973 }
974
975 template <typename U = typename promise_type::first_yield_type>
976 decltype(auto) value()
977 {
978 auto* p = m_promise.get();
979 if(!p)
981
982 // cppcheck-suppress nullPointerRedundantCheck
983 return p->template value<U>();
984 }
985
986 template <typename U = typename promise_type::first_yield_type>
987 void generate()
988 {
989 auto* p = m_promise.get();
990 if(!p)
992
993 // cppcheck-suppress nullPointerRedundantCheck
994 p->template generate<U>();
995 }
996
997 void run()
998 {
999 auto* p = m_promise.get();
1000 if(!p)
1002
1003 // cppcheck-suppress nullPointerRedundantCheck
1004 p->run();
1005
1006 if(!completed())
1007 return;
1008
1009# ifdef ZTH_FUTURE_EXCEPTION
1010 auto exc = p->mailbox().exception();
1011 if(exc)
1012 std::rethrow_exception(exc);
1013# endif // ZTH_FUTURE_EXCEPTION
1014 }
1015
1016 decltype(auto) fiber(char const* name = nullptr)
1017 {
1018 if(!name) {
1019 name = "coro::fiber";
1020 } else {
1021 auto* p = promise();
1022 if(p)
1023 p->setName(name);
1024 }
1025
1026 return generator_fiber{
1027 *this, zth::factory([](generator g) { g.run(); }, name)(*this)};
1028 }
1029
1031 // Range support
1032 //
1033
1034 decltype(auto) begin()
1035 {
1036 auto* p = m_promise.get();
1037 if(!p)
1039
1040 // cppcheck-suppress nullPointerRedundantCheck
1041 return p->begin();
1042 }
1043
1045 {
1046 return {};
1047 }
1048
1049private:
1051};
1052
1053template <template <typename...> class G, typename... T>
1054requires(std::is_same_v<std::remove_reference_t<G<T...>>, generator<T...>>) //
1055 static inline decltype(auto) awaitable(G<T...> g) noexcept
1056{
1057 static_assert(
1058 sizeof...(T) == 1, "Use generator.as<type>() to select the yield type to await on");
1059 return awaitable(g.mailbox());
1060}
1061
1062namespace impl {
1063
1064template <typename T>
1065using generator_Mailbox_type = Mailbox<std::conditional_t<
1066 std::is_reference_v<T>, std::reference_wrapper<std::remove_reference_t<T>>, T>>;
1067
//! \brief \c true when \p A and \p B are exactly the same type, including cv and reference.
template <typename A, typename B>
inline constexpr bool is_identical_v = std::is_same_v<A, B>;

//! \brief \c true when \p A and \p B are the same type after \c std::decay_t.
template <typename A, typename B>
inline constexpr bool is_similar_v =
	is_identical_v<A, B> || std::is_same_v<std::decay_t<A>, std::decay_t<B>>;

//! \brief \c true when \p A is similar to \p B, or implicitly convertible to \c B&.
template <typename A, typename B>
inline constexpr bool is_convertible_v = is_similar_v<A, B> || std::is_convertible_v<A, B&>;

/*!
 * \brief Check whether \p Needle is identical to one of the \p Haystack types.
 *
 * The primary template handles the empty haystack, which never matches.
 */
template <typename Needle, typename... Haystack>
struct has_identical_type : std::false_type {};

template <typename Needle, typename First, typename... Rest>
struct has_identical_type<Needle, First, Rest...>
	: std::conditional_t<
		  is_identical_v<Needle, First>, std::true_type,
		  has_identical_type<Needle, Rest...>> {};

/*!
 * \brief Find the \p Haystack type that is identical to \p Needle.
 *
 * \c type is the matching type, or \c void when there is no match.
 *
 * The primary template must provide \c type: the specialization below names
 * the recursive \c ::type unconditionally (\c std::conditional_t does not
 * short-circuit), so the recursion always reaches the empty haystack.
 */
template <typename Needle, typename... Haystack>
struct find_identical_type {
	using type = void;
};

template <typename Needle, typename First, typename... Rest>
struct find_identical_type<Needle, First, Rest...> {
	using type = std::conditional_t<
		is_identical_v<Needle, First>, First,
		typename find_identical_type<Needle, Rest...>::type>;
};
1096
1097template <typename Needle, typename... Haystack>
1098struct has_similar_type : std::false_type {};
1099
1100template <typename Needle, typename First, typename... Rest>
1101struct has_similar_type<Needle, First, Rest...>
1102 : std::conditional_t<
1103 is_similar_v<Needle, First> && !has_identical_type<Needle, Rest...>::value,
1104 std::true_type, has_similar_type<Needle, Rest...>> {};
1105
1106template <typename Needle, typename... Haystack>
1107struct find_similar_type {
1108 using type = void;
1109};
1110
1111template <typename Needle, typename First, typename... Rest>
1112struct find_similar_type<Needle, First, Rest...> {
1113 using type = std::conditional_t<
1114 is_similar_v<Needle, First> && !has_identical_type<Needle, Rest...>::value, First,
1115 typename find_similar_type<Needle, Rest...>::type>;
1116};
1117
1118template <typename Needle, typename... Haystack>
1119struct find_convertible_type {
1120 using type = void;
1121};
1122
1123template <typename Needle, typename First, typename... Rest>
1124struct find_convertible_type<Needle, First, Rest...> {
1125 using type = std::conditional_t<
1126 is_convertible_v<Needle, First> && !has_similar_type<Needle, Rest...>::value, First,
1127 typename find_convertible_type<Needle, Rest...>::type>;
1128};
1129
/*!
 * \brief Return \p a unchanged.
 *
 * \p T is not used by the function itself. It lets a caller expand a
 * parameter pack over \p T while passing the same value for every element.
 */
template <typename A, typename T>
static inline A repeat(A a)
{
	return a;
}
1135
1136} // namespace impl
1137
1138template <typename Needle, typename... Haystack>
1139struct find_type : impl::find_convertible_type<Needle, Haystack...> {};
1140
1141template <typename... T>
1142class generator_promise final : public promise<generator_promise<T...>> {
1143public:
1144 using Mailbox_types = std::tuple<impl::generator_Mailbox_type<T>...>;
1146 using first_yield_type = typename std::tuple_element_t<0, std::tuple<T...>>;
1148
1150 // Administration
1151 //
1152
1154 : base{"coro::generator"}
1155 , m_mailbox{*impl::repeat<generator_promise*, T>(this)...}
1156 {}
1157
1158 virtual ~generator_promise() noexcept override = default;
1159
1160 template <typename U = first_yield_type>
1161 auto const& mailbox() const noexcept
1162 {
1163 return std::get<impl::generator_Mailbox_type<U>>(m_mailbox);
1164 }
1165
1166 template <typename U = first_yield_type>
1167 auto& mailbox() noexcept
1168 {
1169 return std::get<impl::generator_Mailbox_type<U>>(m_mailbox);
1170 }
1171
1172 bool completed() const noexcept
1173 {
1174 return m_completed;
1175 }
1176
1177 template <typename U = first_yield_type>
1178 bool valid() const noexcept
1179 {
1180 return mailbox<U>().valid() && !completed();
1181 }
1182
1183 template <typename U = first_yield_type>
1184 decltype(auto) value()
1185 {
1186 if(completed())
1188
1189 return mailbox<U>().take();
1190 }
1191
1192 template <typename U = first_yield_type>
1194 {
1195 if(completed())
1197 if(mailbox<U>().valid())
1198 return;
1199 if(this->running())
1200 // Just wait for another fiber to fill the mailbox.
1201 return;
1202
1203 auto h = this->typedHandle();
1204 zth_assert(h && !h.done());
1205
1206 zth_dbg(coro, "[%s] Generate", this->id_str());
1207 while(!mailbox<U>().valid() && !completed()) {
1208 // Always return at the next yield.
1209 this->set_continuation(std::noop_coroutine());
1210 h();
1211 }
1212 }
1213
1214 auto get_return_object() noexcept
1215 {
1216 zth_dbg(coro, "[%s] New %p", this->id_str(), this->handle().address());
1217 return generator{this};
1218 }
1219
1220 std::coroutine_handle<> waiter() const noexcept
1221 {
1222 std::array<std::coroutine_handle<>, sizeof...(T)> waiters = {waiter<T>()...};
1223
1224 for(size_t i = 0; i < sizeof...(T); ++i)
1225 if(waiters[i])
1226 return waiters[i];
1227
1228 return {};
1229 }
1230
1231 template <typename U>
1232 std::coroutine_handle<> waiter() const noexcept
1233 {
1234 return mailbox<U>().waiter();
1235 }
1236
1237 template <typename T_>
1238 decltype(auto) yield_value(T_&& v) noexcept
1239 {
1240 using M = typename find_type<T_, T...>::type;
1241 zth_dbg(coro, "[%s] Yield", this->id_str());
1242 mailbox<M>().put(std::forward<T_>(v));
1243
1244 struct impl {
1245 generator_promise& self;
1246
1247 char const* id_str() const noexcept
1248 {
1249 return "yield";
1250 }
1251
1252 bool await_ready() noexcept
1253 {
1254 return !self.has_continuation();
1255 }
1256
1257 std::coroutine_handle<>
1258 await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept
1259 {
1260 return self.continuation();
1261 }
1262
1263 void await_resume() noexcept {}
1264 };
1265
1266 return this->await_transform(impl{*this});
1267 }
1268
1269 // cppcheck-suppress duplInheritedMember
1270 decltype(auto) final_suspend() noexcept
1271 {
1272 struct impl {
1273 generator_promise& self;
1274 decltype(std::declval<base>().final_suspend()) awaitable;
1275
1276 bool await_ready() noexcept
1277 {
1278 return awaitable.await_ready();
1279 }
1280
1281 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
1282 {
1283 (void)(..., std::get<zth::coro::impl::generator_Mailbox_type<T>>(
1284 self.m_mailbox)
1285 .abandon());
1286
1287 return awaitable.await_suspend(h);
1288 }
1289
1290 decltype(auto) await_resume() noexcept
1291 {
1292 return awaitable.await_resume();
1293 }
1294 };
1295
1296 return impl{*this, base::final_suspend()};
1297 }
1298
1299 void return_void() noexcept
1300 {
1301 zth_dbg(coro, "[%s] Done", this->id_str());
1302 m_completed = true;
1303 }
1304
1305 void unhandled_exception() noexcept
1306 {
1307 zth_dbg(coro, "[%s] Exit with exception", this->id_str());
1308 m_completed = true;
1309 }
1310
1312 // Range support
1313 //
1314
1315 struct end_type {};
1316
1317 template <typename U>
1318 class iterator {
1319 public:
1321 : m_promise{p}
1322 {}
1323
1324 bool operator==(end_type end) const noexcept
1325 {
1326 return m_promise == end;
1327 }
1328
1329 bool operator!=(end_type end) const noexcept
1330 {
1331 return m_promise != end;
1332 }
1333
1335 {
1336 ++m_promise;
1337 return *this;
1338 }
1339
1340 decltype(auto) operator++(int)
1341 {
1342 return m_promise++;
1343 }
1344
1345 decltype(auto) operator*()
1346 {
1347 return *m_promise;
1348 }
1349
1350 decltype(auto) operator->()
1351 {
1352 return &(*m_promise);
1353 }
1354
1355 private:
1356 generator_promise& m_promise;
1357 };
1358
1359 template <typename U = first_yield_type>
1361 {
1362 if(!completed() && !valid<U>())
1363 generate<U>();
1364 return iterator<U>{*this};
1365 }
1366
1367 end_type end() noexcept
1368 {
1369 return {};
1370 }
1371
1372 bool operator==(end_type) const noexcept
1373 {
1374 return completed();
1375 }
1376
1377 bool operator!=(end_type) const noexcept
1378 {
1379 return !completed();
1380 }
1381
1382 template <typename U = first_yield_type>
1384 {
1385 generate<U>();
1386 return *this;
1387 }
1388
1389 template <typename U = first_yield_type>
1390 decltype(auto) operator++(int)
1391 {
1392 auto temp = *this;
1393 generate<U>();
1394 return temp;
1395 }
1396
1397 template <typename U = first_yield_type>
1398 decltype(auto) operator*()
1399 {
1400 zth_assert(valid<U>());
1401 return value<U>();
1402 }
1403
1404 template <typename U = first_yield_type>
1405 decltype(auto) operator->()
1406 {
1407 zth_assert(valid<U>());
1408 return &value<U>();
1409 }
1410
1411private:
1412 Mailbox_types m_mailbox;
1413 bool m_completed = false;
1414};
1415
1416} // namespace coro
1417} // namespace zth
1418
1419#endif // ZTH_HAVE_CORO
1420#endif // ZTH_CORO_H
#define ZTH_MALLOC_INLINE
Definition allocator.h:29
The fiber.
Definition fiber.h:62
Fiber-aware future.
Definition sync.h:1068
void set(type const &value=type()) noexcept
Definition sync.h:1103
bool valid() const noexcept
Definition sync.h:1087
Fiber-aware barrier/gate.
Definition sync.h:1487
void wait()
Definition sync.h:1518
user_type const & user() const noexcept
Definition list.h:193
Mutex RAII, that locks and unlocks the mutex automatically.
Definition sync.h:566
Fiber-aware mailbox.
Definition sync.h:1299
void wait()
Definition sync.h:1349
bool valid() const noexcept
Definition sync.h:1322
type take()
Definition sync.h:1416
void * assigned_type
Definition sync.h:1306
virtual void wakeup(Listable &item, queue_type::user_type user, bool prio, size_t q) noexcept final
Definition sync.h:1440
Fiber-aware mutex.
Definition sync.h:505
virtual char const * id_str() const noexcept override
Definition util.h:787
string const & name() const noexcept
Definition util.h:761
Fiber-aware semaphore.
Definition sync.h:614
void acquire(count_type count=1)
Definition sync.h:635
Fiber-aware signal.
Definition sync.h:692
void wait()
Definition sync.h:709
queue_type::iterator enqueue(Listable &item, size_t q=0) noexcept
Definition sync.h:296
void block(size_t q=0)
Definition sync.h:301
bool unblockAll(size_t q=0, bool prio=false) noexcept
Definition sync.h:340
friend cow_string str(UniqueIDBase const &)
Keeps track of a process-wide unique ID within the type T.
Definition util.h:875
void wait(Listable &item, std::coroutine_handle<> waiter) noexcept
Definition coro.h:750
Mailbox(promise_base &owner, cow_string const &name="coro::Mailbox")
Definition coro.h:725
std::coroutine_handle waiter() const noexcept
Definition coro.h:738
bool abandoned() const noexcept
Definition coro.h:774
void abandon()
Definition coro.h:768
virtual void wait(typename base::assigned_type assigned) override
Definition coro.h:780
typename base::type type
Definition coro.h:723
virtual ~Mailbox() noexcept override=default
virtual base::assigned_type wakeup(Listable &item, typename base::queue_type::user_type user, bool prio) noexcept override
Definition coro.h:790
promise_base & owner() const noexcept
Definition coro.h:732
bool valid(std::coroutine_handle<> waiter) const noexcept
Definition coro.h:743
bool valid() const noexcept
Definition sync.h:1322
type take(std::coroutine_handle<> waiter)
Definition coro.h:758
iterator(generator_promise &p)
Definition coro.h:1320
bool operator!=(end_type end) const noexcept
Definition coro.h:1329
bool operator==(end_type end) const noexcept
Definition coro.h:1324
auto & mailbox() noexcept
Definition coro.h:1167
bool operator!=(end_type) const noexcept
Definition coro.h:1377
decltype(auto) yield_value(T_ &&v) noexcept
Definition coro.h:1238
void unhandled_exception() noexcept
Definition coro.h:1305
bool operator==(end_type) const noexcept
Definition coro.h:1372
decltype(auto) final_suspend() noexcept
Definition coro.h:1270
end_type end() noexcept
Definition coro.h:1367
bool valid() const noexcept
Definition coro.h:1178
void return_void() noexcept
Definition coro.h:1299
virtual ~generator_promise() noexcept override=default
std::coroutine_handle<> waiter() const noexcept
Definition coro.h:1220
iterator< U > begin()
Definition coro.h:1360
bool completed() const noexcept
Definition coro.h:1172
typename std::tuple_element_t< 0, std::tuple< T... > > first_yield_type
Definition coro.h:1146
auto get_return_object() noexcept
Definition coro.h:1214
std::tuple< impl::generator_Mailbox_type< T >... > Mailbox_types
Definition coro.h:1144
decltype(auto) value()
Definition coro.h:1184
generator_promise & operator++()
Definition coro.h:1383
std::coroutine_handle<> waiter() const noexcept
Definition coro.h:1232
auto const & mailbox() const noexcept
Definition coro.h:1161
A coroutine generator producing a sequence of values.
Definition coro.h:906
decltype(auto) fiber(char const *name=nullptr)
Definition coro.h:1016
decltype(auto) begin()
Definition coro.h:1034
void setName(string name) noexcept
Definition coro.h:938
decltype(auto) as() const
Definition coro.h:963
generator(std::coroutine_handle< promise_type > h) noexcept
Definition coro.h:911
char const * id_str() const noexcept
Definition coro.h:932
decltype(auto) value()
Definition coro.h:976
auto & mailbox() const
Definition coro.h:952
bool completed() const noexcept
Definition coro.h:945
generator(zth::SharedPointer< promise_type > const &p) noexcept
Definition coro.h:915
bool valid() const noexcept
Definition coro.h:969
promise_type::end_type end()
Definition coro.h:1044
generator(promise_type *p=nullptr) noexcept
Definition coro.h:923
generator(zth::SharedPointer< promise_type > &&p) noexcept
Definition coro.h:919
typename promise_type::Mailbox_types Mailbox_types
Definition coro.h:909
promise_type * promise() const noexcept
Definition coro.h:927
std::coroutine_handle<> continuation() noexcept
Definition coro.h:285
promise_base(cow_string const &name)
Definition coro.h:304
virtual bool running() const noexcept=0
void set_continuation(std::coroutine_handle<> cont={}) noexcept
Definition coro.h:297
ZTH_MALLOC_ATTR((malloc, alloc_size(1))) static void *operator new(std
Definition coro.h:273
virtual std::coroutine_handle<> handle() noexcept=0
virtual ~promise_base() noexcept override=default
bool has_continuation() const noexcept
Definition coro.h:292
decltype(auto) final_suspend() noexcept
Definition coro.h:360
promise_type & self() noexcept
Definition coro.h:423
decltype(auto) initial_suspend() noexcept
Definition coro.h:338
decltype(auto) await_transform(A &&a) noexcept
Definition coro.h:383
Promise promise_type
Definition coro.h:315
virtual std::coroutine_handle<> handle() noexcept final
Definition coro.h:333
virtual bool running() const noexcept final
Definition coro.h:391
promise_type const & self() const noexcept
Definition coro.h:418
virtual ~promise() noexcept override
Definition coro.h:323
promise(cow_string const &name)
Definition coro.h:318
void running(bool set) noexcept
Definition coro.h:412
auto typedHandle() noexcept
Definition coro.h:328
virtual ~task_promise() noexcept override=default
void unhandled_exception() noexcept
Definition coro.h:663
bool completed() const noexcept
Definition coro.h:652
Future< return_type > Future_type
Definition coro.h:638
virtual ~task_promise_base() noexcept override=default
Future_type & future() noexcept
Definition coro.h:647
auto get_return_object() noexcept
Definition coro.h:657
Future< return_type > Future_type
Definition coro.h:638
void return_value(T_ &&v) noexcept
Definition coro.h:683
virtual ~task_promise() noexcept override=default
A coroutine task producing a single result value.
Definition coro.h:515
decltype(auto) await_resume()
Definition coro.h:623
task(zth::SharedPointer< promise_type > const &p) noexcept
Definition coro.h:525
char const * id_str() const noexcept
Definition coro.h:542
void setName(string name) noexcept
Definition coro.h:548
decltype(auto) result()
Definition coro.h:571
task(promise_type *p=nullptr) noexcept
Definition coro.h:533
typename promise_type::Future_type Future_type
Definition coro.h:519
decltype(auto) run()
Definition coro.h:576
promise_type * promise() const noexcept
Definition coro.h:537
std::coroutine_handle< promise_type > await_suspend(std::coroutine_handle<> h) noexcept
Definition coro.h:615
bool await_ready() const noexcept
Definition coro.h:610
task(zth::SharedPointer< promise_type > &&p) noexcept
Definition coro.h:529
decltype(auto) fiber(char const *name=nullptr)
Definition coro.h:592
Future_type & future() const
Definition coro.h:561
bool completed() const noexcept
Definition coro.h:555
task(std::coroutine_handle< promise_type > h) noexcept
Definition coro.h:521
Copy-on-write string.
Definition util.h:339
fiber_type< F >::factory factory(F &&f, char const *name=nullptr)
Create a new fiber.
Definition async.h:1160
@ awaitable
Start fiber, returning a future to await.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:160
#define zth_throw(...)
Definition macros.h:376
#define UNUSED_PAR(name)
Definition macros.h:79
STL namespace.
::std::future< zth::type< T > > future
Definition future.h:40
Forces the fiber to have a future that outlives the fiber.
Definition async.h:311
generator_fiber & operator<<(M const &m) &
Definition coro.h:862
decltype(auto) await_resume()
Definition coro.h:253
std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
Definition coro.h:222
char const * id_str() const noexcept
Definition coro.h:212
decltype(auto) await_ready() noexcept
Definition coro.h:217
Exception thrown when a coroutine has already completed.
Definition exception.h:36
Exception thrown when an operation cannot be performed due to an invalid coroutine state.
Definition exception.h:31
Makes the fiber pass the given gate upon exit.
Definition async.h:267
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217