io_service.hpp 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. // MIT License
  2. //
  3. // Copyright (c) 2016-2017 Simon Ninon <simon.ninon@gmail.com>
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  21. // SOFTWARE.
  22. #pragma once
  23. #include <atomic>
  24. #include <condition_variable>
  25. #include <functional>
  26. #include <memory>
  27. #include <mutex>
  28. #include <thread>
  29. #include <unordered_map>
  30. #include <vector>
  31. #ifdef _WIN32
  32. #include <Winsock2.h>
  33. #else
  34. #include <sys/select.h>
  35. #endif /* _WIN32 */
  36. #include <tacopie/network/self_pipe.hpp>
  37. #include <tacopie/network/tcp_socket.hpp>
  38. #include <tacopie/utils/thread_pool.hpp>
  39. #ifndef __TACOPIE_IO_SERVICE_NB_WORKERS
  40. #define __TACOPIE_IO_SERVICE_NB_WORKERS 1
  41. #endif /* __TACOPIE_IO_SERVICE_NB_WORKERS */
  42. namespace tacopie {
  43. //!
  44. //! service that operates IO Handling.
  45. //! It polls sockets for input and output, processes read and write operations and calls the appropriate callbacks.
  46. //!
  47. class io_service {
  48. public:
  49. //!
  50. //! ctor
  51. //!
  52. io_service(void);
  53. //! dtor
  54. ~io_service(void);
  55. //! copy ctor
  56. io_service(const io_service&) = delete;
  57. //! assignment operator
  58. io_service& operator=(const io_service&) = delete;
  59. public:
  60. //!
  61. //! reset number of io_service workers assigned to this io_service
  62. //! this can be safely called at runtime, even if the io_service is currently running
  63. //! it can be useful if you need to re-adjust the number of workers
  64. //!
  65. //! \param nb_threads number of workers
  66. //!
  67. void set_nb_workers(std::size_t nb_threads);
  68. public:
  69. //! callback handler typedef
  70. //! called on new socket event if register to io_service
  71. typedef std::function<void(fd_t)> event_callback_t;
  72. //!
  73. //! track socket
  74. //! add socket to io_service tracking for read/write operation
  75. //! socket is polled only if read or write callback is defined
  76. //!
  77. //! \param socket socket to be tracked
  78. //! \param rd_callback callback to be executed on read event
  79. //! \param wr_callback callback to be executed on write event
  80. //!
  81. void track(const tcp_socket& socket, const event_callback_t& rd_callback = nullptr, const event_callback_t& wr_callback = nullptr);
  82. //!
  83. //! update the read callback
  84. //! if socket is not tracked yet, track it
  85. //!
  86. //! \param socket socket to be tracked
  87. //! \param event_callback callback to be executed on read event
  88. //!
  89. void set_rd_callback(const tcp_socket& socket, const event_callback_t& event_callback);
  90. //!
  91. //! update the write callback
  92. //! if socket is not tracked yet, track it
  93. //!
  94. //! \param socket socket to be tracked
  95. //! \param event_callback callback to be executed on write event
  96. //!
  97. void set_wr_callback(const tcp_socket& socket, const event_callback_t& event_callback);
  98. //!
  99. //! remove socket from io_service tracking
  100. //! socket is marked for untracking and will effectively be removed asynchronously from tracking once
  101. //! * poll wakes up
  102. //! * no callback are being executed for that socket
  103. //!
  104. //! re-adding track while socket is pending for untrack is fine and will simply cancel the untrack operation
  105. //!
  106. //! \param socket socket to be untracked
  107. //!
  108. void untrack(const tcp_socket& socket);
  109. //!
  110. //! wait until the socket has been effectively removed
  111. //! basically wait until all pending callbacks are executed
  112. //!
  113. //! \param socket socket to wait for
  114. //!
  115. void wait_for_removal(const tcp_socket& socket);
  116. private:
  117. //!
  118. //! struct tracked_socket
  119. //! contains information about what a current socket is tracking
  120. //! * rd_callback: callback to be executed on read availability
  121. //! * is_executing_rd_callback: whether the rd callback is currently being executed or not
  122. //! * wr_callback: callback to be executed on write availability
  123. //! * is_executing_wr_callback: whether the wr callback is currently being executed or not
  124. //! * marked_for_untrack: whether the socket is marked for being untrack (that is, will be untracked whenever all the callback completed their execution)
  125. //!
  126. //!
  127. struct tracked_socket {
  128. //! ctor
  129. tracked_socket(void)
  130. : rd_callback(nullptr)
  131. , wr_callback(nullptr) {}
  132. //! rd event
  133. event_callback_t rd_callback;
  134. std::atomic<bool> is_executing_rd_callback = ATOMIC_VAR_INIT(false);
  135. //! wr event
  136. event_callback_t wr_callback;
  137. std::atomic<bool> is_executing_wr_callback = ATOMIC_VAR_INIT(false);
  138. //! marked for untrack
  139. std::atomic<bool> marked_for_untrack = ATOMIC_VAR_INIT(false);
  140. };
  141. private:
  142. //!
  143. //! poll worker function
  144. //! main loop of the background thread in charge of the io_service in charge of polling fds
  145. //!
  146. void poll(void);
  147. //!
  148. //! init m_poll_fds_info
  149. //! simply initialize m_polled_fds variable based on m_tracked_sockets information
  150. //!
  151. //! \return maximum fd value polled
  152. //!
  153. int init_poll_fds_info(void);
  154. //!
  155. //! process poll detected events
  156. //! called whenever select/poll completed to check read and write availablity
  157. //!
  158. void process_events(void);
  159. //!
  160. //! process read event reported by select/poll for a given socket
  161. //!
  162. //! \param fd fd for which a read event has been reported
  163. //! \param socket tracked_socket associated to the given fd
  164. //!
  165. void process_rd_event(const fd_t& fd, tracked_socket& socket);
  166. //!
  167. //! process write event reported by select/poll for a given socket
  168. //!
  169. //! \param fd fd for which a write event has been reported
  170. //! \param socket tracked_socket associated to the given fd
  171. //!
  172. void process_wr_event(const fd_t& fd, tracked_socket& socket);
  173. private:
  174. //!
  175. //! tracked sockets
  176. //!
  177. std::unordered_map<fd_t, tracked_socket> m_tracked_sockets;
  178. //!
  179. //! whether the worker should stop or not
  180. //!
  181. std::atomic<bool> m_should_stop;
  182. //!
  183. //! poll thread
  184. //!
  185. std::thread m_poll_worker;
  186. //!
  187. //! callback workers
  188. //!
  189. utils::thread_pool m_callback_workers;
  190. //!
  191. //! thread safety
  192. //!
  193. std::mutex m_tracked_sockets_mtx;
  194. //!
  195. //! data structure given to select (list of fds to poll)
  196. //!
  197. std::vector<fd_t> m_polled_fds;
  198. //!
  199. //! data structure given to select (list of fds to poll for read)
  200. //!
  201. fd_set m_rd_set;
  202. //!
  203. //! data structure given to select (list of fds to poll for write)
  204. //!
  205. fd_set m_wr_set;
  206. //!
  207. //! condition variable to wait on removal
  208. //!
  209. std::condition_variable m_wait_for_removal_condvar;
  210. //!
  211. //! fd associated to the pipe used to wake up the poll call
  212. //!
  213. tacopie::self_pipe m_notifier;
  214. };
  215. //!
  216. //! default io_service getter & setter
  217. //!
  218. //! \return shared_ptr to the default instance of the io_service
  219. //!
  220. const std::shared_ptr<io_service>& get_default_io_service(void);
  221. //!
  222. //! set the default io_service to be returned by get_default_io_service
  223. //!
  224. //! \param service the service to be used as the default io_service instance
  225. //!
  226. void set_default_io_service(const std::shared_ptr<io_service>& service);
  227. } // namespace tacopie