Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
network_loop.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/network_loop.h
10//! @brief Network event loop thread.
11
12#ifndef ROC_NETIO_NETWORK_LOOP_H_
13#define ROC_NETIO_NETWORK_LOOP_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
19#include "roc_core/attributes.h"
21#include "roc_core/iarena.h"
22#include "roc_core/list.h"
23#include "roc_core/mpsc_queue.h"
25#include "roc_core/optional.h"
26#include "roc_core/semaphore.h"
27#include "roc_core/thread.h"
30#include "roc_netio/iconn.h"
36#include "roc_netio/resolver.h"
41#include "roc_packet/iwriter.h"
43
44namespace roc {
45namespace netio {
46
47//! Network event loop thread.
48//! @remarks
49//! This class is a task-based facade for the whole roc_netio module.
51 private ICloseHandler,
53 private core::Thread {
54public:
55 //! Opaque port handle.
56 typedef struct PortHandle* PortHandle;
57
58 //! Subclasses for specific tasks.
59 class Tasks {
60 public:
61 //! Add UDP datagram receiver port.
63 public:
64 //! Set task parameters.
65 //! @remarks
66 //! - Updates @p config with the actual bind address.
67 //! - Passes received packets to @p writer. It is called from network thread.
68 //! It should not block the caller.
70
71 //! Get created port handle.
72 //! @pre
73 //! Should be called only if success() is true.
75
76 private:
77 friend class NetworkLoop;
78
79 UdpReceiverConfig* config_;
80 packet::IWriter* writer_;
81 };
82
83 //! Add UDP datagram sender port.
85 public:
86 //! Set task parameters.
87 //! @remarks
88 //! Updates @p config with the actual bind address.
90
91 //! Get created port handle.
92 //! @pre
93 //! Should be called only if success() is true.
95
96 //! Get created port writer;
97 //! @remarks
98 //! The writer can be used to send packets from the port. It may be called
99 //! from any thread. It will not block the caller.
100 //! @pre
101 //! Should be called only if success() is true.
103
104 private:
105 friend class NetworkLoop;
106
107 UdpSenderConfig* config_;
108 packet::IWriter* writer_;
109 };
110
111 //! Add TCP server port.
113 public:
114 //! Set task parameters.
115 //! @remarks
116 //! - Updates @p config with the actual bind address.
117 //! - Listens for incoming connections and passes new connections
118 //! to @p conn_acceptor. It should return handler that will be
119 //! notified when connection state changes.
121
122 //! Get created port handle.
123 //! @pre
124 //! Should be called only if success() is true.
126
127 private:
128 friend class NetworkLoop;
129
130 TcpServerConfig* config_;
131 IConnAcceptor* conn_acceptor_;
132 };
133
134 //! Add TCP client port.
136 public:
137 //! Set task parameters.
138 //! @remarks
139 //! - Updates @p config with the actual bind address.
140 //! - Notofies @p conn_handler when connection state changes.
142
143 //! Get created port handle.
144 //! @pre
145 //! Should be called only if success() is true.
147
148 private:
149 friend class NetworkLoop;
150
151 TcpClientConfig* config_;
152 IConnHandler* conn_handler_;
153 };
154
155 //! Remove port.
156 class RemovePort : public NetworkTask {
157 public:
158 //! Set task parameters.
160
161 private:
162 friend class NetworkLoop;
163 };
164
165 //! Resolve endpoint address.
167 public:
168 //! Set task parameters.
169 //! @remarks
170 //! Gets endpoint hostname, resolves it, and writes the resolved IP address
171 //! and the port from the endpoint to the resulting SocketAddr.
173
174 //! Get resolved address.
175 //! @pre
176 //! Should be called only if success() is true.
178
179 private:
180 friend class NetworkLoop;
181
182 ResolverRequest resolve_req_;
183 };
184 };
185
186 //! Initialize.
187 //! @remarks
188 //! Start background thread if the object was successfully constructed.
190 core::BufferFactory<uint8_t>& buffer_factory,
191 core::IArena& arena);
192
193 //! Destroy. Stop all receivers and senders.
194 //! @remarks
195 //! Wait until background thread finishes.
196 virtual ~NetworkLoop();
197
198 //! Check if the object was successfully constructed.
199 bool is_valid() const;
200
201 //! Get number of receiver and sender ports.
202 size_t num_ports() const;
203
204 //! Enqueue a task for asynchronous execution and return.
205 //! The task should not be destroyed until the callback is called.
206 //! The @p completer will be invoked on event loop thread after the
207 //! task completes.
209
210 //! Enqueue a task for asynchronous execution and wait for its completion.
211 //! The task should not be destroyed until this method returns.
212 //! Should not be called from schedule() callback.
213 //! @returns
214 //! true if the task succeeded or false if it failed.
216
217private:
218 static void task_sem_cb_(uv_async_t* handle);
219 static void stop_sem_cb_(uv_async_t* handle);
220
221 virtual void handle_terminate_completed(IConn&, void*);
222 virtual void handle_close_completed(BasicPort&, void*);
223 virtual void handle_resolved(ResolverRequest& req);
224
225 virtual void run();
226
227 void process_pending_tasks_();
228 void finish_task_(NetworkTask&);
229
230 void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
231 NetworkTask* task);
232 AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
233 NetworkTask* task);
234 void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
235
236 void update_num_ports_();
237
238 void close_all_sems_();
239 void close_all_ports_();
240
241 void task_add_udp_receiver_(NetworkTask&);
242 void task_add_udp_sender_(NetworkTask&);
243 void task_remove_port_(NetworkTask&);
244 void task_add_tcp_server_(NetworkTask&);
245 void task_add_tcp_client_(NetworkTask&);
246 void task_resolve_endpoint_address_(NetworkTask&);
247
248 packet::PacketFactory& packet_factory_;
249 core::BufferFactory<uint8_t>& buffer_factory_;
250 core::IArena& arena_;
251
252 bool started_;
253
254 uv_loop_t loop_;
255 bool loop_initialized_;
256
257 uv_async_t stop_sem_;
258 bool stop_sem_initialized_;
259
260 uv_async_t task_sem_;
261 bool task_sem_initialized_;
262
264
265 Resolver resolver_;
266
267 core::List<BasicPort> open_ports_;
268 core::List<BasicPort> closing_ports_;
269
270 core::Atomic<int> num_open_ports_;
271};
272
273} // namespace netio
274} // namespace roc
275
276#endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Base class for ports.
Buffer factory.
Network endpoint URI.
Definition: endpoint_uri.h:27
Socket address.
Definition: socket_addr.h:26
Memory arena interface.
Definition: iarena.h:23
Intrusive doubly-linked list.
Definition: list.h:35
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:40
Shared ownership intrusive pointer.
Definition: shared_ptr.h:32
Base class for thread objects.
Definition: thread.h:27
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Definition: iconn_handler.h:60
Connection interface.
Definition: iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
AddUdpReceiverPort(UdpReceiverConfig &config, packet::IWriter &writer)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
PortHandle get_handle() const
Get created port handle.
AddUdpSenderPort(UdpSenderConfig &config)
Set task parameters.
packet::IWriter * get_writer() const
Get created port writer;.
RemovePort(PortHandle handle)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
Subclasses for specific tasks.
Definition: network_loop.h:59
Network event loop thread.
Definition: network_loop.h:53
bool is_valid() const
Check if the object was successfully constructed.
size_t num_ports() const
Get number of receiver and sender ports.
ROC_ATTR_NODISCARD bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroy...
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the call...
NetworkLoop(packet::PacketFactory &packet_factory, core::BufferFactory< uint8_t > &buffer_factory, core::IArena &arena)
Initialize.
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Definition: network_loop.h:56
Base class for network loop tasks.
Definition: network_task.h:29
Hostname resolver.
Definition: resolver.h:25
Packet writer interface.
Definition: iwriter.h:23
Memory arena interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP receiver parameters.
UDP sender parameters.
TCP connection.
Thread.
UDP receiver.