Fawkes API  Fawkes Development Version
test_syncpoint.cpp
1 /***************************************************************************
2  * test_syncpoint.cpp - SyncPoint Unit Test
3  *
4  * Created: Wed Jan 22 11:17:43 2014
5  * Copyright 2014-2018 Till Hofmann
6  *
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include <gtest/gtest.h>
23 
24 #include <pthread.h>
25 #ifdef __FreeBSD__
26 # include <pthread_np.h>
27 #endif
28 #include <core/threading/barrier.h>
29 #include <core/threading/mutex.h>
30 #include <core/threading/mutex_locker.h>
31 #include <core/threading/wait_condition.h>
32 #include <core/utils/refptr.h>
33 #include <libs/syncpoint/exceptions.h>
34 #include <libs/syncpoint/syncpoint.h>
35 #include <libs/syncpoint/syncpoint_manager.h>
36 #include <logging/cache.h>
37 #include <logging/multi.h>
38 #include <sys/time.h>
39 
40 #include <atomic>
41 #include <cmath>
42 #include <errno.h>
43 #include <string>
44 #include <time.h>
45 #include <unistd.h>
46 
47 using namespace fawkes;
48 using namespace std;
49 
50 /** @class SyncPointTest
51  * Test class for SyncPoint
52  * This class tests basic functionality of SyncPoints
53  */
54 class SyncPointTest : public ::testing::Test
55 {
56 protected:
57  /**
58  * Initialize the test class
59  */
60  virtual void
62  {
63  logger_ = new MultiLogger();
64  string id1 = "/id1";
65  string id2 = "/id2";
66  sp1 = new SyncPoint(id1, logger_);
67  sp2 = new SyncPoint(id1, logger_);
68  sp3 = new SyncPoint(id2, logger_);
69  }
70 
71  /** Clean up */
72  virtual void
74  {
75  delete logger_;
76  }
77 
78  /**@{*/
79  /**
80  * Syncpoints for testing purposes
81  */
85  /**@}*/
86 
87  /** Logger for testing */
89 };
90 
91 /** @class SyncPointManagerTest
92  * Test class for SyncPointManager
93  * This class tests basic functionality of the SyncPointManager
94  */
95 class SyncPointManagerTest : public ::testing::Test
96 {
97 protected:
98  /**
99  * Initialize the test class
100  */
102  {
103  logger_ = new MultiLogger();
104  cache_logger_ = new CacheLogger();
105  logger_->add_logger(cache_logger_);
106  manager = new SyncPointManager(logger_);
107 
108  pthread_attr_init(&attrs);
109  }
110 
111  /**
112  * Deinitialize the test class
113  */
115  {
116  pthread_attr_destroy(&attrs);
117  delete logger_;
118  // delete cache_logger_;
119  }
120 
121  /**
122  * A Pointer to a SyncPointManager
123  */
125 
126  /** Logger used to initialize SyncPoints */
128 
129  /** Cache Logger used for testing */
131 
132  /** Thread attributes */
133  pthread_attr_t attrs;
134 };
135 
136 /** @class SyncBarrierTest
137  * Test SyncBarriers
138  */
140 {
141 protected:
142  /** Constructor. */
144  {
145  }
146 };
147 
148 TEST_F(SyncPointTest, CreateSyncPoint)
149 {
150  ASSERT_TRUE(*sp1 != NULL);
151 }
152 
153 TEST_F(SyncPointTest, Equals)
154 {
155  // RefPtr<SyncPoint>
156  ASSERT_NE(sp1, sp2);
157  // SyncPoint*
158  ASSERT_NE(*sp1, *sp2);
159  // SyncPoint
160  ASSERT_EQ(**sp1, **sp2);
161 }
162 
163 TEST_F(SyncPointTest, LessThan)
164 {
165  ASSERT_LT(**sp1, **sp3);
166  ASSERT_FALSE(**sp3 < **sp1);
167  ASSERT_FALSE(**sp1 < **sp2);
168  ASSERT_FALSE(**sp2 < **sp1);
169 }
170 
171 TEST_F(SyncPointTest, SyncPointSets)
172 {
173  using namespace std;
174  set<RefPtr<SyncPoint>, SyncPointSetLessThan> sp_set;
175  pair<set<RefPtr<SyncPoint>>::iterator, bool> ret;
176 
177  // insert sp1
178  ret = sp_set.insert(sp1);
179  ASSERT_TRUE(ret.second);
180  ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
181 
182  // insert sp3
183  ret = sp_set.insert(sp3);
184  ASSERT_TRUE(ret.second);
185  ASSERT_EQ(sp3->get_identifier(), (*(ret.first))->get_identifier());
186 
187  // insert sp1 again
188  ret = sp_set.insert(sp1);
189  ASSERT_FALSE(ret.second);
190  ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
191 
192  // insert sp2 (same as sp1)
193  ret = sp_set.insert(sp2);
194  ASSERT_FALSE(ret.second);
195  ASSERT_EQ(sp2->get_identifier(), (*(ret.first))->get_identifier());
196 }
197 
199 {
200  ASSERT_EQ(0u, manager->get_syncpoints().size());
201  manager->get_syncpoint("test", "/test/1");
202  ASSERT_EQ(3u, manager->get_syncpoints().size());
203  ASSERT_EQ(1u,
204  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
205  manager->get_syncpoint("test2", "/test/2");
206  ASSERT_EQ(4u, manager->get_syncpoints().size());
207  ASSERT_EQ(1u,
208  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
209  ASSERT_EQ(1u,
210  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
211  manager->get_syncpoint("test3", "/test/1");
212  ASSERT_EQ(4u, manager->get_syncpoints().size());
213  ASSERT_EQ(1u,
214  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
215  ASSERT_EQ(1u,
216  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
217  ASSERT_EQ(1u, manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/", logger_))));
218  ASSERT_EQ(1u,
219  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
220 }
221 
222 TEST_F(SyncPointManagerTest, WatcherSet)
223 {
224  ASSERT_NO_THROW(manager->get_syncpoint("component 1", "/test"));
225  ASSERT_NO_THROW(manager->get_syncpoint("component 2", "/test"));
226  ASSERT_NO_THROW(manager->get_syncpoint("component 3", "/test"));
227 }
228 
229 /** Test what happens if we acquire a SyncPoint, release it, and then acquire it
230  * again. If release_syncpoint works properly, this should not throw. Otherwise,
231  * we would expect a SyncPointAlreadyOpenedException
232  */
233 TEST_F(SyncPointManagerTest, ReleaseAndReacquire)
234 {
235  string comp = "component";
236  string id = "/test/sp1";
237  RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, id);
238  set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
239  ASSERT_EQ(1, syncpoints.count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
240  for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
241  sp_it++) {
242  EXPECT_EQ(1, (*sp_it)->get_watchers().count(comp))
243  << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
244  }
245  manager->release_syncpoint(comp, sp);
246  for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
247  sp_it++) {
248  EXPECT_EQ(0, (*sp_it)->get_watchers().count(comp))
249  << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
250  }
251  ASSERT_NO_THROW(manager->get_syncpoint(comp, id));
252 }
253 
254 TEST_F(SyncPointTest, EmptyIdentifier)
255 {
256  ASSERT_THROW(sp1 = new SyncPoint("", NULL), SyncPointInvalidIdentifierException);
257 }
258 
259 TEST_F(SyncPointTest, InvalidIdentifier)
260 {
261  EXPECT_THROW(sp1 = new SyncPoint("invalid", NULL), SyncPointInvalidIdentifierException);
262  EXPECT_NO_THROW(sp1 = new SyncPoint("/", NULL));
263  EXPECT_THROW(sp1 = new SyncPoint("/test/", NULL), SyncPointInvalidIdentifierException);
264 }
265 
266 TEST_F(SyncPointManagerTest, SyncPointManagerExceptions)
267 {
268  RefPtr<SyncPoint> invalid_sp;
269  ASSERT_THROW(invalid_sp = manager->get_syncpoint("", "/test/sp1"),
271 
272  // make sure syncpoint_manager doesn't catch the exceptions thrown by SyncPoint
273  ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", ""),
275  ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", "invalid"),
277 }
278 
279 TEST_F(SyncPointManagerTest, SyncPointHierarchyRegisteredWatchers)
280 {
281  string comp = "component1";
282  string id = "/test/sp1";
283  RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, "/test/sp1");
284  set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
285  set<RefPtr<SyncPoint>>::iterator sp_test_it =
286  syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
287  set<RefPtr<SyncPoint>>::iterator sp_root_it =
288  syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/", logger_)));
289  ASSERT_NE(syncpoints.end(), sp_test_it);
290  ASSERT_NE(syncpoints.end(), sp_root_it);
291  RefPtr<SyncPoint> sp_test = *sp_test_it;
292  RefPtr<SyncPoint> sp_root = *sp_root_it;
293  EXPECT_EQ(1, syncpoints.count(sp_test));
294  EXPECT_EQ(1, syncpoints.count(sp_root));
295  EXPECT_EQ(1, sp->get_watchers().count(comp));
296  EXPECT_EQ(1, sp_test->get_watchers().count(comp));
297  EXPECT_EQ(0, sp_test->get_watchers().count(id));
298  EXPECT_EQ(1, sp_root->get_watchers().count(comp));
299  EXPECT_EQ(0, sp_root->get_watchers().count(id));
300  EXPECT_EQ(0, sp_root->get_watchers().count(sp_test->get_identifier()));
301 
302  manager->release_syncpoint(comp, sp);
303  EXPECT_EQ(0, sp_test->get_watchers().count(id));
304 }
305 
306 TEST_F(SyncPointManagerTest, SyncPointComponentRegistersForMultipleSyncPoints)
307 {
308  string comp = "component1";
309  string sp1_id = "/test/sp1";
310  string sp2_id = "/test/sp2";
311  RefPtr<SyncPoint> sp1 = manager->get_syncpoint(comp, sp1_id);
312  // the following should not throw
313  // if it does, registering for the predecessor '/test' may be broken
314  RefPtr<SyncPoint> sp2 = manager->get_syncpoint(comp, sp2_id);
315  RefPtr<SyncPoint> predecessor =
316  *manager->get_syncpoints().find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
317  EXPECT_EQ(1, sp1->get_watchers().count(comp))
318  << comp << " is not registered for " << sp1->get_identifier() << ", but should be!";
319  EXPECT_EQ(1, sp2->get_watchers().count(comp))
320  << comp << " is not registered for " << sp2->get_identifier() << ", but should be!";
321  EXPECT_EQ(1, predecessor->get_watchers().count(comp))
322  << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
323 
324  manager->release_syncpoint(comp, sp1);
325  EXPECT_EQ(1, sp2->get_watchers().count(comp));
326  EXPECT_EQ(1, predecessor->get_watchers().count(comp))
327  << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
328 }
329 
/** Progress of a helper thread as recorded in waiter_thread_params. */
enum ThreadStatus {
  /** initial enumerator (value 0) */
  PENDING,
  /** stored by start_waiter_thread() after it obtained its SyncPoint */
  RUNNING,
  /** stored by start_waiter_thread() after its last wait call returned */
  FINISHED
};
331 
332 /** struct used for multithreading tests */
334 {
335  /** SyncPointManager passed to the thread */
337  /** Thread number */
338  uint thread_nr = 0;
339  /** Wait type */
340  SyncPoint::WakeupType type = SyncPoint::WAIT_FOR_ONE;
341  /** Number of wait calls the thread should make */
343  /** Name of the SyncPoint */
345  /** Name of the component */
346  string component = "";
347  /** timeout in sec */
348  uint timeout_sec = 0;
349  /** timeout in nsec */
350  uint timeout_nsec = 0;
351  /** current status of the thread */
352  atomic<ThreadStatus> status;
353  /** Mutex to protect cond_running */
355  /** WaitCondition to indicate that the thread is running */
356  WaitCondition cond_running = WaitCondition(&mutex_running);
357  /** Mutex to protect cond_finished */
359  /** WaitCondition to indicate that the thread has finished */
360  WaitCondition cond_finished = WaitCondition(&mutex_finished);
361  /** Barrier for startup synchronization. */
362  Barrier *start_barrier = nullptr;
363 };
364 
365 /** Helper function to wait for a thread to be running */
366 bool
367 wait_for_running(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
368 {
369  RefPtr<SyncPoint> sp = params->manager->get_syncpoint("test_runner", params->sp_identifier);
370  const int wait_time_us = 1000;
371  for (uint i = 0; i < (sec * pow(10, 9) + nanosec) / (wait_time_us * pow(10, 3)); i++) {
372  if (sp->watcher_is_waiting(params->component, params->type)) {
373  return true;
374  }
375  usleep(wait_time_us);
376  }
377  return false;
378 }
379 
380 /** Helper function to wait for a thread to be finished */
381 bool
382 wait_for_finished(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
383 {
384  MutexLocker ml(params->mutex_finished);
385  if (params->status == FINISHED) {
386  return true;
387  } else {
388  return params->cond_finished.reltimed_wait(sec, nanosec);
389  }
390 }
391 
392 /** get a SyncPoint and wait for it */
393 void *
394 start_waiter_thread(void *data)
395 {
396  waiter_thread_params *params = (waiter_thread_params *)data;
397  string component = params->component;
398  RefPtr<SyncPoint> sp = params->manager->get_syncpoint(component, params->sp_identifier);
399  params->status = RUNNING;
400  if (params->start_barrier) {
401  params->start_barrier->wait();
402  }
403  params->mutex_running.lock();
404  params->cond_running.wake_all();
405  params->mutex_running.unlock();
406  for (uint i = 0; i < params->num_wait_calls; i++) {
407  sp->wait(component, params->type, params->timeout_sec, params->timeout_nsec);
408  }
409  params->status = FINISHED;
410  params->mutex_finished.lock();
411  params->cond_finished.wake_all();
412  params->mutex_finished.unlock();
413  pthread_exit(NULL);
414 }
415 
416 TEST_F(SyncPointManagerTest, MultipleWaits)
417 {
418  RefPtr<SyncPoint> sp_ref = manager->get_syncpoint("component", "/test/sp1");
419  pthread_t thread1;
420  waiter_thread_params params;
421  params.component = "component";
422  params.manager = manager;
423  params.num_wait_calls = 1;
424  params.sp_identifier = "/test/sp1";
425  pthread_create(&thread1, &attrs, start_waiter_thread, &params);
426  wait_for_running(&params);
427  ASSERT_THROW(sp_ref->wait("component"), SyncPointMultipleWaitCallsException);
428  pthread_cancel(thread1);
429  pthread_join(thread1, NULL);
430 }
431 
432 /** Create multiple threads which will all call get_syncpoint
433  * for the same SyncPoint. Do not wait for the SyncPoint but return
434  * immediately.
435  */
436 TEST_F(SyncPointManagerTest, MultipleManagerRequests)
437 {
438  uint num_threads = 50;
439  pthread_t threads[num_threads];
440  waiter_thread_params *params[num_threads];
441  string sp_identifier = "/test/sp1";
442  for (uint i = 0; i < num_threads; i++) {
443  params[i] = new waiter_thread_params();
444  params[i]->component = "component " + to_string(i);
445  params[i]->manager = manager;
446  params[i]->thread_nr = i;
447  params[i]->num_wait_calls = 0;
448  params[i]->sp_identifier = sp_identifier;
449  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
450  pthread_yield();
451  ASSERT_LE(manager->get_syncpoints().size(), 3u);
452  }
453 
454  for (uint i = 0; i < num_threads; i++) {
455  pthread_join(threads[i], NULL);
456  delete params[i];
457  }
458 }
459 
460 /** start multiple threads and let them wait.
461  * This just tests whether there are any segfaults.
462  * No assertions are made.
463  */
464 TEST_F(SyncPointManagerTest, ParallelWaitCalls)
465 {
466  uint num_threads = 50;
467  uint num_wait_calls = 10;
468  pthread_t threads[num_threads];
469  waiter_thread_params *params[num_threads];
470  string sp_identifier = "/test/sp1";
471  for (uint i = 0; i < num_threads; i++) {
472  params[i] = new waiter_thread_params();
473  params[i]->component = "component " + to_string(i);
474  params[i]->manager = manager;
475  params[i]->thread_nr = i;
476  params[i]->num_wait_calls = num_wait_calls;
477  params[i]->sp_identifier = sp_identifier;
478  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
479  pthread_yield();
480  ASSERT_LE(manager->get_syncpoints().size(), 3u);
481  }
482 
483  for (uint i = 0; i < num_threads; i++) {
484  EXPECT_TRUE(wait_for_running(params[i]));
485  }
486  for (uint i = 0; i < num_threads; i++) {
487  pthread_cancel(threads[i]);
488  ASSERT_EQ(0, pthread_join(threads[i], NULL));
489  delete params[i];
490  }
491 }
492 
493 /** start multiple threads, let them wait for a SyncPoint,
494  * emit the SyncPoint and verify that they all returned
495  */
496 TEST_F(SyncPointManagerTest, ParallelWaitsReturn)
497 {
498  uint num_threads = 10;
499  uint num_wait_calls = 5;
500  pthread_t threads[num_threads];
501  waiter_thread_params *params[num_threads];
502  string sp_identifier = "/test/sp1";
503  for (uint i = 0; i < num_threads; i++) {
504  params[i] = new waiter_thread_params();
505  params[i]->component = "component " + to_string(i);
506  params[i]->manager = manager;
507  params[i]->thread_nr = i;
508  params[i]->num_wait_calls = num_wait_calls;
509  params[i]->sp_identifier = sp_identifier;
510  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
511  pthread_yield();
512  }
513 
514  for (uint i = 0; i < num_threads; i++) {
515  EXPECT_TRUE(wait_for_running(params[i]));
516  }
517 
518  string component = "emitter";
519  RefPtr<SyncPoint> sp = manager->get_syncpoint(component, sp_identifier);
520  sp->register_emitter(component);
521  for (uint i = 0; i < num_wait_calls; i++) {
522  sp->emit(component);
523  usleep(20000);
524  }
525 
526  for (uint i = 0; i < num_threads; i++) {
527  ASSERT_TRUE(wait_for_finished(params[i]));
528  pthread_join(threads[i], NULL);
529  delete params[i];
530  }
531 }
532 
533 /** start multiple threads, let them wait for a SyncPoint,
534  * but don't emit the SyncPoint. Verify that they have not returned
535  */
536 TEST_F(SyncPointManagerTest, WaitDoesNotReturnImmediately)
537 {
538  uint num_threads = 50;
539  pthread_t threads[num_threads];
540  waiter_thread_params *params[num_threads];
541  for (uint i = 0; i < num_threads; i++) {
542  params[i] = new waiter_thread_params();
543  params[i]->component = "component " + to_string(i);
544  params[i]->manager = manager;
545  params[i]->thread_nr = i;
546  params[i]->num_wait_calls = 1;
547  params[i]->sp_identifier = "/test/sp1";
548  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
549  }
550 
551  for (uint i = 0; i < num_threads; i++) {
552  EXPECT_TRUE(wait_for_running(params[i]));
553  }
554 
555  for (uint i = 0; i < num_threads; i++) {
556  EXPECT_EQ(RUNNING, params[i]->status);
557  pthread_cancel(threads[i]);
558  ASSERT_EQ(0, pthread_join(threads[i], NULL));
559  delete params[i];
560  }
561 }
562 
563 /**
564  * Test the SyncPoint hierarchy.
565  * This creates a SyncPoint, an emitter and waiters which wait for the
566  * SyncPoint's predecessor, the predecessor's predecessor (grandparent),
567  * and the root SyncPoint ("/").
568  */
569 TEST_F(SyncPointManagerTest, SyncPointHierarchy)
570 {
571  vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
572  uint num_threads = identifiers.size();
573  pthread_t threads[num_threads];
574  waiter_thread_params *params[num_threads];
575  for (uint i = 0; i < num_threads; i++) {
576  params[i] = new waiter_thread_params();
577  params[i]->component = "component " + to_string(i);
578  params[i]->manager = manager;
579  params[i]->thread_nr = i;
580  params[i]->num_wait_calls = 1;
581  params[i]->sp_identifier = identifiers.at(i);
582  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
583  }
584 
585  for (uint i = 0; i < num_threads; i++) {
586  EXPECT_TRUE(wait_for_running(params[i]));
587  }
588  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test/topic/sp");
589  sp->register_emitter("emitter");
590  sp->emit("emitter");
591 
592  /* The first waiters should be unblocked */
593  for (uint i = 0; i < num_threads - 1; i++) {
594  ASSERT_TRUE(wait_for_finished(params[i]));
595  pthread_join(threads[i], NULL);
596  delete params[i];
597  }
598 
599  /* The last waiter should still wait */
600  pthread_t last_thread = threads[num_threads - 1];
601  EXPECT_FALSE(wait_for_finished(params[num_threads - 1], 0, pow(10, 6)));
602  pthread_cancel(last_thread);
603  ASSERT_EQ(0, pthread_join(last_thread, NULL));
604 }
605 
606 /** Emit a barrier without registering */
607 TEST_F(SyncBarrierTest, EmitWithoutRegister)
608 {
609  string component = "emitter";
610  RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
611  ASSERT_THROW(barrier->emit(component), SyncPointNonEmitterCalledEmitException);
612 }
613 
614 /** Register multiple times
615  * This is allowed, but the component should then also emit multiple times */
616 TEST_F(SyncBarrierTest, MultipleRegisterCalls)
617 {
618  string component = "emitter";
619  RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
620  EXPECT_NO_THROW(barrier->register_emitter(component));
621  EXPECT_NO_THROW(barrier->register_emitter(component));
622 }
623 
624 /** get a SyncBarrier, register as emitter and emit */
625 void *
626 start_barrier_emitter_thread(void *data)
627 {
628  waiter_thread_params *params = (waiter_thread_params *)data;
629  string component = "emitter " + to_string(params->thread_nr);
631  EXPECT_NO_THROW(sp = params->manager->get_syncpoint(component, params->sp_identifier));
632  sp->register_emitter(component);
633  for (uint i = 0; i < params->num_wait_calls; i++) {
634  sp->emit(component);
635  }
636  pthread_exit(NULL);
637 }
638 
639 /** Helper class which registers and emits a given SyncBarrier */
640 class Emitter
641 {
642 public:
643  /** Constructor.
644  * @param identifier The identifier of this emitter.
645  * @param syncbarrier The identifier of the SyncBarrier to register for.
646  * @param manager Pointer to the SyncPointManager to use.
647  */
648  Emitter(string identifier, string syncbarrier, RefPtr<SyncPointManager> manager)
649  : identifier_(identifier), manager_(manager)
650  {
651  barrier_ = manager->get_syncpoint(identifier_, syncbarrier);
652  barrier_->register_emitter(identifier_);
653  }
654 
655  /** Destructor. */
656  virtual ~Emitter()
657  {
658  barrier_->unregister_emitter(identifier_);
659  manager_->release_syncpoint(identifier_, barrier_);
660  }
661 
662  /** emit the SyncBarrier */
663  void
665  {
666  barrier_->emit(identifier_);
667  }
668 
669 private:
670  string identifier_;
671  RefPtr<SyncPoint> barrier_;
672  RefPtr<SyncPointManager> manager_;
673 };
674 
675 /** Barrier: wait() returns immediately if no emitter is registered */
676 TEST_F(SyncBarrierTest, WaitWithNoRegisteredEmitter)
677 {
678  string barrier_id = "/test/barrier";
679  RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
680  const uint num_waiter_threads = 1;
681  const uint num_wait_calls = 1;
682  pthread_t waiter_threads[num_waiter_threads];
683  waiter_thread_params *params[num_waiter_threads];
684  for (uint i = 0; i < num_waiter_threads; i++) {
685  params[i] = new waiter_thread_params();
686  params[i]->type = SyncPoint::WAIT_FOR_ALL;
687  params[i]->component = "component " + to_string(i);
688  params[i]->manager = manager;
689  params[i]->thread_nr = i;
690  params[i]->num_wait_calls = num_wait_calls;
691  params[i]->sp_identifier = barrier_id;
692  pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
693  }
694  for (uint i = 0; i < num_waiter_threads; i++) {
695  ASSERT_TRUE(wait_for_finished(params[i]));
696  pthread_join(waiter_threads[i], NULL);
697  delete params[i];
698  }
699 }
700 
701 /** Start multiple threads, let them wait for a SyncBarrier,
702  * also have two threads registered as emitter.
703  * Let the first thread emit the barrier, assert the waiters did not unblock,
704  * then let the second thread emit.
705  * This tests the fundamental difference to a SyncPoint: With a SyncPoint,
706  * wait() returns if the SyncPoint is emitted by one component.
707  * With a SyncBarrier, all registered emitters need to emit the SyncBarrier
708  * before wait() returns.
709  */
710 TEST_F(SyncBarrierTest, WaitForAllEmitters)
711 {
712  string barrier_id = "/test/barrier";
713  Emitter em1("emitter 1", barrier_id, manager);
714  Emitter em2("emitter 2", barrier_id, manager);
715 
716  RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
717 
718  const uint num_waiter_threads = 50;
719  const uint num_wait_calls = 1;
720  pthread_t waiter_threads[num_waiter_threads];
721  waiter_thread_params *params[num_waiter_threads];
722  for (uint i = 0; i < num_waiter_threads; i++) {
723  params[i] = new waiter_thread_params();
724  params[i]->component = "component " + to_string(i);
725  params[i]->type = SyncPoint::WAIT_FOR_ALL;
726  params[i]->manager = manager;
727  params[i]->thread_nr = i;
728  params[i]->num_wait_calls = num_wait_calls;
729  params[i]->sp_identifier = barrier_id;
730  pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
731  }
732 
733  for (uint i = 0; i < num_waiter_threads; i++) {
734  EXPECT_TRUE(wait_for_running(params[i]));
735  }
736 
737  em1.emit();
738 
739  for (uint i = 0; i < num_waiter_threads; i++) {
740  EXPECT_EQ(RUNNING, params[i]->status);
741  }
742 
743  em1.emit();
744  em2.emit();
745 
746  for (uint i = 0; i < num_waiter_threads; i++) {
747  ASSERT_TRUE(wait_for_finished(params[i]));
748  pthread_join(waiter_threads[i], NULL);
749  delete params[i];
750  }
751 }
752 
753 /** two barriers, emit the first one. Only the threads waiting on the first
754  * barrier should unblock
755  */
756 TEST_F(SyncBarrierTest, BarriersAreIndependent)
757 {
758  string barrier1_id = "/test/barrier1";
759  string barrier2_id = "/test/barrier2";
760  Emitter em1("em1", barrier1_id, manager);
761  Emitter em2("em2", barrier2_id, manager);
762 
763  RefPtr<SyncPoint> barrier1 = manager->get_syncpoint("m1", barrier1_id);
764 
765  RefPtr<SyncPoint> barrier2 = manager->get_syncpoint("m2", barrier2_id);
766 
767  const uint num_waiter_threads = 50;
768  const uint num_wait_calls = 1;
769  pthread_t waiter_threads1[num_waiter_threads];
770  waiter_thread_params *params1[num_waiter_threads];
771  for (uint i = 0; i < num_waiter_threads; i++) {
772  params1[i] = new waiter_thread_params();
773  params1[i]->component = "component " + to_string(i);
774  params1[i]->type = SyncPoint::WAIT_FOR_ALL;
775  params1[i]->manager = manager;
776  params1[i]->thread_nr = i;
777  params1[i]->num_wait_calls = num_wait_calls;
778  params1[i]->sp_identifier = barrier1_id;
779  pthread_create(&waiter_threads1[i], &attrs, start_waiter_thread, params1[i]);
780  }
781 
782  pthread_t waiter_threads2[num_waiter_threads];
783  waiter_thread_params *params2[num_waiter_threads];
784  for (uint i = 0; i < num_waiter_threads; i++) {
785  params2[i] = new waiter_thread_params();
786  params2[i]->component = "component " + to_string(i);
787  params2[i]->type = SyncPoint::WAIT_FOR_ALL;
788  params2[i]->manager = manager;
789  params2[i]->thread_nr = num_waiter_threads + i;
790  params2[i]->num_wait_calls = num_wait_calls;
791  params2[i]->sp_identifier = barrier2_id;
792  pthread_create(&waiter_threads2[i], &attrs, start_waiter_thread, params2[i]);
793  }
794 
795  for (uint i = 0; i < num_waiter_threads; i++) {
796  EXPECT_TRUE(wait_for_running(params1[i]));
797  }
798 
799  for (uint i = 0; i < num_waiter_threads; i++) {
800  EXPECT_TRUE(wait_for_running(params2[i]));
801  }
802 
803  em1.emit();
804 
805  for (uint i = 0; i < num_waiter_threads; i++) {
806  ASSERT_TRUE(wait_for_finished(params1[i]));
807  pthread_join(waiter_threads1[i], NULL);
808  delete params1[i];
809  }
810 
811  for (uint i = 0; i < num_waiter_threads; i++) {
812  EXPECT_EQ(RUNNING, params2[i]->status);
813  }
814 
815  em2.emit();
816 
817  for (uint i = 0; i < num_waiter_threads; i++) {
818  ASSERT_TRUE(wait_for_finished(params2[i]));
819  pthread_join(waiter_threads2[i], NULL);
820  delete params2[i];
821  }
822 }
823 
824 /**
825  * Test the SyncBarrier hierarchy, similar to the SyncPoint hierarchy test.
826  * This creates a SyncBarrier, an emitter and waiters which wait for the
827  * SyncBarrier's predecessor, the predecessor's predecessor (grandparent),
828  * and the root SyncBarrier ("/").
829  */
830 TEST_F(SyncBarrierTest, SyncBarrierHierarchy)
831 {
832  Emitter em1("emitter 1", "/test/topic/b1", manager);
833  Emitter em2("emitter 2", "/test/topic/b2", manager);
834  Emitter em3("emitter 3", "/other/topic", manager);
835 
836  vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
837  uint num_threads = identifiers.size();
838  pthread_t threads[num_threads];
839  waiter_thread_params *params[num_threads];
840  Barrier * barrier = new Barrier(num_threads + 1);
841  for (uint i = 0; i < num_threads; i++) {
842  params[i] = new waiter_thread_params();
843  params[i]->component = "component " + to_string(i);
844  params[i]->type = SyncPoint::WAIT_FOR_ALL;
845  params[i]->manager = manager;
846  params[i]->thread_nr = i;
847  params[i]->num_wait_calls = 1;
848  params[i]->sp_identifier = identifiers.at(i);
849  params[i]->start_barrier = barrier;
850  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
851  }
852 
853  barrier->wait();
854  delete barrier;
855 
856  for (uint i = 0; i < num_threads; i++) {
857  EXPECT_TRUE(wait_for_running(params[i]));
858  }
859 
860  em1.emit();
861  for (uint i = 0; i < num_threads; i++) {
862  ASSERT_EQ(RUNNING, params[i]->status);
863  }
864  em2.emit();
865  /* The first waiters should be unblocked */
866  for (uint i = 0; i < num_threads - 2; i++) {
867  ASSERT_TRUE(wait_for_finished(params[i]));
868  pthread_join(threads[i], NULL);
869  delete params[i];
870  }
871  /* The last two waiters should still be waiting */
872  for (uint i = num_threads - 2; i < num_threads; i++) {
873  EXPECT_EQ(RUNNING, params[i]->status);
874  pthread_cancel(threads[i]);
875  ASSERT_EQ(0, pthread_join(threads[i], NULL));
876  delete params[i];
877  }
878 }
879 
880 /** One component registers as emitter for two syncpoints, two other components
881  * wait for the first and second syncpoint respectively.
882  * Then, the first component unregisters for the first syncpoint.
883  * Test whether it is still registered for the second syncpoint.
884  * A third waiter waits for the predecessor syncpoint and should also still be
885  * waiting after the emitter has unregistered for the first syncpoint.
886  */
887 TEST_F(SyncPointManagerTest, OneEmitterRegistersForMultipleSyncPointsHierarchyTest)
888 {
889  string id_sp1 = "/test/sp1";
890  string id_sp2 = "/test/sp2";
891  string id_sp_pred = "/test";
892  string id_emitter = "component_emitter";
893  string id_waiter1 = "component_waiter1";
894  string id_waiter2 = "component_waiter2";
895  string id_waiter3 = "component_waiter_on_predecessor";
896 
897  RefPtr<SyncPoint> sp1 = manager->get_syncpoint(id_emitter, id_sp1);
898  RefPtr<SyncPoint> sp2 = manager->get_syncpoint(id_emitter, id_sp2);
899  manager->get_syncpoint(id_waiter1, id_sp1);
900  manager->get_syncpoint(id_waiter2, id_sp2);
901  RefPtr<SyncPoint> pred = manager->get_syncpoint(id_waiter3, id_sp_pred);
902  sp1->register_emitter(id_emitter);
903  sp2->register_emitter(id_emitter);
904  EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
905  EXPECT_EQ(1, sp2->get_emitters().count(id_emitter));
906  // this should be 2 as the emitter has registered twice
907  EXPECT_EQ(2, pred->get_emitters().count(id_emitter));
908 
910  params1->manager = manager;
911  params1->component = id_waiter1;
912  params1->type = SyncPoint::WAIT_FOR_ALL;
913  params1->num_wait_calls = 1;
914  params1->sp_identifier = id_sp1;
915 
917  params2->manager = manager;
918  params2->component = id_waiter2;
919  params2->type = SyncPoint::WAIT_FOR_ALL;
920  params2->num_wait_calls = 1;
921  params2->sp_identifier = id_sp2;
922 
924  params3->manager = manager;
925  params3->component = id_waiter3;
926  params3->type = SyncPoint::WAIT_FOR_ALL;
927  params3->num_wait_calls = 1;
928  params3->sp_identifier = id_sp_pred;
929 
930  pthread_t pthread1;
931  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
932  pthread_t pthread2;
933  pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
934  pthread_t pthread3;
935  pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
936  EXPECT_TRUE(wait_for_running(params1));
937  EXPECT_TRUE(wait_for_running(params2));
938  EXPECT_TRUE(wait_for_running(params3));
939 
940  sp1->emit(id_emitter);
941 
942  ASSERT_TRUE(wait_for_finished(params1));
943  ASSERT_FALSE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
944  // this should be waiting as the component has registered twice for '/test'
945  // and thus should emit '/test' also twice (by hierarchical emit calls)
946  ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
947  sp2->emit(id_emitter);
948  ASSERT_TRUE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
949  ASSERT_TRUE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
950 
951  pthread_join(pthread1, NULL);
952  pthread_join(pthread2, NULL);
953  pthread_join(pthread3, NULL);
954 
955  sp2->unregister_emitter(id_emitter);
956  EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
957  EXPECT_EQ(0, sp2->get_emitters().count(id_emitter));
958  EXPECT_EQ(1, pred->get_emitters().count(id_emitter));
959 
960  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
961  pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
962  pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
963 
964  ASSERT_TRUE(wait_for_running(params1));
965  ASSERT_TRUE(wait_for_running(params3));
966 
967  ASSERT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
968  ASSERT_TRUE(wait_for_finished(params2));
969  ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
970 
971  sp1->emit(id_emitter);
972  ASSERT_TRUE(wait_for_finished(params1));
973  ASSERT_TRUE(wait_for_finished(params3));
974  pthread_join(pthread1, NULL);
975  pthread_join(pthread2, NULL);
976  pthread_join(pthread3, NULL);
977  delete params1;
978  delete params2;
979  delete params3;
980 }
981 
982 /** Test if an exception is thrown if a registered emitter is currently not
983  * pending
984  */
985 TEST_F(SyncBarrierTest, NonPendingEmitterEmits)
986 {
987  Emitter em1("em1", "/barrier", manager);
988  // register a second emitter to avoid immediate reset after emit
989  Emitter em2("em2", "/barrier", manager);
990  EXPECT_NO_THROW(em1.emit());
991  EXPECT_NO_THROW(em1.emit());
992 }
993 
994 /** Test if a component waiting for a syncpoint is woken up
995  * if an emitter is registered for two successor syncpoints and the emitter
996  * emits the same syncpoint twice
997  */
998 TEST_F(SyncPointManagerTest, EmitterEmitsSameSyncPointTwiceTest)
999 {
1000  RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
1001  RefPtr<SyncPoint> sp2 = manager->get_syncpoint("emitter", "/test/sp2");
1002  RefPtr<SyncPoint> sp_pred = manager->get_syncpoint("waiter", "/test");
1003 
1004  sp1->register_emitter("emitter");
1005  sp2->register_emitter("emitter");
1006 
1007  waiter_thread_params *params1 = new waiter_thread_params();
1008  params1->manager = manager;
1009  params1->component = "waiter";
1010  params1->type = SyncPoint::WAIT_FOR_ALL;
1011  params1->num_wait_calls = 1;
1012  params1->sp_identifier = "/test";
1013 
1014  pthread_t pthread1;
1015  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
1016 
1017  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1018 
1019  sp1->emit("emitter");
1020 
1021  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1022 
1023  sp1->emit("emitter");
1024  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1025 
1026  sp2->emit("emitter");
1027  ASSERT_TRUE(wait_for_finished(params1));
1028  pthread_join(pthread1, NULL);
1029 
1030  delete params1;
1031 }
1032 
1033 /** Test if the component returns when using reltime_wait */
1034 TEST_F(SyncPointManagerTest, RelTimeWaitTest)
1035 {
1036  RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
1037  manager->get_syncpoint("waiter", "/test/sp1");
1038  sp1->register_emitter("emitter");
1039  pthread_t thread;
1040  waiter_thread_params params;
1041  params.manager = manager;
1042  params.type = SyncPoint::WAIT_FOR_ALL;
1043  params.num_wait_calls = 1;
1044  params.timeout_sec = 0;
1045  params.timeout_nsec = 100000;
1046  params.component = "waiter";
1047  params.sp_identifier = "/test/sp1";
1048  pthread_create(&thread, NULL, start_waiter_thread, &params);
1049  ASSERT_TRUE(wait_for_finished(&params));
1050  /* The SyncPoint should have logged the error */
1051  ASSERT_GT(cache_logger_->get_messages().size(), 0);
1052 }
1053 
1054 /// @cond INTERNALS
1055 struct emitter_thread_data
1056 {
1057  RefPtr<SyncPointManager> manager;
1058  std::string name;
1059  std::string sp_name;
1060  atomic<ThreadStatus> status;
1061  Mutex mutex_running;
1062  WaitCondition cond_running = WaitCondition(&mutex_running);
1063  Mutex mutex_finished;
1064  WaitCondition cond_finished = WaitCondition(&mutex_finished);
1065 };
1066 /// @endcond
1067 
1068 /** helper function to call emit in a thread */
1069 void *
1070 call_emit(void *data)
1071 {
1072  emitter_thread_data *tdata = (emitter_thread_data *)data;
1073  tdata->status = RUNNING;
1074  tdata->mutex_running.lock();
1075  tdata->cond_running.wake_all();
1076  tdata->mutex_running.unlock();
1077  RefPtr<SyncPoint> sp = tdata->manager->get_syncpoint(tdata->name, tdata->sp_name);
1078  sp->register_emitter(tdata->name);
1079  sp->emit(tdata->name);
1080  tdata->status = FINISHED;
1081  tdata->mutex_finished.lock();
1082  tdata->cond_finished.wake_all();
1083  tdata->mutex_finished.unlock();
1084  return NULL;
1085 }
1086 
1087 /** Test the functionality of lock_until_next_wait */
1088 TEST_F(SyncPointManagerTest, LockUntilNextWaitTest)
1089 {
1090  RefPtr<SyncPoint> sp = manager->get_syncpoint("component", "/test");
1091 
1092  sp->lock_until_next_wait("component");
1093  pthread_t thread;
1094  emitter_thread_data *emitter_params = new emitter_thread_data();
1095  emitter_params->manager = manager;
1096  emitter_params->name = "emitter";
1097  emitter_params->sp_name = "/test";
1098  pthread_create(&thread, NULL, call_emit, (void *)emitter_params);
1099 
1100  emitter_params->mutex_running.lock();
1101  if (emitter_params->status != RUNNING) {
1102  ASSERT_TRUE(emitter_params->cond_running.reltimed_wait(1, 0));
1103  }
1104  emitter_params->mutex_running.unlock();
1105  emitter_params->mutex_finished.lock();
1106  EXPECT_FALSE(emitter_params->cond_finished.reltimed_wait(0, 100000));
1107  emitter_params->mutex_finished.unlock();
1108 
1109  pthread_t waiter_thread;
1110  waiter_thread_params waiter_params;
1111  waiter_params.manager = manager;
1112  waiter_params.component = "component";
1113  waiter_params.num_wait_calls = 1;
1114  waiter_params.sp_identifier = "/test";
1115  pthread_create(&waiter_thread, NULL, start_waiter_thread, &waiter_params);
1116 
1117  emitter_params->mutex_finished.lock();
1118  ASSERT_TRUE(emitter_params->status == FINISHED
1119  || emitter_params->cond_finished.reltimed_wait(1, 0));
1120  emitter_params->mutex_finished.unlock();
1121  pthread_join(thread, NULL);
1122  pthread_join(waiter_thread, NULL);
1123  delete emitter_params;
1124 }
1125 
1126 /** helper function used for testing wait() */
1127 void *
1128 call_wait_for_all(void *data)
1129 {
1130  SyncPoint *sp = (SyncPoint *)(data);
1131  sp->wait_for_all("waiter");
1132  return NULL;
1133 }
1134 
1135 /** Test the functionality of lock_until_next_wait
1136  * Test whether the waiter really calls wait before ALL emitters call emit
1137  * This tests a potential race condition between wait() and emit() */
1138 TEST_F(SyncPointManagerTest, LockUntilNextWaitWaiterComesFirstTest)
1139 {
1140  RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1141 
1142  sp->lock_until_next_wait("waiter");
1143 
1144  uint num_emitters = 100;
1145  pthread_t emitter_thread[num_emitters];
1146  emitter_thread_data *params[num_emitters];
1147  for (uint i = 0; i < num_emitters; i++) {
1148  params[i] = new emitter_thread_data();
1149  params[i]->manager = manager;
1150  string emitter_name = "emitter" + to_string(i);
1151  params[i]->name = emitter_name;
1152  params[i]->sp_name = "/test";
1153  pthread_create(&emitter_thread[i], NULL, call_emit, (void *)params[i]);
1154  }
1155 
1156  for (uint i = 0; i < num_emitters; i++) {
1157  params[i]->mutex_running.lock();
1158  if (params[i]->status != RUNNING) {
1159  ASSERT_TRUE(params[i]->cond_running.reltimed_wait(1, 0));
1160  }
1161  params[i]->mutex_running.unlock();
1162  }
1163 
1164  pthread_t waiter_thread;
1166  thread_params.component = "waiter";
1167  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1168  thread_params.manager = manager;
1169  thread_params.thread_nr = 1;
1170  thread_params.num_wait_calls = 1;
1171  thread_params.sp_identifier = "/test";
1172  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1173 
1174  for (uint i = 0; i < num_emitters; i++) {
1175  params[i]->mutex_finished.lock();
1176  ASSERT_TRUE(params[i]->status == FINISHED || params[i]->cond_finished.reltimed_wait(1, 0));
1177  params[i]->mutex_finished.unlock();
1178  pthread_join(emitter_thread[i], NULL);
1179  delete params[i];
1180  }
1181 
1182  ASSERT_TRUE(wait_for_finished(&thread_params));
1183  pthread_join(waiter_thread, NULL);
1184 }
1185 
1186 /** Test whether all waiters are always released at the same time, even if one
1187  * waiter called wait after one emitter already emitted. In particular, this
1188  * tests the following scenario:
1189  * 1. waiter1: wait
1190  * 2. emitter1: emit
1191  * 3. waiter2: wait
1192  * 4. emitter2: emit
1193  * 5. both waiter1 and waiter2 are released
1194  */
1195 TEST_F(SyncPointManagerTest, WaitersAreAlwaysReleasedSimultaneouslyTest)
1196 {
1197  string sp_identifier = "/test";
1198  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", sp_identifier);
1199  manager->get_syncpoint("emitter2", sp_identifier);
1200  sp->register_emitter("emitter1");
1201  sp->register_emitter("emitter2");
1202  uint num_threads = 2;
1203  pthread_t threads[num_threads];
1204  waiter_thread_params params[num_threads];
1205  for (uint i = 0; i < num_threads; i++) {
1206  params[i].component = "component " + to_string(i);
1207  params[i].manager = manager;
1208  params[i].type = SyncPoint::WAIT_FOR_ALL;
1209  params[i].thread_nr = i;
1210  params[i].num_wait_calls = 1;
1211  params[i].sp_identifier = sp_identifier;
1212  }
1213  pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1214  ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1215  sp->emit("emitter1");
1216  ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1217  pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1218  for (uint i = 0; i < num_threads; i++) {
1219  ASSERT_FALSE(wait_for_finished(&params[i], 0, 10 * pow(10, 6)));
1220  }
1221  sp->emit("emitter2");
1222  for (uint i = 0; i < num_threads; i++) {
1223  ASSERT_TRUE(wait_for_finished(&params[i]));
1224  pthread_join(threads[i], NULL);
1225  }
1226 }
1227 
1228 /** Test whether all syncpoints are released simultaneously if a timeout occurs;
1229  * i.e. make sure that only the first waiter's timeout matters and all
1230  * subsequent waiters are released when the first waiter times out.
1231  */
1232 TEST_F(SyncPointManagerTest, WaitersTimeoutSimultaneousReleaseTest)
1233 {
1234  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1235  sp->register_emitter("emitter1");
1236  uint num_threads = 2;
1237  pthread_t threads[num_threads];
1238  string sp_identifier = "/test";
1239  waiter_thread_params params[num_threads];
1240  for (uint i = 0; i < num_threads; i++) {
1241  params[i].component = "component " + to_string(i);
1242  params[i].type = SyncPoint::WAIT_FOR_ALL;
1243  params[i].manager = manager;
1244  params[i].thread_nr = i;
1245  params[i].num_wait_calls = 1;
1246  params[i].timeout_sec = 0;
1247  params[i].timeout_nsec = 100 * pow(10, 6);
1248  params[i].sp_identifier = sp_identifier;
1249  }
1250  pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1251  EXPECT_TRUE(wait_for_running(&params[0]));
1252  params[1].timeout_sec = 5;
1253  params[1].timeout_nsec = 0;
1254  pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1255  for (uint i = 0; i < num_threads; i++) {
1256  EXPECT_TRUE(wait_for_running(&params[i]));
1257  }
1258  wait_for_finished(&params[0], params[0].timeout_sec, params[0].timeout_nsec);
1259  wait_for_finished(&params[1], 0, pow(10, 6));
1260  for (uint i = 0; i < num_threads; i++) {
1261  pthread_join(threads[i], NULL);
1262  }
1263 }
1264 
1265 /** Similar as before, test if the timeout is handled properly. This time, let
1266  * a wait_for_one with a short timeout step by. The other waiters should not be
1267  * affected, i.e. they should still be waiting even when the timeout for the
1268  * wait_for_one occurred.
1269  * In other words, wait_for_one waiters are handled completeley separately.
1270  */
1271 TEST_F(SyncPointManagerTest, WaitForOneSeparateTimeoutTest)
1272 {
1273  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1274  sp->register_emitter("emitter1");
1275  string sp_identifier = "/test";
1276  uint num_threads = 2;
1277  Barrier * barrier = new Barrier(num_threads + 2);
1278  pthread_t wait_for_one_thread;
1279  waiter_thread_params wait_for_one_params;
1280  wait_for_one_params.component = "wait_for_one";
1281  wait_for_one_params.type = SyncPoint::WAIT_FOR_ONE;
1282  wait_for_one_params.manager = manager;
1283  wait_for_one_params.thread_nr = 2;
1284  wait_for_one_params.num_wait_calls = 1;
1285  wait_for_one_params.timeout_sec = 0;
1286  wait_for_one_params.timeout_nsec = 100 * pow(10, 6);
1287  wait_for_one_params.status = PENDING;
1288  wait_for_one_params.sp_identifier = sp_identifier;
1289  wait_for_one_params.start_barrier = barrier;
1290  pthread_create(&wait_for_one_thread, &attrs, start_waiter_thread, &wait_for_one_params);
1291  pthread_t threads[num_threads];
1292  waiter_thread_params params[num_threads];
1293  for (uint i = 0; i < num_threads; i++) {
1294  params[i].component = "component " + to_string(i);
1295  params[i].type = SyncPoint::WAIT_FOR_ALL;
1296  params[i].manager = manager;
1297  params[i].thread_nr = i;
1298  params[i].num_wait_calls = 1;
1299  params[i].timeout_sec = 1;
1300  params[i].timeout_nsec = 0;
1301  params[i].sp_identifier = sp_identifier;
1302  params[i].start_barrier = barrier;
1303  pthread_create(&threads[i], &attrs, start_waiter_thread, &params[i]);
1304  }
1305  barrier->wait();
1306  EXPECT_TRUE(wait_for_running(&wait_for_one_params));
1307  for (uint i = 0; i < num_threads; i++) {
1308  EXPECT_TRUE(wait_for_running(&params[i]));
1309  }
1310  EXPECT_TRUE(wait_for_finished(&wait_for_one_params));
1311  for (uint i = 0; i < num_threads; i++) {
1312  EXPECT_EQ(RUNNING, params[i].status);
1313  }
1314  for (uint i = 0; i < num_threads; i++) {
1315  EXPECT_TRUE(wait_for_finished(&params[i], params[i].timeout_sec, params[i].timeout_nsec));
1316  pthread_join(threads[i], NULL);
1317  }
1318  pthread_join(wait_for_one_thread, NULL);
1319 }
1320 
1321 TEST_F(SyncPointManagerTest, MultipleWaitsWithoutEmitters)
1322 {
1323  RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1324  pthread_t waiter_thread;
1326  thread_params.component = "waiter";
1327  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1328  thread_params.manager = manager;
1329  thread_params.thread_nr = 1;
1330  thread_params.num_wait_calls = 2;
1331  thread_params.sp_identifier = "/test";
1332  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1333  ASSERT_TRUE(wait_for_finished(&thread_params));
1334  pthread_join(waiter_thread, NULL);
1335 }
1336 
1337 TEST_F(SyncPointManagerTest, ReleaseOfEmitterThrowsException)
1338 {
1339  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1340  sp->register_emitter("emitter");
1341  ASSERT_THROW(manager->release_syncpoint("emitter", sp), SyncPointCannotReleaseEmitter);
1342 }
1343 
1344 TEST_F(SyncPointManagerTest, UnregisterNonEmitter)
1345 {
1346  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1347  // "emitter" is a watcher but not an emitter
1348  EXPECT_NO_THROW(sp->unregister_emitter("emitter"));
1349  // "foo" is not known to the syncpoint
1350  EXPECT_NO_THROW(sp->unregister_emitter("foo"));
1351 }
1352 
1353 TEST_F(SyncPointManagerTest, ReleaseBarrierWaiter)
1354 {
1355  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1356  sp->register_emitter("emitter");
1357  pthread_t waiter_thread;
1359  thread_params.component = "component 1";
1360  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1361  thread_params.manager = manager;
1362  thread_params.thread_nr = 1;
1363  thread_params.num_wait_calls = 1;
1364  thread_params.sp_identifier = "/test";
1365  thread_params.timeout_sec = 2;
1366  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1367  EXPECT_TRUE(wait_for_running(&thread_params));
1368  ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1369  pthread_cancel(waiter_thread);
1370  pthread_join(waiter_thread, NULL);
1371  ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1372  manager->release_syncpoint("component 1", sp);
1373  sp = manager->get_syncpoint("component 1", "/test");
1374  EXPECT_NO_THROW(sp->reltime_wait_for_all("component 1", 0, pow(10, 6)));
1375 }
Invalid identifier used (i.e.
Definition: exceptions.h:121
Wait until a given condition holds.
pthread_attr_t attrs
Thread attributes.
The component called release but is still registered as emitter.
Definition: exceptions.h:202
virtual ~Emitter()
Destructor.
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
Definition: syncpoint.cpp:565
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Test class for SyncPoint This class tests basic functionality of SyncPoints.
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
RefPtr< SyncPoint > sp1
Syncpoints for testing purposes.
SyncPointManagerTest()
Initialize the test class.
Emitter(string identifier, string syncbarrier, RefPtr< SyncPointManager > manager)
Constructor.
void wake_all()
Wake up all waiting threads.
Mutex locking helper.
Definition: mutex_locker.h:33
virtual ~SyncPointManagerTest()
Deinitialize the test class.
virtual void emit(const std::string &component)
send a signal to all waiting threads
Definition: syncpoint.cpp:148
WaitCondition cond_finished
WaitCondition to indicate that the thread has finished.
MultiLogger * logger_
Logger used to initialize SyncPoints.
Logging Cache.
Definition: cache.h:37
std::set< std::string > get_watchers() const
Definition: syncpoint.cpp:516
RefPtr< SyncPoint > get_syncpoint(const std::string &component, const std::string &identifier)
Get a SyncPoint.
Test class for SyncPointManager This class tests basic functionality of the SyncPointManager.
WaitCondition cond_running
WaitCondition to indicate that the thread is running.
std::multiset< std::string > get_emitters() const
Definition: syncpoint.cpp:543
Test SyncBarriers.
virtual void SetUp()
Initialize the test class.
Log through multiple loggers.
Definition: multi.h:34
atomic< ThreadStatus > status
current status of the thread
CacheLogger * cache_logger_
Cache Logger used for testing.
virtual void TearDown()
Clean up.
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Definition: syncpoint.h:56
This class gives access to SyncPoints.
The parameters passed to the threads.
Mutex mutex_finished
Mutex to protect cond_finished.
void emit()
emit the SyncBarrier
struct used for multithreading tests
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
Definition: syncpoint.cpp:385
uint num_wait_calls
Number of wait calls the thread should make.
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
Definition: syncpoint.cpp:454
SyncBarrierTest()
Constructor.
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
Definition: syncpoint.cpp:416
SyncPoint::WakeupType type
Wait type.
RefPtr< SyncPointManager > manager
A Pointer to a SyncPointManager.
Invalid component name used (i.e.
Definition: exceptions.h:139
virtual void register_emitter(const std::string &component)
register as emitter
Definition: syncpoint.cpp:437
uint timeout_sec
timeout in sec
Helper class which registers and emits a given SyncBarrier.
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
Definition: syncpoint.cpp:363
uint thread_nr
Thread number.
MultiLogger * logger_
Logger for testing.
A component called wait() but is already waiting.
Definition: exceptions.h:155
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Definition: exceptions.h:173
Compare sets of syncpoints.
Definition: syncpoint.h:43
std::string get_identifier() const
Definition: syncpoint.cpp:105
Mutex mutex_running
Mutex to protect cond_running.
Barrier * start_barrier
Barrier for startup synchronization.
void lock()
Lock this mutex.
Definition: mutex.cpp:87
RefPtr< SyncPointManager > manager
SyncPointManager passed to the thread.
Mutex mutual exclusion lock.
Definition: mutex.h:32
uint timeout_nsec
timeout in nsec
The SyncPoint class.
Definition: syncpoint.h:49
string component
Name of the component.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:31
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
Definition: syncpoint.cpp:239
string sp_identifier
Name of the SyncPoint.