// <experimental/io_context> -*- C++ -*-

// Copyright (C) 2015-2021 Free Software Foundation, Inc.
//
// This file is part of the GNU ISO C++ Library.  This library is free
// software; you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the
// Free Software Foundation; either version 3, or (at your option)
// any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// Under Section 7 of GPL version 3, you are granted additional
// permissions described in the GCC Runtime Library Exception, version
// 3.1, as published by the Free Software Foundation.

// You should have received a copy of the GNU General Public License and
// a copy of the GCC Runtime Library Exception along with this program;
// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
// <http://www.gnu.org/licenses/>.

/** @file experimental/io_context
 *  This is a TS C++ Library header.
 *  @ingroup networking-ts
 */
#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1

#pragma GCC system_header

#if __cplusplus >= 201402L

#include <algorithm>   // find, lower_bound, stable_sort, stable_partition
#include <atomic>
#include <chrono>
#include <cstdint>     // uint64_t
#include <forward_list>
#include <functional>
#include <limits>      // numeric_limits
#include <memory>      // unique_ptr, make_unique, addressof
#include <mutex>       // lock_guard
#include <queue>       // queue, priority_queue
#include <system_error>
#include <thread>
#include <vector>
#include <experimental/netfwd>
#include <experimental/executor>

#if _GLIBCXX_HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef _GLIBCXX_HAVE_POLL_H
# include <poll.h>
#endif
#ifdef _GLIBCXX_HAVE_FCNTL_H
# include <fcntl.h>
#endif

namespace std _GLIBCXX_VISIBILITY(default)
{
_GLIBCXX_BEGIN_NAMESPACE_VERSION
namespace experimental
{
namespace net
{
inline namespace v1
{

  /** @addtogroup networking-ts
   *  @{
   */

  class __socket_impl;

  /// An ExecutionContext for I/O operations.
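  ///
  /// A minimal usage sketch (illustrative only; the timer type is assumed
  /// to come from <experimental/timer>):
  /// @code
  /// std::experimental::net::io_context ctx;
  /// std::experimental::net::steady_timer timer(ctx, std::chrono::seconds(1));
  /// timer.async_wait([](std::error_code __ec) { /* runs ~1s later */ });
  /// ctx.run(); // blocks until there is no more outstanding work
  /// @endcode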
  class io_context : public execution_context
  {
  public:
    // types:

    /// An executor for an io_context.
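    /// dispatch() runs the function object immediately when called from a
    /// thread that is currently running this io_context; otherwise it falls
    /// back to post().  Illustrative sketch only (note that post() below is
    /// still a TODO and merely wakes the reactor):
    /// @code
    /// auto ex = ctx.get_executor();
    /// ex.dispatch([]{ }, std::allocator<void>{}); // may invoke inline
    /// ex.post([]{ }, std::allocator<void>{});     // never invokes inline
    /// @endcode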
    class executor_type
    {
    public:
      // construct / copy / destroy:

      executor_type(const executor_type& __other) noexcept = default;
      executor_type(executor_type&& __other) noexcept = default;

      executor_type& operator=(const executor_type& __other) noexcept
        = default;
      executor_type& operator=(executor_type&& __other) noexcept = default;

      // executor operations:

      bool running_in_this_thread() const noexcept
      {
#ifdef _GLIBCXX_HAS_GTHREADS
        lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
        auto __end = _M_ctx->_M_call_stack.end();
        return std::find(_M_ctx->_M_call_stack.begin(), __end,
                         this_thread::get_id()) != __end;
#else
        return _M_ctx->_M_run_count != 0;
#endif
      }

      io_context& context() const noexcept { return *_M_ctx; }

      void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
      void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }

      template<typename _Func, typename _ProtoAllocator>
        void
        dispatch(_Func&& __f, const _ProtoAllocator& __a) const
        {
          if (running_in_this_thread())
            decay_t<_Func>{std::forward<_Func>(__f)}();
          else
            post(std::forward<_Func>(__f), __a);
        }

      template<typename _Func, typename _ProtoAllocator>
        void
        post(_Func&& __f, const _ProtoAllocator& __a) const
        {
          lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
          // TODO (re-use functionality in system_context)
          _M_ctx->_M_reactor._M_notify();
        }

      template<typename _Func, typename _ProtoAllocator>
        void
        defer(_Func&& __f, const _ProtoAllocator& __a) const
        { post(std::forward<_Func>(__f), __a); }

    private:
      friend io_context;

      explicit
      executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }

      io_context* _M_ctx;
    };
    using count_type = size_t;

    // construct / copy / destroy:

    io_context() : _M_work_count(0) { }

    explicit
    io_context(int __concurrency_hint) : _M_work_count(0) { }

    io_context(const io_context&) = delete;
    io_context& operator=(const io_context&) = delete;

    // io_context operations:

    executor_type get_executor() noexcept { return executor_type(*this); }

    count_type
    run()
    {
      count_type __n = 0;
      while (run_one())
        if (__n != numeric_limits<count_type>::max())
          ++__n;
      return __n;
    }

    template<typename _Rep, typename _Period>
      count_type
      run_for(const chrono::duration<_Rep, _Period>& __rel_time)
      { return run_until(chrono::steady_clock::now() + __rel_time); }

    template<typename _Clock, typename _Duration>
      count_type
      run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
        count_type __n = 0;
        while (run_one_until(__abs_time))
          if (__n != numeric_limits<count_type>::max())
            ++__n;
        return __n;
      }

    count_type
    run_one()
    { return _M_do_one(chrono::milliseconds{-1}); }

    template<typename _Rep, typename _Period>
      count_type
      run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
      { return run_one_until(chrono::steady_clock::now() + __rel_time); }

    template<typename _Clock, typename _Duration>
      count_type
      run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
        auto __now = _Clock::now();
        while (__now < __abs_time)
          {
            using namespace std::chrono;
            auto __ms = duration_cast<milliseconds>(__abs_time - __now);
            if (_M_do_one(__ms))
              return 1;
            __now = _Clock::now();
          }
        return 0;
      }

    count_type
    poll()
    {
      count_type __n = 0;
      while (poll_one())
        if (__n != numeric_limits<count_type>::max())
          ++__n;
      return __n;
    }

    count_type
    poll_one()
    { return _M_do_one(chrono::milliseconds{0}); }

    void stop()
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      _M_stopped = true;
      _M_reactor._M_notify();
    }

    bool stopped() const noexcept
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      return _M_stopped;
    }

    void restart()
    {
      _M_stopped = false;
    }
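
    // Once stop() has been called, or the context has run out of work,
    // run() and poll() return immediately until restart() is called.
    // Illustrative sketch, assuming `ctx` has pending work:
    //
    //   ctx.run();      // returns: stopped, or no outstanding work left
    //   ctx.restart();  // clear the stopped flag
    //   ctx.run();      // process any remaining or newly posted work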

  private:
    template<typename _Clock, typename _WaitTraits>
      friend class basic_waitable_timer;

    friend __socket_impl;

    template<typename _Protocol>
      friend class __basic_socket_impl;

    template<typename _Protocol>
      friend class basic_socket;

    template<typename _Protocol>
      friend class basic_datagram_socket;

    template<typename _Protocol>
      friend class basic_stream_socket;

    template<typename _Protocol>
      friend class basic_socket_acceptor;

    count_type
    _M_outstanding_work() const
    { return _M_work_count + !_M_ops.empty(); }

    struct __timer_queue_base : execution_context::service
    {
      // Return milliseconds until the next timer expires,
      // or milliseconds::max() if there are no pending timers.
      virtual chrono::milliseconds _M_next() const = 0;

      virtual bool run_one() = 0;

    protected:
      explicit
      __timer_queue_base(execution_context& __ctx) : service(__ctx)
      {
        auto& __ioc = static_cast<io_context&>(__ctx);
        lock_guard<execution_context::mutex_type> __lock(__ioc._M_mtx);
        __ioc._M_timers.push_back(this);
      }

      mutable execution_context::mutex_type _M_qmtx;
    };
    template<typename _Timer, typename _Key = typename _Timer::_Key>
      struct __timer_queue : __timer_queue_base
      {
        using key_type = __timer_queue;

        explicit
        __timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
        { }

        void shutdown() noexcept { }

        io_context& context() noexcept
        { return static_cast<io_context&>(service::context()); }

        // Start an asynchronous wait.
        void
        push(const _Timer& __t, function<void(error_code)> __h)
        {
          context().get_executor().on_work_started();
          lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
          _M_queue.emplace(__t, _M_next_id++, std::move(__h));
          // no need to notify reactor unless this timer went to the front?
        }

        // Cancel all outstanding waits for __t
        size_t
        cancel(const _Timer& __t)
        {
          lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
          size_t __count = 0;
          auto __last = _M_queue.end();
          for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
               ++__it)
            {
              if (__it->_M_key == __t._M_key.get())
                {
                  __it->cancel();
                  __last = __it;
                  ++__count;
                }
            }
          if (__count)
            _M_queue._M_sort_to(__last);
          return __count;
        }

        // Cancel oldest outstanding wait for __t
        bool
        cancel_one(const _Timer& __t)
        {
          lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
          const auto __end = _M_queue.end();
          auto __oldest = __end;
          for (auto __it = _M_queue.begin(); __it != __end; ++__it)
            if (__it->_M_key == __t._M_key.get())
              if (__oldest == __end || __it->_M_id < __oldest->_M_id)
                __oldest = __it;
          if (__oldest == __end)
            return false;
          __oldest->cancel();
          _M_queue._M_sort_to(__oldest);
          return true;
        }

        chrono::milliseconds
        _M_next() const override
        {
          typename _Timer::time_point __exp;
          {
            lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
            if (_M_queue.empty())
              return chrono::milliseconds::max();  // no pending timers
            if (_M_queue.top()._M_key == nullptr)
              return chrono::milliseconds::zero(); // cancelled, run now
            __exp = _M_queue.top()._M_expiry;
          }
          auto __dur = _Timer::traits_type::to_wait_duration(__exp);
          if (__dur < __dur.zero())
            __dur = __dur.zero();
          return chrono::duration_cast<chrono::milliseconds>(__dur);
        }

      private:
        bool run_one() override
        {
          auto __now = _Timer::clock_type::now();
          function<void(error_code)> __h;
          error_code __ec;
          {
            lock_guard<execution_context::mutex_type> __lock(_M_qmtx);

            if (_M_queue.top()._M_key == nullptr) // cancelled
              {
                __h = std::move(_M_queue.top()._M_h);
                __ec = std::make_error_code(errc::operation_canceled);
                _M_queue.pop();
              }
            else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
              {
                __h = std::move(_M_queue.top()._M_h);
                _M_queue.pop();
              }
          }
          if (__h)
            {
              __h(__ec);
              context().get_executor().on_work_finished();
              return true;
            }
          return false;
        }

        using __timer_id_type = uint64_t;

        struct __pending_timer
        {
          __pending_timer(const _Timer& __t, uint64_t __id,
                          function<void(error_code)> __h)
          : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
            _M_h(std::move(__h))
          { }

          typename _Timer::time_point _M_expiry;
          _Key* _M_key;
          __timer_id_type _M_id;
          function<void(error_code)> _M_h;

          void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }
          // priority_queue keeps the "largest" element per operator< at the
          // top, so order by *greater* expiry: top() is then the timer with
          // the earliest deadline, and cancelled timers (expiry set to
          // time_point::min() above) sort to the top, as _M_next() and
          // run_one() expect.
          bool
          operator<(const __pending_timer& __rhs) const
          { return _M_expiry > __rhs._M_expiry; }
        };

        struct __queue : priority_queue<__pending_timer>
        {
          using iterator =
            typename priority_queue<__pending_timer>::container_type::iterator;

          // expose begin/end/erase for direct access to underlying container
          iterator begin() { return this->c.begin(); }
          iterator end() { return this->c.end(); }
          iterator erase(iterator __it) { return this->c.erase(__it); }
          // Called after elements have been modified in place (cancelled),
          // which invalidates the heap; restore the heap invariant over the
          // whole underlying container.
          void
          _M_sort_to(iterator)
          { std::make_heap(this->c.begin(), this->c.end()); }
        };

        __queue _M_queue;
        __timer_id_type _M_next_id = 0;
      };
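
    // There is one __timer_queue service per timer type: use_service
    // creates the queue on first use and returns the same instance on
    // later calls, so all timers of a given type share a single queue
    // (and its mutex).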
    template<typename _Timer, typename _CompletionHandler>
      void
      async_wait(const _Timer& __timer, _CompletionHandler&& __h)
      {
        auto& __queue = use_service<__timer_queue<_Timer>>(*this);
        __queue.push(__timer, std::move(__h));
        _M_reactor._M_notify();
      }

    // Cancel all wait operations initiated by __timer.
    template<typename _Timer>
      size_t
      cancel(const _Timer& __timer)
      {
        if (!has_service<__timer_queue<_Timer>>(*this))
          return 0;

        auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
        if (__c != 0)
          _M_reactor._M_notify();
        return __c;
      }

    // Cancel the oldest wait operation initiated by __timer.
    template<typename _Timer>
      size_t
      cancel_one(const _Timer& __timer)
      {
        if (!has_service<__timer_queue<_Timer>>(*this))
          return 0;

        if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
          {
            _M_reactor._M_notify();
            return 1;
          }
        return 0;
      }
    template<typename _Op>
      void
      async_wait(int __fd, int __w, _Op&& __op)
      {
        lock_guard<execution_context::mutex_type> __lock(_M_mtx);
        // TODO need push_back, use std::list not std::forward_list
        auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
        while (__it != _M_ops.end())
          {
            ++__it;
            ++__tail;
          }
        using __type = __async_operation_impl<_Op>;
        _M_ops.emplace_after(__tail,
                             make_unique<__type>(std::move(__op), __fd, __w));
        _M_reactor._M_fd_interest(__fd, __w);
      }

    void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
    void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }

    void cancel(int __fd, error_code&)
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      const auto __end = _M_ops.end();
      auto __it = _M_ops.begin();
      auto __prev = _M_ops.before_begin();

      // Skip operations that are already cancelled; they are kept in a
      // group at the front of the list.
      while (__it != __end && (*__it)->_M_is_cancelled())
        {
          ++__it;
          ++__prev;
        }

      // Cancel any operation on __fd and splice it to the front group.
      auto __cancelled = __prev;
      while (__it != __end)
        {
          if ((*__it)->_M_fd == __fd)
            {
              (*__it)->cancel();
              ++__it;
              _M_ops.splice_after(__cancelled, _M_ops, __prev);
              ++__cancelled;
            }
          else
            {
              ++__it;
              ++__prev;
            }
        }

      _M_reactor._M_not_interested(__fd);
    }
    struct __async_operation
    {
      __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }

      virtual ~__async_operation() = default;

      int _M_fd;
      short _M_ev;

      void cancel() { _M_fd = -1; }
      bool _M_is_cancelled() const { return _M_fd == -1; }

      virtual void run(io_context&) = 0;
    };

    template<typename _Op>
      struct __async_operation_impl : __async_operation
      {
        __async_operation_impl(_Op&& __op, int __fd, int __ev)
        : __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }

        _Op _M_op;

        void run(io_context& __ctx)
        {
          if (_M_is_cancelled())
            _M_op(std::make_error_code(errc::operation_canceled));
          else
            _M_op(error_code{});
        }
      };
    atomic<count_type> _M_work_count;
    mutable execution_context::mutex_type _M_mtx;
    queue<function<void()>> _M_op; // posted items (TODO: not yet executed)
    bool _M_stopped = false;
    struct __monitor
    {
      __monitor(io_context& __c) : _M_ctx(__c)
      {
#ifdef _GLIBCXX_HAS_GTHREADS
        lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
        _M_ctx._M_call_stack.push_back(this_thread::get_id());
#else
        _M_ctx._M_run_count++;
#endif
      }

      ~__monitor()
      {
#ifdef _GLIBCXX_HAS_GTHREADS
        lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
        _M_ctx._M_call_stack.pop_back();
#else
        _M_ctx._M_run_count--;
#endif
        if (_M_ctx._M_outstanding_work() == 0)
          {
            _M_ctx._M_stopped = true;
            _M_ctx._M_reactor._M_notify();
          }
      }

      __monitor(__monitor&&) = delete;

      io_context& _M_ctx;
    };
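
    // The core event-loop step.  Each iteration either runs one expired
    // timer, runs one ready (or cancelled) descriptor operation, or blocks
    // in the reactor for at most __timeout.  A negative timeout means wait
    // indefinitely (run_one), zero means poll without blocking (poll_one).
    // Returns true if a handler was executed.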
    bool
    _M_do_one(chrono::milliseconds __timeout)
    {
      const bool __block = __timeout != chrono::milliseconds::zero();

      __reactor::__fdvec __fds;

      __monitor __mon{*this};

      __timer_queue_base* __timerq = nullptr;
      unique_ptr<__async_operation> __async_op;

      while (true)
        {
          if (__timerq)
            {
              if (__timerq->run_one())
                return true;
              else
                __timerq = nullptr;
            }

          if (__async_op)
            {
              __async_op->run(*this);
              // TODO need to unregister __async_op
              return true;
            }

          chrono::milliseconds __ms{0};

          {
            lock_guard<execution_context::mutex_type> __lock(_M_mtx);

            if (_M_stopped)
              return false;

            // find first timer with something to do
            for (auto __q : _M_timers)
              {
                auto __next = __q->_M_next();
                if (__next == __next.zero()) // ready to run immediately
                  {
                    __timerq = __q;
                    __ms = __next;
                    break;
                  }
                else if (__next != __next.max() && __block
                    && (__next < __ms || __timerq == nullptr))
                  {
                    __timerq = __q;
                    __ms = __next;
                  }
              }

            if (__timerq && __ms == __ms.zero())
              continue; // restart loop to run a timer immediately

            if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
              {
                _M_ops.front().swap(__async_op);
                _M_ops.pop_front();
                continue;
              }

            // TODO run any posted items

            if (__block)
              {
                if (__timerq == nullptr)
                  __ms = __timeout;
                else if (__ms.zero() <= __timeout && __timeout < __ms)
                  __ms = __timeout;
                else if (__ms.count() > numeric_limits<int>::max())
                  __ms = chrono::milliseconds{numeric_limits<int>::max()};
              }
            // else __ms == 0 and poll() will return immediately
          }

          auto __res = _M_reactor.wait(__fds, __ms);

          if (__res == __reactor::_S_retry)
            continue;

          if (__res == __reactor::_S_timeout)
            {
              if (__timerq == nullptr)
                return false;
              else
                continue; // timed out, so restart loop and process the timer
            }

          __timerq = nullptr;

          if (__fds.empty()) // nothing to do
            return false;

          lock_guard<execution_context::mutex_type> __lock(_M_mtx);

          for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
              __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
            {
              auto& __op = **__it;
              auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
                  __op._M_fd,
                  [](const auto& __p, int __fd) { return __p.fd < __fd; });
              if (__pos != __fds.end() && __pos->fd == __op._M_fd
                  && __pos->revents & __op._M_ev)
                {
                  __it->swap(__async_op);
                  _M_ops.erase_after(__prev);
                  break; // restart loop and run op
                }
            }
        }
    }
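
    // Reactor based on poll() and the classic self-pipe trick: the read end
    // of a pipe is always in the pollfd set, so another thread can interrupt
    // a blocked poll() by writing a byte to the other end.  A minimal sketch
    // of the idea (illustrative only, not part of this header):
    //
    //   int p[2];
    //   pipe(p);                              // p[0] is polled with sockets
    //   struct pollfd pfd = { p[0], POLLIN, 0 };
    //   // ... meanwhile, in another thread:
    //   write(p[1], "", 1);                   // wakes the blocked poll()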
    struct __reactor
    {
      __reactor() : _M_fds(1)
      {
        int __pipe[2];
        if (::pipe(__pipe) == -1)
          __throw_system_error(errno);
        if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
            || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
          {
            int __e = errno;
            ::close(__pipe[0]);
            ::close(__pipe[1]);
            __throw_system_error(__e);
          }
        _M_fds.back().events = POLLIN;
        _M_fds.back().fd = __pipe[0];
        _M_notify_wr = __pipe[1];
      }

      ~__reactor()
      {
        ::close(_M_fds.back().fd);
        ::close(_M_notify_wr);
      }

      // write a notification byte to the pipe (ignoring errors)
      void _M_notify()
      {
        int __n;
        do {
          __n = ::write(_M_notify_wr, "", 1);
        } while (__n == -1 && errno == EINTR);
      }

      // read all notification bytes from the pipe
      void _M_on_notify()
      {
        // Drain the pipe: keep reading while reads succeed or are
        // interrupted; stop once the non-blocking read fails with EAGAIN.
        char __buf[64];
        ssize_t __n;
        do {
          __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
        } while (__n != -1 || errno == EINTR);
      }

      void
      _M_add_fd(int __fd)
      {
        auto __pos = _M_lower_bound(__fd);
        if (__pos->fd == __fd)
          __throw_system_error((int)errc::invalid_argument);
        _M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
        _M_notify();
      }

      void
      _M_remove_fd(int __fd)
      {
        auto __pos = _M_lower_bound(__fd);
        if (__pos->fd == __fd)
          _M_fds.erase(__pos);
        // else bug!
        _M_notify();
      }

      void
      _M_fd_interest(int __fd, int __w)
      {
        auto __pos = _M_lower_bound(__fd);
        if (__pos->fd == __fd)
          __pos->events |= __w;
        // else bug!
        _M_notify();
      }

      void
      _M_not_interested(int __fd)
      {
        auto __pos = _M_lower_bound(__fd);
        if (__pos->fd == __fd)
          __pos->events = 0;
        _M_notify();
      }

#ifdef _GLIBCXX_HAVE_POLL_H
      using __fdvec = vector<::pollfd>;

      // Find first element p such that !(p.fd < __fd)
      // N.B. always returns a dereferenceable iterator.
      __fdvec::iterator
      _M_lower_bound(int __fd)
      {
        return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
            __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
      }

      enum __status { _S_retry, _S_timeout, _S_ok, _S_error };

      __status
      wait(__fdvec& __fds, chrono::milliseconds __timeout)
      {
        // XXX not thread-safe!
        __fds = _M_fds; // take snapshot to pass to poll()
        int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());
        if (__res == -1)
          {
            __fds.clear();
            if (errno == EINTR)
              return _S_retry;
            return _S_error; // XXX ???
          }
        else if (__res == 0)
          {
            __fds.clear();
            return _S_timeout;
          }
        else if (__fds.back().revents != 0) // something changed, restart
          {
            __fds.clear();
            _M_on_notify();
            return _S_retry;
          }

        // Keep only the descriptors that have events pending.
        auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
            [](const __fdvec::value_type& __p) { return __p.revents != 0; });
        __fds.erase(__part, __fds.end());
        return _S_ok;
      }

      __fdvec _M_fds; // _M_fds.back() is the read end of the self-pipe
#endif
      int _M_notify_wr; // write end of the self-pipe
    };
    __reactor _M_reactor;

    vector<__timer_queue_base*> _M_timers;
    forward_list<unique_ptr<__async_operation>> _M_ops;

#ifdef _GLIBCXX_HAS_GTHREADS
    vector<thread::id> _M_call_stack;
#else
    int _M_run_count = 0;
#endif
  };
  inline bool
  operator==(const io_context::executor_type& __a,
             const io_context::executor_type& __b) noexcept
  {
    // https://github.com/chriskohlhoff/asio-tr2/issues/201
    using executor_type = io_context::executor_type;
    return std::addressof(executor_type(__a).context())
      == std::addressof(executor_type(__b).context());
  }

  inline bool
  operator!=(const io_context::executor_type& __a,
             const io_context::executor_type& __b) noexcept
  { return !(__a == __b); }

  template<> struct is_executor<io_context::executor_type> : true_type {};

  /// @}

} // namespace v1
} // namespace net
} // namespace experimental
_GLIBCXX_END_NAMESPACE_VERSION
} // namespace std

#endif // C++14
#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE