Zth (libzth)
Loading...
Searching...
No Matches
coro.h
Go to the documentation of this file.
1#ifndef ZTH_CORO_H
2#define ZTH_CORO_H
3/*
4 * SPDX-FileCopyrightText: 2019-2026 Jochem Rutgers
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 */
8
15#if defined(__cplusplus) && __cplusplus >= 202002L
16# define ZTH_HAVE_CORO
17#endif
18
19#ifdef ZTH_HAVE_CORO
20# include <libzth/macros.h>
21
22# include <libzth/exception.h>
23# include <libzth/sync.h>
24
25# include <coroutine>
26
27namespace zth {
28namespace coro {
29
30
31
33// Awaitable types
34//
35
36template <typename T>
37static inline decltype(auto) awaitable(T&& awaitable) noexcept
38{
39 return std::forward<T>(awaitable);
40}
41
42static inline decltype(auto) awaitable(zth::Mutex& mutex) noexcept
43{
44 struct impl {
45 zth::Mutex& mutex;
46
47 char const* id_str() const noexcept
48 {
49 return mutex.id_str();
50 }
51
52 bool await_ready() const noexcept
53 {
54 return true;
55 }
56
57 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
58
59 zth::Locked await_resume() const
60 {
61 return zth::Locked{mutex};
62 }
63 };
64
65 return impl{mutex};
66}
67
68static inline decltype(auto) awaitable(zth::Semaphore& sem) noexcept
69{
70 struct impl {
71 zth::Semaphore& sem;
72
73 char const* id_str() const noexcept
74 {
75 return sem.id_str();
76 }
77
78 bool await_ready() const noexcept
79 {
80 return true;
81 }
82
83 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
84
85 decltype(auto) await_resume() const
86 {
87 return sem.acquire();
88 }
89 };
90
91 return impl{sem};
92}
93
94static inline decltype(auto) awaitable(zth::Signal& signal) noexcept
95{
96 struct impl {
97 zth::Signal& signal;
98
99 char const* id_str() const noexcept
100 {
101 return signal.id_str();
102 }
103
104 bool await_ready() const noexcept
105 {
106 return true;
107 }
108
109 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
110
111 decltype(auto) await_resume() const
112 {
113 return signal.wait();
114 }
115 };
116
117 return impl{signal};
118}
119
120template <typename T>
121static inline decltype(auto) awaitable(zth::Future<T>& future) noexcept
122{
123 struct impl {
125
126 char const* id_str() const noexcept
127 {
128 return future.id_str();
129 }
130
131 bool await_ready() const noexcept
132 {
133 return true;
134 }
135
136 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
137
138 decltype(auto) await_resume() const
139 {
140 return *future;
141 }
142 };
143
144 return impl{future};
145}
146
147template <typename T>
148static inline decltype(auto) awaitable(zth::Mailbox<T>& mailbox) noexcept
149{
150 struct impl {
151 zth::Mailbox<T>& mailbox;
152
153 char const* id_str() const noexcept
154 {
155 return mailbox.id_str();
156 }
157
158 bool await_ready() const noexcept
159 {
160 return true;
161 }
162
163 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
164
165 decltype(auto) await_resume() const
166 {
167 return mailbox.take();
168 }
169 };
170
171 return impl{mailbox};
172}
173
174static inline decltype(auto) awaitable(zth::Gate& gate) noexcept
175{
176 struct impl {
177 zth::Gate& gate;
178
179 char const* id_str() const noexcept
180 {
181 return gate.id_str();
182 }
183
184 bool await_ready() const noexcept
185 {
186 return true;
187 }
188
189 void await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept {}
190
191 void await_resume() const
192 {
193 gate.wait();
194 }
195 };
196
197 return impl{gate};
198}
199
200
201
203// Promise base classes
204//
205
206template <typename Promise, typename Awaitable>
208 Promise& promise;
209 Awaitable awaitable;
210 bool suspended = false;
211
212 char const* id_str() const noexcept requires(requires(Awaitable a) { a.id_str(); })
213 {
214 return awaitable.id_str();
215 }
216
217 decltype(auto) await_ready() noexcept
218 {
219 return awaitable.await_ready();
220 }
221
222 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
223 {
224 zth_assert(promise.typedHandle().address() == h.address());
225
226 if constexpr(requires(Awaitable a) { a.id_str(); })
227 zth_dbg(coro, "[%s] Await suspend by %s", promise.id_str(),
228 awaitable.id_str());
229 else
230 zth_dbg(coro, "[%s] Await suspend", promise.id_str());
231
232 suspended = true;
233 promise.running(false);
234
235 if constexpr(std::is_void_v<decltype(awaitable.await_suspend(h))>) {
236 awaitable.await_suspend(h);
237 return h;
238 } else if constexpr(std::is_same_v<bool, decltype(awaitable.await_suspend(h))>) {
239 awaitable.await_suspend(h);
240 return h;
241 } else if constexpr(std::is_convertible_v<
242 decltype(awaitable.await_suspend(h)),
243 std::coroutine_handle<>>) {
244 auto c = awaitable.await_suspend(h);
245 return c ? c : h;
246 } else {
247 static_assert(
248 (sizeof(Awaitable), false),
249 "await_suspend must return void, bool, or std::coroutine_handle<>");
250 }
251 }
252
253 decltype(auto) await_resume()
254 {
255 if(suspended) {
256 zth_dbg(coro, "[%s] Await resume", promise.id_str());
257 promise.running(true);
258 }
259 return awaitable.await_resume();
260 }
261};
262
263template <typename Promise, typename Awaitable>
265
266class promise_base : public RefCounted, public UniqueID<promise_base> {
267public:
268 static ZTH_MALLOC_INLINE void operator delete(void* ptr, std::size_t n) noexcept
269 {
270 ::zth::deallocate<char>(static_cast<char*>(ptr), n);
271 }
272
273 ZTH_MALLOC_ATTR((malloc, alloc_size(1)))
274 __attribute__((returns_nonnull, warn_unused_result)) static void*
275 operator new(std::size_t n)
276 {
277 return ::zth::allocate<char>(n);
278 }
279
280 virtual ~promise_base() noexcept override = default;
281
282 virtual bool running() const noexcept = 0;
283 virtual std::coroutine_handle<> handle() noexcept = 0;
284
285 std::coroutine_handle<> continuation() noexcept
286 {
287 auto c = m_continuation;
289 return c ? c : std::noop_coroutine();
290 }
291
292 bool has_continuation() const noexcept
293 {
294 return static_cast<bool>(m_continuation);
295 }
296
297 void set_continuation(std::coroutine_handle<> cont = {}) noexcept
298 {
299 zth_assert(!m_continuation || !cont || m_continuation == cont);
300 m_continuation = cont;
301 }
302
303protected:
304 explicit promise_base(cow_string const& name)
305 : UniqueID{name}
306 {}
307
308private:
309 std::coroutine_handle<> m_continuation;
310};
311
312template <typename Promise>
313class promise : public promise_base {
314public:
315 using promise_type = Promise;
316
317protected:
318 explicit promise(cow_string const& name)
320 {}
321
322public:
323 virtual ~promise() noexcept override
324 {
326 }
327
328 __attribute__((pure)) auto typedHandle() noexcept
329 {
330 return std::coroutine_handle<promise_type>::from_promise(self());
331 }
332
333 virtual std::coroutine_handle<> handle() noexcept final
334 {
335 return typedHandle();
336 }
337
338 decltype(auto) initial_suspend() noexcept
339 {
340 struct awaiter {
341 promise& p;
342
343 bool await_ready() noexcept
344 {
345 return false;
346 }
347
348 void await_suspend(std::coroutine_handle<>) noexcept {}
349
350 void await_resume() noexcept
351 {
352 p.running(true);
353 zth_dbg(coro, "[%s] Initial resume", p.id_str());
354 }
355 };
356
357 return awaiter{*this};
358 }
359
360 decltype(auto) final_suspend() noexcept
361 {
362 struct impl {
363 promise_type& p;
364
365 bool await_ready() noexcept
366 {
367 return false;
368 }
369
370 std::coroutine_handle<> await_suspend(std::coroutine_handle<>) noexcept
371 {
372 p.running(false);
373 return p.continuation();
374 }
375
376 void await_resume() noexcept {}
377 };
378
379 return impl{self()};
380 }
381
382 template <typename A>
383 decltype(auto) await_transform(A&& a) noexcept
384 {
385 return promise_awaitable{self(), awaitable(std::forward<A>(a))};
386 }
387
388 template <typename P, typename A>
389 friend class promise_awaitable;
390
391 virtual bool running() const noexcept final
392 {
393 return m_running || this->has_continuation();
394 }
395
396 void run()
397 {
398 if(running())
399 // Runs in another fiber.
401
402 auto h = handle();
403 zth_assert(h);
404
405 zth_dbg(coro, "[%s] Run", id_str());
406
407 while(!self().completed())
408 h();
409 }
410
411protected:
412 void running(bool set) noexcept
413 {
414 zth_assert(m_running != set);
415 m_running = set;
416 }
417
418 promise_type const& self() const noexcept
419 {
420 return static_cast<promise_type const&>(*this);
421 }
422
423 promise_type& self() noexcept
424 {
425 return static_cast<promise_type&>(*this);
426 }
427
428private:
429 // Invoked when RefCounted count reaches zero.
430 virtual void cleanup() noexcept final
431 {
432 zth_dbg(coro, "[%s] Cleanup", id_str());
433 handle().destroy();
434 }
435
436private:
437 bool m_running = false;
438};
439
440
441
443// Task
444//
445// A task is a coroutine that produces a single result value (or void).
446//
447
448template <typename T>
449class task_promise;
450
451template <typename Task, typename Fiber>
453 Task task;
455
456 decltype(auto) operator*()
457 {
458 return *task.future();
459 }
460
461 decltype(auto) operator->()
462 {
463 return &*task.future();
464 }
465
466 template <typename M>
467 requires(std::is_base_of_v<FiberManipulator, M>) task_fiber& operator<<(M const& m) &
468 {
470 fiber << m;
471 return *this;
472 }
473
474 template <typename M>
475 requires(std::is_base_of_v<FiberManipulator, M>) task_fiber&& operator<<(M const& m) &&
476 {
477 *this << m;
478 return std::move(*this);
479 }
480
481 operator zth::Fiber&() const noexcept
482 {
484 return fiber;
485 }
486
487 operator typename Task::Future_type &() noexcept
488 {
489 return task.future();
490 }
491
492 decltype(auto) operator<<(asFuture const&)
493 {
495 return task.future();
496 }
497};
498
499template <typename Task, typename Fiber>
501
502template <typename T, typename F>
503static inline void joinable(task_fiber<T, F> const& tf, Gate& g, Hook<Gate&>& join) noexcept
504{
505 (void)join;
506 static_cast<zth::Fiber&>(tf) << passOnExit(g);
507}
508
514template <typename T = void>
515class task {
516public:
517 using return_type = T;
520
521 explicit task(std::coroutine_handle<promise_type> h) noexcept
522 : m_promise{h.promise()}
523 {}
524
525 explicit task(zth::SharedPointer<promise_type> const& p) noexcept
526 : m_promise{p}
527 {}
528
530 : m_promise{std::move(p)}
531 {}
532
533 explicit task(promise_type* p = nullptr) noexcept
534 : m_promise{p}
535 {}
536
537 promise_type* promise() const noexcept
538 {
539 return m_promise.get();
540 }
541
542 char const* id_str() const noexcept
543 {
544 auto* p = promise();
545 return p ? p->id_str() : "coro::task";
546 }
547
548 void setName(string name) noexcept
549 {
550 auto* p = promise();
551 if(p)
552 p->setName(std::move(name));
553 }
554
555 bool completed() const noexcept
556 {
557 auto* p = promise();
558 return !p || p->completed();
559 }
560
562 {
563 auto* p = promise();
564 if(!p)
566
567 return p->future();
568 }
569
570 decltype(auto) result()
571 {
572 return *future();
573 }
574
575 decltype(auto) run()
576 {
577 auto* p = promise();
578 if(!p)
580
581 p->run();
582 return result();
583 }
584
585 decltype(auto) operator()()
586 {
587 return run();
588 }
589
590 decltype(auto) fiber(char const* name = nullptr)
591 {
592 if(!name) {
593 name = "coro::fiber";
594 } else {
595 auto* p = promise();
596 if(p)
597 p->setName(name);
598 }
599
600 return task_fiber{*this, zth::factory([](task t) { t.run(); }, name)(*this)};
601 }
602
603 auto& operator co_await() noexcept
604 {
605 return *this;
606 }
607
608 bool await_ready() const noexcept
609 {
610 return completed() || promise()->running();
611 }
612
613 std::coroutine_handle<promise_type> await_suspend(std::coroutine_handle<> h) noexcept
614 {
615 auto* p = promise();
616 zth_assert(p && !p->running());
617 p->set_continuation(h);
618 return p->typedHandle();
619 }
620
621 decltype(auto) await_resume()
622 {
623 return result();
624 }
625
626private:
628};
629
630template <typename T>
631class task_promise_base : public promise<task_promise<T>> {
632public:
633 using return_type = T;
635 using typename base::promise_type;
638
640 : base{"coro::task"}
641 {}
642
643 virtual ~task_promise_base() noexcept override = default;
644
645 Future_type& future() noexcept
646 {
647 return m_future;
648 }
649
650 bool completed() const noexcept
651 {
652 return m_future.valid();
653 }
654
655 auto get_return_object() noexcept
656 {
657 zth_dbg(coro, "[%s] New %p", this->id_str(), this->handle().address());
658 return task{static_cast<promise_type*>(this)};
659 }
660
661 void unhandled_exception() noexcept
662 {
663 zth_dbg(coro, "[%s] Exit with exception", this->id_str());
664 m_future.set(std::current_exception());
665 }
666
667private:
668 Future_type m_future;
669};
670
671template <typename T>
672class task_promise final : public task_promise_base<T> {
673public:
675 using typename base::Future_type;
676 using typename base::return_type;
677
678 virtual ~task_promise() noexcept override = default;
679
680 template <typename T_>
681 void return_value(T_&& v) noexcept
682 {
683 zth_dbg(coro, "[%s] Returned", this->id_str());
684 this->future().set(std::forward<T_>(v));
685 }
686};
687
688template <>
689class task_promise<void> final : public task_promise_base<void> {
690public:
692 using return_type = void;
693 using typename base::Future_type;
694
695 virtual ~task_promise() noexcept override = default;
696
697 void return_void() noexcept
698 {
699 zth_dbg(coro, "[%s] Returned", this->id_str());
700 this->future().set();
701 }
702};
703
704
705
707// Generators
708//
709// A generator is a coroutine that produces a sequence of values. It does not produce a
710// return value.
711//
712
713template <typename T>
714class Mailbox;
715
716template <typename T>
717class Mailbox : public zth::Mailbox<T> {
719public:
721 using type = typename base::type;
722
723 explicit Mailbox(promise_base& owner, cow_string const& name = "coro::Mailbox")
724 : base{name}
725 , m_owner{&owner}
726 {}
727
728 virtual ~Mailbox() noexcept override = default;
729
730 promise_base& owner() const noexcept
731 {
733 return *m_owner;
734 }
735
736 std::coroutine_handle<> waiter() const noexcept
737 {
738 return m_waiter;
739 }
740
741 bool valid(std::coroutine_handle<> waiter) const noexcept
742 {
743 return base::valid(waiter ? waiter.address() : typename base::assigned_type{});
744 }
745
746 using base::valid;
747
748 void wait(Listable& item, std::coroutine_handle<> waiter) noexcept
749 {
750 this->enqueue(item, this->Queue_Take).user() = waiter.address();
751 zth_dbg(coro, "[%s] Block coro %p", this->id_str(), waiter.address());
752 }
753
754 using base::wait;
755
756 type take(std::coroutine_handle<> waiter)
757 {
758 if(m_waiter == waiter)
759 m_waiter = std::coroutine_handle<>{};
760
761 return base::take(waiter.address());
762 }
763
764 using base::take;
765
766 void abandon()
767 {
769 m_owner = nullptr;
770 }
771
772 bool abandoned() const noexcept
773 {
774 return !m_owner;
775 }
776
777protected:
778 virtual void wait(typename base::assigned_type assigned) override
779 {
780 while(!valid(assigned)) {
781 if(abandoned())
783 this->block((size_t)base::Queue_Take);
784 }
785 }
786
787 virtual typename base::assigned_type
788 wakeup(Listable& item, typename base::queue_type::user_type user,
789 bool prio) noexcept override
790 {
791 if(!user) {
792 // A fiber woke up. Pass through, we are not waiting for fibers
793 // here.
794 return base::wakeup(item, user, prio);
795 }
796
797 zth_dbg(coro, "[%s] Unblock coro %s", this->id_str(), str(user).c_str());
798 m_waiter = std::coroutine_handle<>::from_address(user);
799 owner().set_continuation(m_waiter);
800 return user;
801 }
802
803private:
804 promise_base* m_owner = nullptr;
805 std::coroutine_handle<> m_waiter;
806};
807
808template <typename T>
809static inline decltype(auto) awaitable(zth::coro::Mailbox<T>& mb) noexcept
810{
811 struct impl {
812 Mailbox<T>& mailbox;
813 Listable item{};
814 std::coroutine_handle<> waiter{};
815
816 char const* id_str() const noexcept
817 {
818 return mailbox.owner().id_str();
819 }
820
821 bool await_ready() noexcept
822 {
823 return mailbox.abandoned()
824 || mailbox.valid(typename Mailbox<T>::assigned_type{})
825 || mailbox.owner().running();
826 }
827
828 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
829 {
830 waiter = h;
831 mailbox.wait(item, waiter);
832 return mailbox.owner().handle();
833 }
834
835 decltype(auto) await_resume()
836 {
837 if(mailbox.valid(waiter)) {
838 // Got it.
839 return mailbox.take(waiter);
840 } else if(mailbox.abandoned()) {
841 // The generator has completed. Can't resume.
842 zth_throw(coro_already_completed{});
843 } else {
844 // The generator runs in another fiber. Just suspend our
845 // fiber while waiting.
846 return mailbox.take();
847 }
848 }
849 };
850
851 return impl{mb};
852}
853
854template <typename Generator, typename Fiber>
856 Generator generator;
858
859 template <typename M>
860 requires(std::is_base_of_v<FiberManipulator, M>) generator_fiber& operator<<(M const& m) &
861 {
863 fiber << m;
864 return *this;
865 }
866
867 template <typename M>
868 requires(std::is_base_of_v<FiberManipulator, M>) generator_fiber&& operator<<(M const& m) &&
869 {
870 *this << m;
871 return std::move(*this);
872 }
873
874 operator zth::Fiber&() const noexcept
875 {
877 return fiber;
878 }
879
880private:
881 void operator<<(asFuture const&);
882};
883
884template <typename Generator, typename Fiber>
886
887template <typename G, typename F>
888static inline void joinable(generator_fiber<G, F> const& gf, Gate& g, Hook<Gate&>& join) noexcept
889{
890 (void)join;
891 static_cast<zth::Fiber&>(gf) << passOnExit(g);
892}
893
894template <typename... T>
895class generator_promise;
896
903template <typename... T>
904requires(sizeof...(T) >= 1, (!std::is_void_v<T> && ...)) class generator {
905public:
908
909 explicit generator(std::coroutine_handle<promise_type> h) noexcept
910 : m_promise{h.promise()}
911 {}
912
913 explicit generator(zth::SharedPointer<promise_type> const& p) noexcept
914 : m_promise{p}
915 {}
916
918 : m_promise{std::move(p)}
919 {}
920
921 explicit generator(promise_type* p = nullptr) noexcept
922 : m_promise{p}
923 {}
924
925 promise_type* promise() const noexcept
926 {
927 return m_promise.get();
928 }
929
930 char const* id_str() const noexcept
931 {
932 auto* p = promise();
933 return p ? p->id_str() : "coro::generator";
934 }
935
936 void setName(string name) noexcept
937 {
938 auto* p = promise();
939 if(p)
940 p->setName(std::move(name));
941 }
942
943 bool completed() const noexcept
944 {
945 auto* p = m_promise.get();
946 return !p || p->completed();
947 }
948
949 template <typename U = typename promise_type::first_yield_type>
950 auto& mailbox() const
951 {
952 auto* p = m_promise.get();
953 if(!p)
955
956 return p->template mailbox<U>();
957 }
958
959 template <typename U>
960 decltype(auto) as() const
961 {
962 return mailbox<U>();
963 }
964
965 template <typename U = typename promise_type::first_yield_type>
966 bool valid() const noexcept
967 {
968 auto* p = m_promise.get();
969 return p && p->template mailbox<U>().valid();
970 }
971
972 template <typename U = typename promise_type::first_yield_type>
973 decltype(auto) value()
974 {
975 auto* p = m_promise.get();
976 if(!p)
978
979 return p->template value<U>();
980 }
981
982 template <typename U = typename promise_type::first_yield_type>
983 void generate()
984 {
985 auto* p = m_promise.get();
986 if(!p)
988
989 p->template generate<U>();
990 }
991
992 void run()
993 {
994 auto* p = m_promise.get();
995 if(!p)
997
998 p->run();
999
1000 if(!completed())
1001 return;
1002
1003# ifdef ZTH_FUTURE_EXCEPTION
1004 auto exc = p->mailbox().exception();
1005 if(exc)
1006 std::rethrow_exception(exc);
1007# endif // ZTH_FUTURE_EXCEPTION
1008 }
1009
1010 decltype(auto) fiber(char const* name = nullptr)
1011 {
1012 if(!name) {
1013 name = "coro::fiber";
1014 } else {
1015 auto* p = promise();
1016 if(p)
1017 p->setName(name);
1018 }
1019
1020 return generator_fiber{
1021 *this, zth::factory([](generator g) { g.run(); }, name)(*this)};
1022 }
1023
1025 // Range support
1026 //
1027
1028 decltype(auto) begin()
1029 {
1030 auto* p = m_promise.get();
1031 if(!p)
1033
1034 return p->begin();
1035 }
1036
1038 {
1039 return {};
1040 }
1041
1042private:
1044};
1045
1046template <template <typename...> class G, typename... T>
1047requires(std::is_same_v<std::remove_reference_t<G<T...>>, generator<T...>>) //
1048 static inline decltype(auto) awaitable(G<T...> g) noexcept
1049{
1050 static_assert(
1051 sizeof...(T) == 1, "Use generator.as<type>() to select the yield type to await on");
1052 return awaitable(g.mailbox());
1053}
1054
1055namespace impl {
1056
1057template <typename T>
1058using generator_Mailbox_type = Mailbox<std::conditional_t<
1059 std::is_reference_v<T>, std::reference_wrapper<std::remove_reference_t<T>>, T>>;
1060
1061template <typename A, typename B>
1062inline constexpr bool is_identical_v = std::is_same_v<A, B>;
1063
1064template <typename A, typename B>
1065inline constexpr bool is_similar_v =
1066 is_identical_v<A, B> || std::is_same_v<std::decay_t<A>, std::decay_t<B>>;
1067
1068template <typename A, typename B>
1069inline constexpr bool is_convertible_v = is_similar_v<A, B> || std::is_convertible_v<A, B&>;
1070
1071template <typename Needle, typename... Haystack>
1072struct has_identical_type : std::false_type {};
1073
1074template <typename Needle, typename First, typename... Rest>
1075struct has_identical_type<Needle, First, Rest...>
1076 : std::conditional_t<
1077 is_identical_v<Needle, First>, std::true_type,
1078 has_identical_type<Needle, Rest...>> {};
1079
1080template <typename Needle, typename... Haystack>
1081struct find_identical_type {};
1082
1083template <typename Needle, typename First, typename... Rest>
1084struct find_identical_type<Needle, First, Rest...> {
1085 using type = std::conditional_t<
1086 is_identical_v<Needle, First>, First,
1087 typename find_identical_type<Needle, Rest...>::type>;
1088};
1089
1090template <typename Needle, typename... Haystack>
1091struct has_similar_type : std::false_type {};
1092
1093template <typename Needle, typename First, typename... Rest>
1094struct has_similar_type<Needle, First, Rest...>
1095 : std::conditional_t<
1096 is_similar_v<Needle, First> && !has_identical_type<Needle, Rest...>::value,
1097 std::true_type, has_similar_type<Needle, Rest...>> {};
1098
1099template <typename Needle, typename... Haystack>
1100struct find_similar_type {
1101 using type = void;
1102};
1103
1104template <typename Needle, typename First, typename... Rest>
1105struct find_similar_type<Needle, First, Rest...> {
1106 using type = std::conditional_t<
1107 is_similar_v<Needle, First> && !has_identical_type<Needle, Rest...>::value, First,
1108 typename find_similar_type<Needle, Rest...>::type>;
1109};
1110
1111template <typename Needle, typename... Haystack>
1112struct find_convertible_type {
1113 using type = void;
1114};
1115
1116template <typename Needle, typename First, typename... Rest>
1117struct find_convertible_type<Needle, First, Rest...> {
1118 using type = std::conditional_t<
1119 is_convertible_v<Needle, First> && !has_similar_type<Needle, Rest...>::value, First,
1120 typename find_convertible_type<Needle, Rest...>::type>;
1121};
1122
1123template <typename A, typename T>
1124static inline A repeat(A a)
1125{
1126 return a;
1127}
1128
1129} // namespace impl
1130
1131template <typename Needle, typename... Haystack>
1132struct find_type : impl::find_convertible_type<Needle, Haystack...> {};
1133
1134template <typename... T>
1135class generator_promise final : public promise<generator_promise<T...>> {
1136public:
1137 using Mailbox_types = std::tuple<impl::generator_Mailbox_type<T>...>;
1139 using first_yield_type = typename std::tuple_element_t<0, std::tuple<T...>>;
1141
1143 // Administration
1144 //
1145
1147 : base{"coro::generator"}
1148 , m_mailbox{*impl::repeat<generator_promise*, T>(this)...}
1149 {}
1150
1151 virtual ~generator_promise() noexcept override = default;
1152
1153 template <typename U = first_yield_type>
1154 auto const& mailbox() const noexcept
1155 {
1156 return std::get<impl::generator_Mailbox_type<U>>(m_mailbox);
1157 }
1158
1159 template <typename U = first_yield_type>
1160 auto& mailbox() noexcept
1161 {
1162 return std::get<impl::generator_Mailbox_type<U>>(m_mailbox);
1163 }
1164
1165 bool completed() const noexcept
1166 {
1167 return m_completed;
1168 }
1169
1170 template <typename U = first_yield_type>
1171 bool valid() const noexcept
1172 {
1173 return mailbox<U>().valid() && !completed();
1174 }
1175
1176 template <typename U = first_yield_type>
1177 decltype(auto) value()
1178 {
1179 if(completed())
1181
1182 return mailbox<U>().take();
1183 }
1184
1185 template <typename U = first_yield_type>
1187 {
1188 if(completed())
1190 if(mailbox<U>().valid())
1191 return;
1192 if(this->running())
1193 // Just wait for another fiber to fill the mailbox.
1194 return;
1195
1196 auto h = this->typedHandle();
1197 zth_assert(h && !h.done());
1198
1199 zth_dbg(coro, "[%s] Generate", this->id_str());
1200 while(!mailbox<U>().valid() && !completed()) {
1201 // Always return at the next yield.
1202 this->set_continuation(std::noop_coroutine());
1203 h();
1204 }
1205 }
1206
1207 auto get_return_object() noexcept
1208 {
1209 zth_dbg(coro, "[%s] New %p", this->id_str(), this->handle().address());
1210 return generator{this};
1211 }
1212
1213 std::coroutine_handle<> waiter() const noexcept
1214 {
1215 std::array<std::coroutine_handle<>, sizeof...(T)> waiters = {waiter<T>()...};
1216
1217 for(size_t i = 0; i < sizeof...(T); ++i)
1218 if(waiters[i])
1219 return waiters[i];
1220
1221 return {};
1222 }
1223
1224 template <typename U>
1225 std::coroutine_handle<> waiter() const noexcept
1226 {
1227 return mailbox<U>().waiter();
1228 }
1229
1230 template <typename T_>
1231 decltype(auto) yield_value(T_&& v) noexcept
1232 {
1233 using M = typename find_type<T_, T...>::type;
1234 zth_dbg(coro, "[%s] Yield", this->id_str());
1235 mailbox<M>().put(std::forward<T_>(v));
1236
1237 struct impl {
1238 generator_promise& self;
1239
1240 char const* id_str() const noexcept
1241 {
1242 return "yield";
1243 }
1244
1245 bool await_ready() noexcept
1246 {
1247 return !self.has_continuation();
1248 }
1249
1250 std::coroutine_handle<>
1251 await_suspend(std::coroutine_handle<> UNUSED_PAR(h)) noexcept
1252 {
1253 return self.continuation();
1254 }
1255
1256 void await_resume() noexcept {}
1257 };
1258
1259 return this->await_transform(impl{*this});
1260 }
1261
1262 // cppcheck-suppress duplInheritedMember
1263 decltype(auto) final_suspend() noexcept
1264 {
1265 struct impl {
1266 generator_promise& self;
1267 decltype(std::declval<base>().final_suspend()) awaitable;
1268
1269 bool await_ready() noexcept
1270 {
1271 return awaitable.await_ready();
1272 }
1273
1274 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
1275 {
1276 (void)(..., std::get<zth::coro::impl::generator_Mailbox_type<T>>(
1277 self.m_mailbox)
1278 .abandon());
1279
1280 return awaitable.await_suspend(h);
1281 }
1282
1283 decltype(auto) await_resume() noexcept
1284 {
1285 return awaitable.await_resume();
1286 }
1287 };
1288
1289 return impl{*this, base::final_suspend()};
1290 }
1291
1292 void return_void() noexcept
1293 {
1294 zth_dbg(coro, "[%s] Done", this->id_str());
1295 m_completed = true;
1296 }
1297
1298 void unhandled_exception() noexcept
1299 {
1300 zth_dbg(coro, "[%s] Exit with exception", this->id_str());
1301 m_completed = true;
1302 }
1303
1305 // Range support
1306 //
1307
1308 struct end_type {};
1309
1310 template <typename U>
1311 class iterator {
1312 public:
1314 : m_promise{p}
1315 {}
1316
1317 bool operator==(end_type end) const noexcept
1318 {
1319 return m_promise == end;
1320 }
1321
1322 bool operator!=(end_type end) const noexcept
1323 {
1324 return m_promise != end;
1325 }
1326
1328 {
1329 ++m_promise;
1330 return *this;
1331 }
1332
1333 decltype(auto) operator++(int)
1334 {
1335 return m_promise++;
1336 }
1337
1338 decltype(auto) operator*()
1339 {
1340 return *m_promise;
1341 }
1342
1343 decltype(auto) operator->()
1344 {
1345 return &(*m_promise);
1346 }
1347
1348 private:
1349 generator_promise& m_promise;
1350 };
1351
1352 template <typename U = first_yield_type>
1354 {
1355 if(!completed() && !valid<U>())
1356 generate<U>();
1357 return iterator<U>{*this};
1358 }
1359
1360 end_type end() noexcept
1361 {
1362 return {};
1363 }
1364
1365 bool operator==(end_type) const noexcept
1366 {
1367 return completed();
1368 }
1369
1370 bool operator!=(end_type) const noexcept
1371 {
1372 return !completed();
1373 }
1374
1375 template <typename U = first_yield_type>
1377 {
1378 generate<U>();
1379 return *this;
1380 }
1381
1382 template <typename U = first_yield_type>
1383 decltype(auto) operator++(int)
1384 {
1385 auto temp = *this;
1386 generate<U>();
1387 return temp;
1388 }
1389
1390 template <typename U = first_yield_type>
1391 decltype(auto) operator*()
1392 {
1393 zth_assert(valid<U>());
1394 return value<U>();
1395 }
1396
1397 template <typename U = first_yield_type>
1398 decltype(auto) operator->()
1399 {
1400 zth_assert(valid<U>());
1401 return &value<U>();
1402 }
1403
1404private:
1405 Mailbox_types m_mailbox;
1406 bool m_completed = false;
1407};
1408
1409} // namespace coro
1410} // namespace zth
1411
1412#endif // ZTH_HAVE_CORO
1413#endif // ZTH_CORO_H
#define ZTH_MALLOC_INLINE
Definition allocator.h:28
The fiber.
Definition fiber.h:49
Fiber-aware future.
Definition sync.h:1068
void set(type const &value=type()) noexcept
Definition sync.h:1103
bool valid() const noexcept
Definition sync.h:1087
Fiber-aware barrier/gate.
Definition sync.h:1487
void wait()
Definition sync.h:1518
user_type const & user() const noexcept
Definition list.h:191
Mutex RAII, that locks and unlocks the mutex automatically.
Definition sync.h:566
Fiber-aware mailbox.
Definition sync.h:1299
void wait()
Definition sync.h:1349
bool valid() const noexcept
Definition sync.h:1322
type take()
Definition sync.h:1416
void * assigned_type
Definition sync.h:1306
virtual void wakeup(Listable &item, queue_type::user_type user, bool prio, size_t q) noexcept final
Definition sync.h:1440
Fiber-aware mutex.
Definition sync.h:505
virtual char const * id_str() const noexcept override
Definition util.h:772
string const & name() const noexcept
Definition util.h:746
Fiber-aware semaphore.
Definition sync.h:614
void acquire(count_type count=1)
Definition sync.h:635
Fiber-aware signal.
Definition sync.h:692
void wait()
Definition sync.h:709
queue_type::iterator enqueue(Listable &item, size_t q=0) noexcept
Definition sync.h:296
void block(size_t q=0)
Definition sync.h:301
bool unblockAll(size_t q=0, bool prio=false) noexcept
Definition sync.h:340
friend cow_string str(UniqueIDBase const &)
Keeps track of a process-wide unique ID within the type T.
Definition util.h:860
void wait(Listable &item, std::coroutine_handle<> waiter) noexcept
Definition coro.h:748
Mailbox(promise_base &owner, cow_string const &name="coro::Mailbox")
Definition coro.h:723
std::coroutine_handle waiter() const noexcept
Definition coro.h:736
bool abandoned() const noexcept
Definition coro.h:772
void abandon()
Definition coro.h:766
virtual void wait(typename base::assigned_type assigned) override
Definition coro.h:778
typename base::type type
Definition coro.h:721
virtual ~Mailbox() noexcept override=default
virtual base::assigned_type wakeup(Listable &item, typename base::queue_type::user_type user, bool prio) noexcept override
Definition coro.h:788
promise_base & owner() const noexcept
Definition coro.h:730
bool valid(std::coroutine_handle<> waiter) const noexcept
Definition coro.h:741
bool valid() const noexcept
Definition sync.h:1322
type take(std::coroutine_handle<> waiter)
Definition coro.h:756
iterator(generator_promise &p)
Definition coro.h:1313
bool operator!=(end_type end) const noexcept
Definition coro.h:1322
bool operator==(end_type end) const noexcept
Definition coro.h:1317
auto & mailbox() noexcept
Definition coro.h:1160
bool operator!=(end_type) const noexcept
Definition coro.h:1370
decltype(auto) yield_value(T_ &&v) noexcept
Definition coro.h:1231
void unhandled_exception() noexcept
Definition coro.h:1298
bool operator==(end_type) const noexcept
Definition coro.h:1365
decltype(auto) final_suspend() noexcept
Definition coro.h:1263
end_type end() noexcept
Definition coro.h:1360
bool valid() const noexcept
Definition coro.h:1171
void return_void() noexcept
Definition coro.h:1292
virtual ~generator_promise() noexcept override=default
std::coroutine_handle waiter() const noexcept
Definition coro.h:1213
iterator< U > begin()
Definition coro.h:1353
bool completed() const noexcept
Definition coro.h:1165
typename std::tuple_element_t< 0, std::tuple< T... > > first_yield_type
Definition coro.h:1139
auto get_return_object() noexcept
Definition coro.h:1207
std::tuple< impl::generator_Mailbox_type< T >... > Mailbox_types
Definition coro.h:1137
decltype(auto) value()
Definition coro.h:1177
generator_promise & operator++()
Definition coro.h:1376
std::coroutine_handle waiter() const noexcept
Definition coro.h:1225
auto const & mailbox() const noexcept
Definition coro.h:1154
A coroutine generator producing a sequence of values.
Definition coro.h:904
decltype(auto) fiber(char const *name=nullptr)
Definition coro.h:1010
decltype(auto) begin()
Definition coro.h:1028
void setName(string name) noexcept
Definition coro.h:936
decltype(auto) as() const
Definition coro.h:960
generator(std::coroutine_handle< promise_type > h) noexcept
Definition coro.h:909
char const * id_str() const noexcept
Definition coro.h:930
decltype(auto) value()
Definition coro.h:973
auto & mailbox() const
Definition coro.h:950
bool completed() const noexcept
Definition coro.h:943
generator(zth::SharedPointer< promise_type > const &p) noexcept
Definition coro.h:913
bool valid() const noexcept
Definition coro.h:966
promise_type::end_type end()
Definition coro.h:1037
generator(promise_type *p=nullptr) noexcept
Definition coro.h:921
generator(zth::SharedPointer< promise_type > &&p) noexcept
Definition coro.h:917
typename promise_type::Mailbox_types Mailbox_types
Definition coro.h:907
promise_type * promise() const noexcept
Definition coro.h:925
std::coroutine_handle continuation() noexcept
Definition coro.h:285
promise_base(cow_string const &name)
Definition coro.h:304
virtual bool running() const noexcept=0
void set_continuation(std::coroutine_handle<> cont={}) noexcept
Definition coro.h:297
ZTH_MALLOC_ATTR((malloc, alloc_size(1))) static void *operator new(std
Definition coro.h:273
virtual std::coroutine_handle handle() noexcept=0
virtual ~promise_base() noexcept override=default
bool has_continuation() const noexcept
Definition coro.h:292
decltype(auto) final_suspend() noexcept
Definition coro.h:360
promise_type & self() noexcept
Definition coro.h:423
decltype(auto) initial_suspend() noexcept
Definition coro.h:338
decltype(auto) await_transform(A &&a) noexcept
Definition coro.h:383
Promise promise_type
Definition coro.h:315
virtual std::coroutine_handle handle() noexcept final
Definition coro.h:333
virtual bool running() const noexcept final
Definition coro.h:391
promise_type const & self() const noexcept
Definition coro.h:418
virtual ~promise() noexcept override
Definition coro.h:323
promise(cow_string const &name)
Definition coro.h:318
void running(bool set) noexcept
Definition coro.h:412
auto typedHandle() noexcept
Definition coro.h:328
virtual ~task_promise() noexcept override=default
void unhandled_exception() noexcept
Definition coro.h:661
bool completed() const noexcept
Definition coro.h:650
Future< return_type > Future_type
Definition coro.h:636
virtual ~task_promise_base() noexcept override=default
Future_type & future() noexcept
Definition coro.h:645
auto get_return_object() noexcept
Definition coro.h:655
Future< return_type > Future_type
Definition coro.h:636
void return_value(T_ &&v) noexcept
Definition coro.h:681
virtual ~task_promise() noexcept override=default
A coroutine task producing a single result value.
Definition coro.h:515
decltype(auto) await_resume()
Definition coro.h:621
task(zth::SharedPointer< promise_type > const &p) noexcept
Definition coro.h:525
char const * id_str() const noexcept
Definition coro.h:542
void setName(string name) noexcept
Definition coro.h:548
decltype(auto) result()
Definition coro.h:570
task(promise_type *p=nullptr) noexcept
Definition coro.h:533
typename promise_type::Future_type Future_type
Definition coro.h:519
decltype(auto) run()
Definition coro.h:575
promise_type * promise() const noexcept
Definition coro.h:537
std::coroutine_handle< promise_type > await_suspend(std::coroutine_handle<> h) noexcept
Definition coro.h:613
bool await_ready() const noexcept
Definition coro.h:608
task(zth::SharedPointer< promise_type > &&p) noexcept
Definition coro.h:529
decltype(auto) fiber(char const *name=nullptr)
Definition coro.h:590
Future_type & future() const
Definition coro.h:561
bool completed() const noexcept
Definition coro.h:555
task(std::coroutine_handle< promise_type > h) noexcept
Definition coro.h:521
Copy-on-write string.
Definition util.h:324
fiber_type< F >::factory factory(F &&f, char const *name=nullptr)
Create a new fiber.
Definition async.h:1131
@ awaitable
Start fiber, returning a future to await.
#define zth_dbg(group, fmt, a...)
Debug printf()-like function.
Definition util.h:194
#define ZTH_CLASS_NEW_DELETE(T)
Define new/delete operators for a class, which are allocator-aware.
Definition allocator.h:159
#define zth_throw(...)
Definition macros.h:365
#define UNUSED_PAR(name)
Definition macros.h:78
STL namespace.
::std::future< zth::type< T > > future
Definition future.h:40
Forces the fiber to have a future that outlives the fiber.
Definition async.h:310
generator_fiber & operator<<(M const &m) &
Definition coro.h:860
decltype(auto) await_resume()
Definition coro.h:253
std::coroutine_handle await_suspend(std::coroutine_handle<> h) noexcept
Definition coro.h:222
char const * id_str() const noexcept
Definition coro.h:212
decltype(auto) await_ready() noexcept
Definition coro.h:217
Exception thrown when a coroutine has already completed.
Definition exception.h:36
Exception thrown when an operation cannot be performed due to an invalid coroutine state.
Definition exception.h:31
Makes the fiber pass the given gate upon exit.
Definition async.h:266
#define zth_assert(expr)
assert(), but better integrated in Zth.
Definition util.h:217