Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
pipeline_loop.h
1/*
2 * Copyright (c) 2020 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_pipeline/pipeline_loop.h
10//! @brief Base class for pipelines.
11
12#ifndef ROC_PIPELINE_PIPELINE_LOOP_H_
13#define ROC_PIPELINE_PIPELINE_LOOP_H_
14
15#include "roc_audio/frame.h"
17#include "roc_core/atomic.h"
18#include "roc_core/mpsc_queue.h"
19#include "roc_core/mutex.h"
21#include "roc_core/optional.h"
23#include "roc_core/seqlock.h"
24#include "roc_core/time.h"
25#include "roc_packet/units.h"
26#include "roc_pipeline/config.h"
30
31namespace roc {
32namespace pipeline {
33
34//! Base class for task-based pipelines.
35//!
36//! Frames, tasks, and threads
37//! --------------------------
38//!
39//! The pipeline processes frames and tasks. This processing is serialized. At every
40//! moment, the pipeline is either processing a frame, processing a task, or doing
41//! nothing.
42//!
43//! The pipeline does not have its own thread. Both frame and task processing happens
44//! when the user calls one of the pipeline methods, in the context of the caller thread.
45//! Methods may be called from different threads, concurrently. This complicates the
46//! implementation, but allows different thread layouts for different use cases.
47//!
48//! Precise task scheduling
49//! -----------------------
50//!
51//! This class implements the "precise task scheduling" feature, which tries to schedule
52//! task processing intervals smartly, to prevent time collisions with frame processing
53//! and keep frame processing timings unaffected.
54//!
55//! Precise task scheduling is enabled by default, but can be disabled via config. When
56//! disabled, no special scheduling is performed and frame and task processing compete
57//! with each other for exclusive access to the pipeline.
58//!
59//! The sections below describe various aspects of the implementation.
60//!
61//! Task processing time slices
62//! ---------------------------
63//!
64//! Tasks are processed between frames on dedicated time slices, to ensure that
65//! task processing won't delay frame processing, which should be as close to real-time
66//! as possible.
67//!
68//! If a frame is too large, it's split into sub-frames, to allow task processing between
69//! these sub-frames. This is needed to ensure that the task processing delay would not
70//! be too large, at least while there are not too many tasks.
71//!
72//! If frames are too small, tasks are processed only after some of the frames instead
73//! of after every frame. This is needed to reduce task processing overhead when using
74//! tiny frames.
75//!
76//! There are two types of time slices dedicated to task processing:
77//! - in-frame task processing: short intervals between sub-frames
78//! (inside process_frame_and_tasks())
79//! - inter-frame task processing: longer intervals between frames
80//! (inside process_tasks())
81//!
82//! process_frame_and_tasks() calls are to be driven by the user-defined pipeline
83//! clock. It should be called exactly when it's time to process more samples. Our
84//! goal is to give it exclusive access to the pipeline as soon as possible after
85//! it's called.
86//!
87//! process_tasks() should be called by the user when there are pending tasks that should
88//! be processed and when no concurrent process_frame_and_tasks() call is running.
89//! Our goal is to notify the user if and when it should be called.
90//!
91//! Asynchronous task processing
92//! ----------------------------
93//!
94//! Since the pipeline does not have its own thread, it can't schedule process_tasks()
95//! invocation on its own. Instead, it relies on the user-provided IPipelineTaskScheduler
96//! object.
97//!
98//! When the pipeline wants to schedule an asynchronous process_tasks() invocation, it
99//! calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when and
100//! on which thread to invoke process_tasks(), but the pipeline gives a hint with the
101//! ideal invocation time.
102//!
103//! The pipeline may also cancel the scheduled task processing by invoking
104//! IPipelineTaskScheduler::cancel_task_processing().
105//!
106//! In-place task processing
107//! ------------------------
108//!
109//! If schedule() or schedule_and_wait() is called when the task queue is empty and the
110//! current time point belongs to the task processing time slice, the new task is
111//! processed in-place without waiting for the next process_frame_and_tasks() or
112//! process_tasks() invocation. This avoids extra delays and thread switches
113//! when possible.
114//!
115//! Processing priority
116//! -------------------
117//!
118//! When process_frame_and_tasks() is called, it increments the pending_frames_ atomic
119//! and blocks on pipeline_mutex_. The non-zero atomic indicates that a frame needs
120//! to be processed as soon as possible and other methods should give way to it.
121//!
122//! When process_frame_and_tasks() is called, it also cancels any scheduled
123//! asynchronous task processing before starting processing the frame and tasks.
124//! Before exiting, process_frame_and_tasks() checks if there are still some pending
125//! tasks and if necessary, schedules asynchronous execution again.
126//!
127//! When process_tasks() is processing asynchronous tasks, but detects that
128//! process_frame_and_tasks() was invoked concurrently from another thread, it gives
129//! way to it and exits. process_frame_and_tasks() will process the frame and some of
130//! the remaining tasks, and if there are even more tasks remaining, it will invoke
131//! schedule_task_processing() to allow process_tasks() to continue.
132//!
133//! When schedule() and process_tasks() want to invoke schedule_task_processing(), but
134//! detect that process_frame_and_tasks() was invoked concurrently from another thread,
135//! they give way to it and don't call schedule_task_processing(), assuming that
136//! process_frame_and_tasks() will either process all tasks or call
137//! schedule_task_processing() by itself.
138//!
139//! Locking rules
140//! -------------
141//!
142//! pipeline_mutex_ protects the internal pipeline state. It should be acquired to
143//! process a frame or a task.
144//!
145//! scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired to
146//! schedule or cancel asynchronous task processing.
147//!
148//! If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will
149//! check pending tasks after unlocking the mutex and will either process them or
150//! schedule asynchronous processing.
151//!
152//! If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will
153//! either schedule or cancel asynchronous task processing, depending on whether
154//! there are pending tasks and frames.
155//!
156//! Lock-free operations
157//! --------------------
158//!
159//! schedule() and process_tasks() methods are lock-free. Also, they're either completely
160//! wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware
161//! architecture (see comments for core::MpscQueue).
162//!
163//! In practice it means that when running concurrently with other PipelineLoop method
164//! invocations, they never block waiting for other threads, and usually don't even spin.
165//!
166//! This is achieved by using a lock-free queue for tasks, atomics for 32-bit counters,
167//! seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs), always
168//! using try_lock() for mutexes and delaying the work if the mutex can't be acquired,
169//! and using semaphores instead of condition variables for signaling (which don't
170//! require blocking on a mutex, at least on modern platforms; e.g. on glibc they're
171//! implemented using an atomic and a futex).
172//!
173//! process_frame_and_tasks() is not lock-free because it has to acquire the pipeline
174//! mutex and can't delay its work. However, the precise task scheduling feature does its
175//! best to ensure that the pipeline mutex will be unlocked when process_frame_and_tasks()
176//! is invoked, thus in most cases it won't block or wait either.
177//!
178//! This approach helps us with our global goal of making all inter-thread interactions
179//! mostly wait-free, so that one thread is never or almost never blocked when another
180//! thread is blocked, preempted, or busy.
181//!
182//! Benchmarks
183//! ----------
184//!
185//! PipelineLoop is covered with two groups of benchmarks:
186//! - bench_pipeline_loop_peak_load.cpp measures frame and task processing delays with
187//! or without task load and with or without the precise task scheduling feature;
188//! - bench_pipeline_loop_contention.cpp measures scheduling times under different
189//! contention levels.
190//!
191//! You can run them using the "roc-bench-pipeline" command. For further details, see
192//! comments in the source code of the benchmarks.
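194public:
195 //! Enqueue a task for asynchronous execution.
196 void schedule(PipelineTask& task, IPipelineTaskCompleter& completer);
197
198 //! Enqueue a task for asynchronous execution and wait until it finishes.
199 //! @returns false if the task fails.
200 bool schedule_and_wait(PipelineTask& task);
201
202 //! Process some of the enqueued tasks, if any.
203 void process_tasks();
204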
205protected:
206 //! Task processing statistics.
207 struct Stats {
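208 //! Total number of tasks processed.
209 uint64_t task_processed_total;
210
211 //! Number of tasks processed directly in schedule() or schedule_and_wait().
212 uint64_t task_processed_in_place;
213
214 //! Number of tasks processed in process_frame_and_tasks().
215 uint64_t task_processed_in_frame;
216
217 //! Number of times when another method was preempted by process_frame_and_tasks().
218 uint64_t preemptions;
219
220 //! Number of times when schedule_task_processing() was called.
221 uint64_t scheduler_calls;
222
223 //! Number of times when cancel_task_processing() was called.
224 uint64_t scheduler_cancellations;
225
226 Stats()
227 : task_processed_total(0)
228 , task_processed_in_place(0)
229 , task_processed_in_frame(0)
230 , preemptions(0)
231 , scheduler_calls(0)
232 , scheduler_cancellations(0) {
233 }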
234 };
235
236 //! Initialization.
237 PipelineLoop(IPipelineTaskScheduler& scheduler,
238 const TaskConfig& config,
239 const audio::SampleSpec& sample_spec);
240
241 virtual ~PipelineLoop();
242
243 //! How many pending tasks are there.
244 size_t num_pending_tasks() const;
245
246 //! How many pending frames are there.
247 size_t num_pending_frames() const;
248
249 //! Get task processing statistics.
250 //! Returned object can't be accessed concurrently with other methods.
251 const Stats& get_stats_ref() const;
252
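253 //! Split frame and process subframes and some of the enqueued tasks.
254 bool process_subframes_and_tasks(audio::Frame& frame);
255
256 //! Get current time.
257 virtual core::nanoseconds_t timestamp_imp() const = 0;
258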
259 //! Process subframe.
260 virtual bool process_subframe_imp(audio::Frame& frame) = 0;
261
262 //! Process task.
263 virtual bool process_task_imp(PipelineTask& task) = 0;
264
265private:
266 enum ProcState { ProcNotScheduled, ProcScheduled, ProcRunning };
267
268 bool process_subframes_and_tasks_simple_(audio::Frame& frame);
269 bool process_subframes_and_tasks_precise_(audio::Frame& frame);
270
271 bool schedule_and_maybe_process_task_(PipelineTask& task);
272 bool maybe_process_tasks_();
273
274 void schedule_async_task_processing_();
275 void cancel_async_task_processing_();
276
277 void process_task_(PipelineTask& task, bool notify);
278 bool process_next_subframe_(audio::Frame& frame, size_t* frame_pos);
279
280 bool start_subframe_task_processing_();
281 bool subframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;
282
283 core::nanoseconds_t update_next_frame_deadline_(core::nanoseconds_t frame_start_time,
284 size_t frame_size);
285 bool
286 interframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;
287
288 void report_stats_();
289
290 // configuration
291 const TaskConfig config_;
292
293 const audio::SampleSpec sample_spec_;
294
295 const size_t min_samples_between_tasks_;
296 const size_t max_samples_between_tasks_;
297
298 const core::nanoseconds_t no_task_proc_half_interval_;
299
300 // used to schedule asynchronous work
301 IPipelineTaskScheduler& scheduler_;
302
303 // protects pipeline state
304 core::Mutex pipeline_mutex_;
305
306 // protects IPipelineTaskScheduler
307 core::Mutex scheduler_mutex_;
308
309 // lock-free queue of pending tasks
311
312 // counter of pending tasks
313 core::Atomic<int> pending_tasks_;
314
315 // counter of pending process_frame_and_tasks() calls blocked on pipeline_mutex_
316 core::Atomic<int> pending_frames_;
317
318 // asynchronous processing state
319 core::Atomic<int> processing_state_;
320
321 // when next frame is expected to be started
322 core::Seqlock<core::nanoseconds_t> next_frame_deadline_;
323
324 // when task processing before next sub-frame ends
325 core::nanoseconds_t subframe_tasks_deadline_;
326
327 // number of samples processed since last in-frame task processing
328 size_t samples_processed_;
329
330 // did we accumulate enough samples in samples_processed_
331 bool enough_samples_to_process_tasks_;
332
333 // task processing statistics
334 core::RateLimiter rate_limiter_;
335 Stats stats_;
336};
337
338} // namespace pipeline
339} // namespace roc
340
341#endif // ROC_PIPELINE_PIPELINE_LOOP_H_
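
The sub-frame rule from the "Task processing time slices" section of the header comment can be illustrated with a small standalone sketch. It is not the PipelineLoop implementation: SubframePlanner and count_task_slots() are hypothetical names introduced here, and the exact thresholds are an assumption. The field names only mirror the private members min_samples_between_tasks_, max_samples_between_tasks_ and samples_processed_. The sketch caps each sub-frame at the maximum and opens a task processing slot only after at least the minimum number of samples has accumulated.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper for illustration only; not part of roc_pipeline.
struct SubframePlanner {
    std::size_t min_samples_between_tasks; // do not open task slots more often than this
    std::size_t max_samples_between_tasks; // do not process more than this without a slot
    std::size_t samples_processed;         // samples accumulated since the last slot

    SubframePlanner(std::size_t min_samples, std::size_t max_samples)
        : min_samples_between_tasks(min_samples)
        , max_samples_between_tasks(max_samples)
        , samples_processed(0) {
        assert(min_samples > 0 && min_samples <= max_samples);
    }

    // Size of the next sub-frame: large frames are cut at the maximum.
    std::size_t next_subframe_size(std::size_t remaining) const {
        return std::min(remaining, max_samples_between_tasks);
    }

    // Account for a processed sub-frame.
    // Returns true if a task processing slot may follow it.
    bool finish_subframe(std::size_t subframe_size) {
        samples_processed += subframe_size;
        if (samples_processed >= min_samples_between_tasks) {
            samples_processed = 0;
            return true;
        }
        return false; // tiny frame: keep accumulating
    }
};

// Walk through one frame and count how many task slots it opens.
std::size_t count_task_slots(SubframePlanner& planner, std::size_t frame_size) {
    std::size_t slots = 0;
    std::size_t pos = 0;
    while (pos < frame_size) {
        const std::size_t n = planner.next_subframe_size(frame_size - pos);
        pos += n;
        if (planner.finish_subframe(n)) {
            ++slots;
        }
    }
    return slots;
}
```

With a minimum of 100 and a maximum of 400 samples, a 1000-sample frame is cut into sub-frames of 400, 400 and 200 samples, each followed by a slot. Frames of 30 samples open a slot only after every fourth frame.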
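
The "Processing priority" and "Lock-free operations" sections of the header comment describe frame calls that announce themselves through an atomic counter and block on the pipeline mutex, while task calls use try_lock() and back off. A minimal sketch of that pattern using only the C++ standard library might look as follows. PrioritySketch and its members are hypothetical, the real class uses core::Atomic and core::Mutex, and details such as when the counter is decremented are assumptions.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical illustration; names only loosely mirror PipelineLoop members.
struct PrioritySketch {
    std::atomic<int> pending_frames{0}; // frame calls waiting for the mutex
    std::mutex pipeline_mutex;          // guards the (imaginary) pipeline state
    int frames_done = 0;
    int tasks_done = 0;

    // Frame path: announce the frame first, then block until access is exclusive.
    void process_frame() {
        pending_frames.fetch_add(1);
        std::lock_guard<std::mutex> lock(pipeline_mutex);
        const int before = pending_frames.fetch_sub(1);
        assert(before > 0); // this call was counted as pending
        ++frames_done;
    }

    // Task path: never blocks. Returns false when it gives way to a frame
    // or when another thread already holds the mutex.
    bool try_process_task() {
        if (pending_frames.load() != 0) {
            return false;
        }
        std::unique_lock<std::mutex> lock(pipeline_mutex, std::try_to_lock);
        if (!lock.owns_lock()) {
            return false;
        }
        if (pending_frames.load() != 0) {
            return false; // a frame arrived meanwhile; unlock and give way
        }
        ++tasks_done;
        return true;
    }
};
```

A caller that gets false from try_process_task() would leave the task queued and rely on the frame path, or on a later scheduled call, to pick it up. This corresponds to the "delaying the work if the mutex can't be acquired" idea in the comment.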