Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
private_server.cpp
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "../rml/include/rml_tbb.h"
18#include "../rml/server/thread_monitor.h"
19#include "tbb/atomic.h"
21#include "scheduler_common.h"
22#include "governor.h"
23#include "tbb_misc.h"
24
25using rml::internal::thread_monitor;
26
27namespace tbb {
28namespace internal {
29namespace rml {
30
31typedef thread_monitor::handle_type thread_handle;
32
33class private_server;
34
36private:
38
44 enum state_t {
53 };
54 atomic<state_t> my_state;
55
58
60 tbb_client& my_client;
61
63 const size_t my_index;
64
66
68 thread_monitor my_thread_monitor;
69
72
75
76 friend class private_server;
77
79 void run();
80
82 void wake_or_launch();
83
85 void start_shutdown();
86
87 static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );
88
89 static void release_handle(thread_handle my_handle, bool join);
90
91protected:
92 private_worker( private_server& server, tbb_client& client, const size_t i ) :
93 my_server(server), my_client(client), my_index(i),
95 {
97 }
98};
99
101
102
103#if _MSC_VER && !defined(__INTEL_COMPILER)
104 // Suppress overzealous compiler warnings about uninstantiable class
105 #pragma warning(push)
106 #pragma warning(disable:4510 4610)
107#endif
110public:
111 padded_private_worker( private_server& server, tbb_client& client, const size_t i )
112 : private_worker(server,client,i) { suppress_unused_warning(pad); }
113};
114#if _MSC_VER && !defined(__INTEL_COMPILER)
115 #pragma warning(pop)
116#endif
117
118class private_server: public tbb_server, no_copy {
119private:
120 tbb_client& my_client;
122
123 const tbb_client::size_type my_n_thread;
124
126 const size_t my_stack_size;
127
129
133 atomic<int> my_slack;
134
136 atomic<int> my_ref_count;
137
139
141 tbb::atomic<private_worker*> my_asleep_list_root;
142
146
147#if TBB_USE_ASSERT
148 atomic<int> my_net_slack_requests;
149#endif /* TBB_USE_ASSERT */
150
152
155 // First test of a double-check idiom. Second test is inside wake_some(0).
157 wake_some(0);
158 }
159
162
164 void wake_some( int additional_slack );
165
166 virtual ~private_server();
167
169 if( --my_ref_count==0 ) {
170 my_client.acknowledge_close_connection();
171 this->~private_server();
173 }
174 }
175
176 friend class private_worker;
177public:
178 private_server( tbb_client& client );
179
// Version query overriding the base-class method (marked __TBB_override).
// This implementation always reports 0.
180 version_type version() const __TBB_override {
181 return 0;
182 }
183
184 void request_close_connection( bool /*exiting*/ ) __TBB_override {
185 for( size_t i=0; i<my_n_thread; ++i )
186 my_thread_array[i].start_shutdown();
188 }
189
191
193
195
197
198#if _WIN32||_WIN64
199 void register_master ( ::rml::server::execution_resource_t& ) __TBB_override {}
200 void unregister_master ( ::rml::server::execution_resource_t ) __TBB_override {}
201#endif /* _WIN32||_WIN64 */
202};
203
204//------------------------------------------------------------------------
205// Methods of private_worker
206//------------------------------------------------------------------------
207#if _MSC_VER && !defined(__INTEL_COMPILER)
208 // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
209 #pragma warning(push)
210 #pragma warning(disable:4189)
211#endif
212#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
213// ensure that stack is properly aligned for TBB threads
214__attribute__((force_align_arg_pointer))
215#endif
// Thread entry point handed to thread_monitor::launch() by wake_or_launch().
// arg is the private_worker that owns the new thread (the launch sites pass `this`).
// Always returns 0 once the worker's run() has finished.
216__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    // Recover the typed worker pointer from the untyped thread argument.
217 private_worker* self = static_cast<private_worker*>(arg);
    // my_index is the per-worker index used for avoiding the 64K aliasing problem.
    // NOTE(review): the macro is defined elsewhere; the MSVC warning suppression around
    // this function suggests it declares a local named 'sink_for_alloca' - confirm.
218 AVOID_64K_ALIASING( self->my_index );
    // Perform the actions of the associated thread; returns when the worker is done.
219 self->run();
220 return 0;
221}
222#if _MSC_VER && !defined(__INTEL_COMPILER)
223 #pragma warning(pop)
224#endif
225
227 if (join)
228 thread_monitor::join(handle);
229 else
230 thread_monitor::detach_thread(handle);
231}
232
234 state_t s;
235
236 do {
237 s = my_state;
238 __TBB_ASSERT( s!=st_quit, NULL );
239 } while( my_state.compare_and_swap( st_quit, s )!=s );
240 if( s==st_normal || s==st_starting ) {
241 // May have invalidated invariant for sleeping, so wake up the thread.
242 // Note that the notify() here occurs without maintaining invariants for my_slack.
243 // It does not matter, because my_state==st_quit overrides checking of my_slack.
244 my_thread_monitor.notify();
245 // Do not need release handle in st_init state,
246 // because in this case the thread wasn't started yet.
247 // For st_starting release is done at launch site.
248 if (s==st_normal)
250 } else if( s==st_init ) {
251 // Perform action that otherwise would be performed by associated thread when it quits.
253 }
254}
255
258
259 // Transiting to st_normal here would require setting my_handle,
260 // which would create race with the launching thread and
261 // complications in handle management on Windows.
262
263 ::rml::job& j = *my_client.create_one_job();
264 while( my_state!=st_quit ) {
265 if( my_server.my_slack>=0 ) {
266 my_client.process(j);
267 } else {
268 thread_monitor::cookie c;
269 // Prepare to wait
270 my_thread_monitor.prepare_wait(c);
271 // Check/set the invariant for sleeping
273 my_thread_monitor.commit_wait(c);
274 __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
276 } else {
277 // Invariant broken
278 my_thread_monitor.cancel_wait();
279 }
280 }
281 }
282 my_client.cleanup(j);
283
286}
287
289 if( my_state==st_init && my_state.compare_and_swap( st_starting, st_init )==st_init ) {
290 // after this point, remove_server_ref() must be done by created thread
291#if USE_WINTHREAD
292 my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
293#elif USE_PTHREAD
294 {
295 affinity_helper fpa;
296 fpa.protect_affinity_mask( /*restore_process_mask=*/true );
297 my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
298 // Implicit destruction of fpa resets original affinity mask.
299 }
300#endif /* USE_PTHREAD */
301 state_t s = my_state.compare_and_swap( st_normal, st_starting );
302 if (st_starting != s) {
303 // Do shutdown during startup. my_handle can't be released
304 // by start_shutdown, because my_handle value might be not set yet
305 // at time of transition from st_starting to st_quit.
306 __TBB_ASSERT( s==st_quit, NULL );
308 }
309 }
310 else {
311 __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in asleep list" );
312 my_thread_monitor.notify();
313 }
314}
315
316//------------------------------------------------------------------------
317// Methods of private_server
318//------------------------------------------------------------------------
319private_server::private_server( tbb_client& client ) :
320 my_client(client),
321 my_n_thread(client.max_job_count()),
322 my_stack_size(client.min_stack_size()),
323 my_thread_array(NULL)
324{
326 my_slack = 0;
327#if TBB_USE_ASSERT
328 my_net_slack_requests = 0;
329#endif /* TBB_USE_ASSERT */
330 my_asleep_list_root = NULL;
332 for( size_t i=0; i<my_n_thread; ++i ) {
333 private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
336 }
337}
338
340 __TBB_ASSERT( my_net_slack_requests==0, NULL );
341 for( size_t i=my_n_thread; i--; )
345}
346
348 asleep_list_mutex_type::scoped_lock lock;
349 if( !lock.try_acquire(my_asleep_list_mutex) )
350 return false;
351 // Contribute to slack under lock so that if another takes that unit of slack,
352 // it sees us sleeping on the list and wakes us up.
353 int k = ++my_slack;
354 if( k<=0 ) {
357 return true;
358 } else {
359 --my_slack;
360 return false;
361 }
362}
363
// Equivalent of adding additional_slack to my_slack and waking up to 2 threads
// if my_slack permits.
//
// additional_slack: number of slack units contributed by the caller; must be >= 0
//                   (asserted). When it is 0 the function instead tries to claim
//                   units directly from my_slack (the "chain reaction" path).
//
// Workers are popped from the asleep list while my_asleep_list_mutex is held, but
// wake_or_launch() is only called on them after the locked block has been left.
364void private_server::wake_some( int additional_slack ) {
365 __TBB_ASSERT( additional_slack>=0, NULL );
    // Holds at most two workers popped from the asleep list.
366 private_worker* wakee[2];
    // w points one past the last filled entry of wakee.
367 private_worker**w = wakee;
368 {
    // The mutex protects my_asleep_list_root for the duration of this block.
369 asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
    // Continue while there is a sleeping worker and room for another wakee.
370 while( my_asleep_list_root && w<wakee+2 ) {
371 if( additional_slack>0 ) {
372 if (additional_slack+my_slack<=0) // additional demand does not exceed surplus supply
373 break;
    // Spend one unit of the caller's slack on the worker popped below.
374 --additional_slack;
375 } else {
376 // Chain reaction; Try to claim unit of slack
377 int old;
    // Retry loop: atomically decrement my_slack, but only while it is positive.
378 do {
379 old = my_slack;
    // Nothing to claim: jump past the locked block to the wake-up loop.
    // NOTE(review): the lock is presumably released by the scoped_lock destructor
    // when the goto leaves this scope - confirm.
380 if( old<=0 ) goto done;
381 } while( my_slack.compare_and_swap(old-1,old)!=old );
382 }
383 // Pop sleeping worker to combine with claimed unit of slack
    // Stores the current list head in *w, advances w, and makes the head's
    // my_next the new list root.
384 my_asleep_list_root = (*w++ = my_asleep_list_root)->my_next;
385 }
386 if( additional_slack ) {
387 // Contribute our unused slack to my_slack.
388 my_slack += additional_slack;
389 }
390 }
391done:
    // Wake the collected workers, last popped first.
392 while( w>wakee ) {
393 private_worker* ww = *--w;
    // Unlink before waking: wake_or_launch() asserts that a worker being woken
    // has no my_next link.
394 ww->my_next = NULL;
    // Wakes the associated thread, or launches one if there is none.
395 ww->wake_or_launch();
396 }
397}
398
400#if TBB_USE_ASSERT
401 my_net_slack_requests+=delta;
402#endif /* TBB_USE_ASSERT */
403 if( delta<0 ) {
404 my_slack+=delta;
405 } else if( delta>0 ) {
406 wake_some( delta );
407 }
408}
409
411tbb_server* make_private_server( tbb_client& client ) {
413}
414
415} // namespace rml
416} // namespace internal
417
418} // namespace tbb
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_Yield()
Definition: ibm_aix51.h:44
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
The graph class.
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
Definition: tbb_stddef.h:398
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:305
__TBB_SCHEDULER_MUTEX_TYPE scheduler_mutex_type
Mutex type for global locks in the scheduler.
thread_monitor::handle_type thread_handle
tbb_server * make_private_server(tbb_client &client)
Factory method called from task.cpp to create a private_server.
static const size_t cache_line_size
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
Definition: market.cpp:304
static unsigned default_num_threads()
Definition: governor.h:84
private_server & my_server
Associated server.
thread_handle my_handle
Handle of the OS thread associated with this worker.
void start_shutdown()
Called by a thread (usually not the associated thread) to commence termination.
private_worker * my_next
Link for list of workers that are sleeping or have no associated thread.
state_t
State in finite-state machine that controls the worker.
@ st_normal
Associated thread is doing normal life sequence.
@ st_quit
Associated thread has ended normal life sequence and promises to never touch *this again.
@ st_starting
*this has associated thread that is starting up.
thread_monitor my_thread_monitor
Monitor for sleeping when there is no work to do.
static void release_handle(thread_handle my_handle, bool join)
static __RML_DECL_THREAD_ROUTINE thread_routine(void *arg)
private_worker(private_server &server, tbb_client &client, const size_t i)
void wake_or_launch()
Wake up associated thread (or launch a thread if there is none)
tbb_client & my_client
Associated client.
const size_t my_index
index used for avoiding the 64K aliasing problem
void run()
Actions executed by the associated thread.
char pad[cache_line_size - sizeof(private_worker)%cache_line_size]
padded_private_worker(private_server &server, tbb_client &client, const size_t i)
version_type version() const __TBB_override
const size_t my_stack_size
Stack size for each thread.
const tbb_client::size_type my_n_thread
Maximum number of threads to be created.
void propagate_chain_reaction()
Wake up to two sleeping workers, if there are any sleeping.
void wake_some(int additional_slack)
Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
void independent_thread_number_changed(int) __TBB_override
unsigned default_concurrency() const __TBB_override
scheduler_mutex_type asleep_list_mutex_type
Protects my_asleep_list_root.
bool try_insert_in_asleep_list(private_worker &t)
Try to add t to list of sleeping workers.
atomic< int > my_ref_count
Counter used to determine when to delete this.
atomic< int > my_slack
Number of jobs that could use their associated thread minus number of active threads.
void request_close_connection(bool) __TBB_override
tbb::atomic< private_worker * > my_asleep_list_root
List of workers that are asleep or committed to sleeping until notified by another thread.
asleep_list_mutex_type my_asleep_list_mutex
padded_private_worker * my_thread_array
void adjust_job_count_estimate(int delta) __TBB_override

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.