Fawkes API  Fawkes Development Version
test_syncpoint.cpp
1 /***************************************************************************
2  * test_syncpoint.cpp - SyncPoint Unit Test
3  *
4  * Created: Wed Jan 22 11:17:43 2014
5  * Copyright 2014-2018 Till Hofmann
6  *
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include <gtest/gtest.h>
23 
24 #include <pthread.h>
25 #ifdef __FreeBSD__
26 # include <pthread_np.h>
27 #endif
28 #include <core/threading/barrier.h>
29 #include <core/threading/mutex.h>
30 #include <core/threading/mutex_locker.h>
31 #include <core/threading/wait_condition.h>
32 #include <core/utils/refptr.h>
33 #include <libs/syncpoint/exceptions.h>
34 #include <libs/syncpoint/syncpoint.h>
35 #include <libs/syncpoint/syncpoint_manager.h>
36 #include <logging/cache.h>
37 #include <logging/multi.h>
38 #include <sys/time.h>
39 
40 #include <atomic>
41 #include <cmath>
42 #include <errno.h>
43 #include <string>
44 #include <time.h>
45 #include <unistd.h>
46 
47 using namespace fawkes;
48 using namespace std;
49 
50 /** @class SyncPointTest
51  * Test class for SyncPoint
52  * This class tests basic functionality of SyncPoints
53  */
54 class SyncPointTest : public ::testing::Test
55 {
56 protected:
57  /**
58  * Initialize the test class
59  */
60  virtual void
62  {
63  logger_ = new MultiLogger();
64  string id1 = "/id1";
65  string id2 = "/id2";
66  sp1 = new SyncPoint(id1, logger_);
67  sp2 = new SyncPoint(id1, logger_);
68  sp3 = new SyncPoint(id2, logger_);
69  }
70 
71  /** Clean up */
72  virtual void
74  {
75  delete logger_;
76  }
77 
78  /** A syncpoint for testing */
80  /** A syncpoint for testing */
82  /** A syncpoint for testing */
84 
85  /** Logger for testing */
87 };
88 
89 /** @class SyncPointManagerTest
90  * Test class for SyncPointManager
91  * This class tests basic functionality of the SyncPointManager
92  */
93 class SyncPointManagerTest : public ::testing::Test
94 {
95 protected:
96  /**
97  * Initialize the test class
98  */
100  {
101  logger_ = new MultiLogger();
102  cache_logger_ = new CacheLogger();
103  logger_->add_logger(cache_logger_);
104  manager = new SyncPointManager(logger_);
105 
106  pthread_attr_init(&attrs);
107  }
108 
109  /**
110  * Deinitialize the test class
111  */
113  {
114  pthread_attr_destroy(&attrs);
115  delete logger_;
116  // delete cache_logger_;
117  }
118 
119  /**
120  * A Pointer to a SyncPointManager
121  */
123 
124  /** Logger used to initialize SyncPoints */
126 
127  /** Cache Logger used for testing */
129 
130  /** Thread attributes */
131  pthread_attr_t attrs;
132 };
133 
134 /** @class SyncBarrierTest
135  * Test SyncBarriers
136  */
138 {
139 protected:
140  /** Constructor. */
142  {
143  }
144 };
145 
146 TEST_F(SyncPointTest, CreateSyncPoint)
147 {
148  ASSERT_TRUE(*sp1 != NULL);
149 }
150 
151 TEST_F(SyncPointTest, Equals)
152 {
153  // RefPtr<SyncPoint>
154  ASSERT_NE(sp1, sp2);
155  // SyncPoint*
156  ASSERT_NE(*sp1, *sp2);
157  // SyncPoint
158  ASSERT_EQ(**sp1, **sp2);
159 }
160 
161 TEST_F(SyncPointTest, LessThan)
162 {
163  ASSERT_LT(**sp1, **sp3);
164  ASSERT_FALSE(**sp3 < **sp1);
165  ASSERT_FALSE(**sp1 < **sp2);
166  ASSERT_FALSE(**sp2 < **sp1);
167 }
168 
169 TEST_F(SyncPointTest, SyncPointSets)
170 {
171  using namespace std;
172  set<RefPtr<SyncPoint>, SyncPointSetLessThan> sp_set;
173  pair<set<RefPtr<SyncPoint>>::iterator, bool> ret;
174 
175  // insert sp1
176  ret = sp_set.insert(sp1);
177  ASSERT_TRUE(ret.second);
178  ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
179 
180  // insert sp3
181  ret = sp_set.insert(sp3);
182  ASSERT_TRUE(ret.second);
183  ASSERT_EQ(sp3->get_identifier(), (*(ret.first))->get_identifier());
184 
185  // insert sp1 again
186  ret = sp_set.insert(sp1);
187  ASSERT_FALSE(ret.second);
188  ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
189 
190  // insert sp2 (same as sp1)
191  ret = sp_set.insert(sp2);
192  ASSERT_FALSE(ret.second);
193  ASSERT_EQ(sp2->get_identifier(), (*(ret.first))->get_identifier());
194 }
195 
197 {
198  ASSERT_EQ(0u, manager->get_syncpoints().size());
199  manager->get_syncpoint("test", "/test/1");
200  ASSERT_EQ(3u, manager->get_syncpoints().size());
201  ASSERT_EQ(1u,
202  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
203  manager->get_syncpoint("test2", "/test/2");
204  ASSERT_EQ(4u, manager->get_syncpoints().size());
205  ASSERT_EQ(1u,
206  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
207  ASSERT_EQ(1u,
208  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
209  manager->get_syncpoint("test3", "/test/1");
210  ASSERT_EQ(4u, manager->get_syncpoints().size());
211  ASSERT_EQ(1u,
212  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
213  ASSERT_EQ(1u,
214  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
215  ASSERT_EQ(1u, manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/", logger_))));
216  ASSERT_EQ(1u,
217  manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
218 }
219 
220 TEST_F(SyncPointManagerTest, WatcherSet)
221 {
222  ASSERT_NO_THROW(manager->get_syncpoint("component 1", "/test"));
223  ASSERT_NO_THROW(manager->get_syncpoint("component 2", "/test"));
224  ASSERT_NO_THROW(manager->get_syncpoint("component 3", "/test"));
225 }
226 
227 /** Test what happens if we acquire a SyncPoint, release it, and then acquire it
228  * again. If release_syncpoint works properly, this should not throw. Otherwise,
229  * we would expect a SyncPointAlreadyOpenedException
230  */
231 TEST_F(SyncPointManagerTest, ReleaseAndReacquire)
232 {
233  string comp = "component";
234  string id = "/test/sp1";
235  RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, id);
236  set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
237  ASSERT_EQ(1, syncpoints.count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
238  for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
239  sp_it++) {
240  EXPECT_EQ(1, (*sp_it)->get_watchers().count(comp))
241  << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
242  }
243  manager->release_syncpoint(comp, sp);
244  for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
245  sp_it++) {
246  EXPECT_EQ(0, (*sp_it)->get_watchers().count(comp))
247  << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
248  }
249  ASSERT_NO_THROW(manager->get_syncpoint(comp, id));
250 }
251 
252 TEST_F(SyncPointTest, EmptyIdentifier)
253 {
254  ASSERT_THROW(sp1 = new SyncPoint("", NULL), SyncPointInvalidIdentifierException);
255 }
256 
257 TEST_F(SyncPointTest, InvalidIdentifier)
258 {
259  EXPECT_THROW(sp1 = new SyncPoint("invalid", NULL), SyncPointInvalidIdentifierException);
260  EXPECT_NO_THROW(sp1 = new SyncPoint("/", NULL));
261  EXPECT_THROW(sp1 = new SyncPoint("/test/", NULL), SyncPointInvalidIdentifierException);
262 }
263 
264 TEST_F(SyncPointManagerTest, SyncPointManagerExceptions)
265 {
266  RefPtr<SyncPoint> invalid_sp;
267  ASSERT_THROW(invalid_sp = manager->get_syncpoint("", "/test/sp1"),
269 
270  // make sure syncpoint_manager doesn't catch the exceptions thrown by SyncPoint
271  ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", ""),
273  ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", "invalid"),
275 }
276 
277 TEST_F(SyncPointManagerTest, SyncPointHierarchyRegisteredWatchers)
278 {
279  string comp = "component1";
280  string id = "/test/sp1";
281  RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, "/test/sp1");
282  set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
283  set<RefPtr<SyncPoint>>::iterator sp_test_it =
284  syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
285  set<RefPtr<SyncPoint>>::iterator sp_root_it =
286  syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/", logger_)));
287  ASSERT_NE(syncpoints.end(), sp_test_it);
288  ASSERT_NE(syncpoints.end(), sp_root_it);
289  RefPtr<SyncPoint> sp_test = *sp_test_it;
290  RefPtr<SyncPoint> sp_root = *sp_root_it;
291  EXPECT_EQ(1, syncpoints.count(sp_test));
292  EXPECT_EQ(1, syncpoints.count(sp_root));
293  EXPECT_EQ(1, sp->get_watchers().count(comp));
294  EXPECT_EQ(1, sp_test->get_watchers().count(comp));
295  EXPECT_EQ(0, sp_test->get_watchers().count(id));
296  EXPECT_EQ(1, sp_root->get_watchers().count(comp));
297  EXPECT_EQ(0, sp_root->get_watchers().count(id));
298  EXPECT_EQ(0, sp_root->get_watchers().count(sp_test->get_identifier()));
299 
300  manager->release_syncpoint(comp, sp);
301  EXPECT_EQ(0, sp_test->get_watchers().count(id));
302 }
303 
304 TEST_F(SyncPointManagerTest, SyncPointComponentRegistersForMultipleSyncPoints)
305 {
306  string comp = "component1";
307  string sp1_id = "/test/sp1";
308  string sp2_id = "/test/sp2";
309  RefPtr<SyncPoint> sp1 = manager->get_syncpoint(comp, sp1_id);
310  // the following should not throw
311  // if it does, registering for the predecessor '/test' may be broken
312  RefPtr<SyncPoint> sp2 = manager->get_syncpoint(comp, sp2_id);
313  RefPtr<SyncPoint> predecessor =
314  *manager->get_syncpoints().find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
315  EXPECT_EQ(1, sp1->get_watchers().count(comp))
316  << comp << " is not registered for " << sp1->get_identifier() << ", but should be!";
317  EXPECT_EQ(1, sp2->get_watchers().count(comp))
318  << comp << " is not registered for " << sp2->get_identifier() << ", but should be!";
319  EXPECT_EQ(1, predecessor->get_watchers().count(comp))
320  << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
321 
322  manager->release_syncpoint(comp, sp1);
323  EXPECT_EQ(1, sp2->get_watchers().count(comp));
324  EXPECT_EQ(1, predecessor->get_watchers().count(comp))
325  << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
326 }
327 
328 enum ThreadStatus { PENDING, RUNNING, FINISHED };
329 
330 /** struct used for multithreading tests */
332 {
333  /** SyncPointManager passed to the thread */
335  /** Thread number */
336  uint thread_nr = 0;
337  /** Wait type */
338  SyncPoint::WakeupType type = SyncPoint::WAIT_FOR_ONE;
339  /** Number of wait calls the thread should make */
341  /** Name of the SyncPoint */
343  /** Name of the component */
344  string component = "";
345  /** timeout in sec */
346  uint timeout_sec = 0;
347  /** timeout in nsec */
348  uint timeout_nsec = 0;
349  /** current status of the thread */
350  atomic<ThreadStatus> status;
351  /** Mutex to protect cond_running */
353  /** WaitCondition to indicate that the thread is running */
354  WaitCondition cond_running = WaitCondition(&mutex_running);
355  /** Mutex to protect cond_finished */
357  /** WaitCondition to indicate that the thread has finished */
358  WaitCondition cond_finished = WaitCondition(&mutex_finished);
359  /** Barrier for startup synchronization. */
360  Barrier *start_barrier = nullptr;
361 };
362 
363 /** Helper function to wait for a thread to be running */
364 bool
365 wait_for_running(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
366 {
367  RefPtr<SyncPoint> sp = params->manager->get_syncpoint("test_runner", params->sp_identifier);
368  const int wait_time_us = 1000;
369  for (uint i = 0; i < (sec * pow(10, 9) + nanosec) / (wait_time_us * pow(10, 3)); i++) {
370  if (sp->watcher_is_waiting(params->component, params->type)) {
371  return true;
372  }
373  usleep(wait_time_us);
374  }
375  return false;
376 }
377 
378 /** Helper function to wait for a thread to be finished */
379 bool
380 wait_for_finished(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
381 {
382  MutexLocker ml(params->mutex_finished);
383  if (params->status == FINISHED) {
384  return true;
385  } else {
386  return params->cond_finished.reltimed_wait(sec, nanosec);
387  }
388 }
389 
390 /** get a SyncPoint and wait for it */
391 void *
392 start_waiter_thread(void *data)
393 {
394  waiter_thread_params *params = (waiter_thread_params *)data;
395  string component = params->component;
396  RefPtr<SyncPoint> sp = params->manager->get_syncpoint(component, params->sp_identifier);
397  params->status = RUNNING;
398  if (params->start_barrier) {
399  params->start_barrier->wait();
400  }
401  params->mutex_running.lock();
402  params->cond_running.wake_all();
403  params->mutex_running.unlock();
404  for (uint i = 0; i < params->num_wait_calls; i++) {
405  sp->wait(component, params->type, params->timeout_sec, params->timeout_nsec);
406  }
407  params->status = FINISHED;
408  params->mutex_finished.lock();
409  params->cond_finished.wake_all();
410  params->mutex_finished.unlock();
411  pthread_exit(NULL);
412 }
413 
414 TEST_F(SyncPointManagerTest, MultipleWaits)
415 {
416  RefPtr<SyncPoint> sp_ref = manager->get_syncpoint("component", "/test/sp1");
417  pthread_t thread1;
418  waiter_thread_params params;
419  params.component = "component";
420  params.manager = manager;
421  params.num_wait_calls = 1;
422  params.sp_identifier = "/test/sp1";
423  pthread_create(&thread1, &attrs, start_waiter_thread, &params);
424  wait_for_running(&params);
425  ASSERT_THROW(sp_ref->wait("component"), SyncPointMultipleWaitCallsException);
426  pthread_cancel(thread1);
427  pthread_join(thread1, NULL);
428 }
429 
430 /** Create multiple threads which will all call get_syncpoint
431  * for the same SyncPoint. Do not wait for the SyncPoint but return
432  * immediately.
433  */
434 TEST_F(SyncPointManagerTest, MultipleManagerRequests)
435 {
436  uint num_threads = 50;
437  pthread_t threads[num_threads];
438  waiter_thread_params *params[num_threads];
439  string sp_identifier = "/test/sp1";
440  for (uint i = 0; i < num_threads; i++) {
441  params[i] = new waiter_thread_params();
442  params[i]->component = "component " + to_string(i);
443  params[i]->manager = manager;
444  params[i]->thread_nr = i;
445  params[i]->num_wait_calls = 0;
446  params[i]->sp_identifier = sp_identifier;
447  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
448  pthread_yield();
449  ASSERT_LE(manager->get_syncpoints().size(), 3u);
450  }
451 
452  for (uint i = 0; i < num_threads; i++) {
453  pthread_join(threads[i], NULL);
454  delete params[i];
455  }
456 }
457 
458 /** start multiple threads and let them wait.
459  * This just tests whether there are any segfaults.
460  * No assertions are made.
461  */
462 TEST_F(SyncPointManagerTest, ParallelWaitCalls)
463 {
464  uint num_threads = 50;
465  uint num_wait_calls = 10;
466  pthread_t threads[num_threads];
467  waiter_thread_params *params[num_threads];
468  string sp_identifier = "/test/sp1";
469  for (uint i = 0; i < num_threads; i++) {
470  params[i] = new waiter_thread_params();
471  params[i]->component = "component " + to_string(i);
472  params[i]->manager = manager;
473  params[i]->thread_nr = i;
474  params[i]->num_wait_calls = num_wait_calls;
475  params[i]->sp_identifier = sp_identifier;
476  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
477  pthread_yield();
478  ASSERT_LE(manager->get_syncpoints().size(), 3u);
479  }
480 
481  for (uint i = 0; i < num_threads; i++) {
482  EXPECT_TRUE(wait_for_running(params[i]));
483  }
484  for (uint i = 0; i < num_threads; i++) {
485  pthread_cancel(threads[i]);
486  ASSERT_EQ(0, pthread_join(threads[i], NULL));
487  delete params[i];
488  }
489 }
490 
491 /** start multiple threads, let them wait for a SyncPoint,
492  * emit the SyncPoint and verify that they all returned
493  */
494 TEST_F(SyncPointManagerTest, ParallelWaitsReturn)
495 {
496  uint num_threads = 10;
497  uint num_wait_calls = 5;
498  pthread_t threads[num_threads];
499  waiter_thread_params *params[num_threads];
500  string sp_identifier = "/test/sp1";
501  for (uint i = 0; i < num_threads; i++) {
502  params[i] = new waiter_thread_params();
503  params[i]->component = "component " + to_string(i);
504  params[i]->manager = manager;
505  params[i]->thread_nr = i;
506  params[i]->num_wait_calls = num_wait_calls;
507  params[i]->sp_identifier = sp_identifier;
508  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
509  pthread_yield();
510  }
511 
512  for (uint i = 0; i < num_threads; i++) {
513  EXPECT_TRUE(wait_for_running(params[i]));
514  }
515 
516  string component = "emitter";
517  RefPtr<SyncPoint> sp = manager->get_syncpoint(component, sp_identifier);
518  sp->register_emitter(component);
519  for (uint i = 0; i < num_wait_calls; i++) {
520  sp->emit(component);
521  usleep(20000);
522  }
523 
524  for (uint i = 0; i < num_threads; i++) {
525  ASSERT_TRUE(wait_for_finished(params[i]));
526  pthread_join(threads[i], NULL);
527  delete params[i];
528  }
529 }
530 
531 /** start multiple threads, let them wait for a SyncPoint,
532  * but don't emit the SyncPoint. Verify that they have not returned
533  */
534 TEST_F(SyncPointManagerTest, WaitDoesNotReturnImmediately)
535 {
536  uint num_threads = 50;
537  pthread_t threads[num_threads];
538  waiter_thread_params *params[num_threads];
539  for (uint i = 0; i < num_threads; i++) {
540  params[i] = new waiter_thread_params();
541  params[i]->component = "component " + to_string(i);
542  params[i]->manager = manager;
543  params[i]->thread_nr = i;
544  params[i]->num_wait_calls = 1;
545  params[i]->sp_identifier = "/test/sp1";
546  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
547  }
548 
549  for (uint i = 0; i < num_threads; i++) {
550  EXPECT_TRUE(wait_for_running(params[i]));
551  }
552 
553  for (uint i = 0; i < num_threads; i++) {
554  EXPECT_EQ(RUNNING, params[i]->status);
555  pthread_cancel(threads[i]);
556  ASSERT_EQ(0, pthread_join(threads[i], NULL));
557  delete params[i];
558  }
559 }
560 
561 /**
562  * Test the SyncPoint hierarchy.
563  * This creates a SyncPoint, an emitter and waiters which wait for the
564  * SyncPoint's predecessor, the predecessor's predecessor (grandparent),
565  * and the root SyncPoint ("/").
566  */
567 TEST_F(SyncPointManagerTest, SyncPointHierarchy)
568 {
569  vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
570  uint num_threads = identifiers.size();
571  pthread_t threads[num_threads];
572  waiter_thread_params *params[num_threads];
573  for (uint i = 0; i < num_threads; i++) {
574  params[i] = new waiter_thread_params();
575  params[i]->component = "component " + to_string(i);
576  params[i]->manager = manager;
577  params[i]->thread_nr = i;
578  params[i]->num_wait_calls = 1;
579  params[i]->sp_identifier = identifiers.at(i);
580  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
581  }
582 
583  for (uint i = 0; i < num_threads; i++) {
584  EXPECT_TRUE(wait_for_running(params[i]));
585  }
586  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test/topic/sp");
587  sp->register_emitter("emitter");
588  sp->emit("emitter");
589 
590  /* The first waiters should be unblocked */
591  for (uint i = 0; i < num_threads - 1; i++) {
592  ASSERT_TRUE(wait_for_finished(params[i]));
593  pthread_join(threads[i], NULL);
594  delete params[i];
595  }
596 
597  /* The last waiter should still wait */
598  pthread_t last_thread = threads[num_threads - 1];
599  EXPECT_FALSE(wait_for_finished(params[num_threads - 1], 0, pow(10, 6)));
600  pthread_cancel(last_thread);
601  ASSERT_EQ(0, pthread_join(last_thread, NULL));
602 }
603 
604 /** Emit a barrier without registering */
605 TEST_F(SyncBarrierTest, EmitWithoutRegister)
606 {
607  string component = "emitter";
608  RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
609  ASSERT_THROW(barrier->emit(component), SyncPointNonEmitterCalledEmitException);
610 }
611 
612 /** Register multiple times
613  * This is allowed, but the component should then also emit multiple times */
614 TEST_F(SyncBarrierTest, MultipleRegisterCalls)
615 {
616  string component = "emitter";
617  RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
618  EXPECT_NO_THROW(barrier->register_emitter(component));
619  EXPECT_NO_THROW(barrier->register_emitter(component));
620 }
621 
622 /** get a SyncBarrier, register as emitter and emit */
623 void *
624 start_barrier_emitter_thread(void *data)
625 {
626  waiter_thread_params *params = (waiter_thread_params *)data;
627  string component = "emitter " + to_string(params->thread_nr);
629  EXPECT_NO_THROW(sp = params->manager->get_syncpoint(component, params->sp_identifier));
630  sp->register_emitter(component);
631  for (uint i = 0; i < params->num_wait_calls; i++) {
632  sp->emit(component);
633  }
634  pthread_exit(NULL);
635 }
636 
637 /** Helper class which registers and emits a given SyncBarrier */
638 class Emitter
639 {
640 public:
641  /** Constructor.
642  * @param identifier The identifier of this emitter.
643  * @param syncbarrier The identifier of the SyncBarrier to register for.
644  * @param manager Pointer to the SyncPointManager to use.
645  */
646  Emitter(string identifier, string syncbarrier, RefPtr<SyncPointManager> manager)
647  : identifier_(identifier), manager_(manager)
648  {
649  barrier_ = manager->get_syncpoint(identifier_, syncbarrier);
650  barrier_->register_emitter(identifier_);
651  }
652 
653  /** Destructor. */
654  virtual ~Emitter()
655  {
656  barrier_->unregister_emitter(identifier_);
657  manager_->release_syncpoint(identifier_, barrier_);
658  }
659 
660  /** emit the SyncBarrier */
661  void
663  {
664  barrier_->emit(identifier_);
665  }
666 
667 private:
668  string identifier_;
669  RefPtr<SyncPoint> barrier_;
670  RefPtr<SyncPointManager> manager_;
671 };
672 
673 /** Barrier: wait() returns immediately if no emitter is registered */
674 TEST_F(SyncBarrierTest, WaitWithNoRegisteredEmitter)
675 {
676  string barrier_id = "/test/barrier";
677  RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
678  const uint num_waiter_threads = 1;
679  const uint num_wait_calls = 1;
680  pthread_t waiter_threads[num_waiter_threads];
681  waiter_thread_params *params[num_waiter_threads];
682  for (uint i = 0; i < num_waiter_threads; i++) {
683  params[i] = new waiter_thread_params();
684  params[i]->type = SyncPoint::WAIT_FOR_ALL;
685  params[i]->component = "component " + to_string(i);
686  params[i]->manager = manager;
687  params[i]->thread_nr = i;
688  params[i]->num_wait_calls = num_wait_calls;
689  params[i]->sp_identifier = barrier_id;
690  pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
691  }
692  for (uint i = 0; i < num_waiter_threads; i++) {
693  ASSERT_TRUE(wait_for_finished(params[i]));
694  pthread_join(waiter_threads[i], NULL);
695  delete params[i];
696  }
697 }
698 
699 /** Start multiple threads, let them wait for a SyncBarrier,
700  * also have two threads registered as emitter.
701  * Let the first thread emit the barrier, assert the waiters did not unblock,
702  * then let the second thread emit.
703  * This tests the fundamental difference to a SyncPoint: With a SyncPoint,
704  * wait() returns if the SyncPoint is emitted by one component.
705  * With a SyncBarrier, all registered emitters need to emit the SyncBarrier
706  * before wait() returns.
707  */
708 TEST_F(SyncBarrierTest, WaitForAllEmitters)
709 {
710  string barrier_id = "/test/barrier";
711  Emitter em1("emitter 1", barrier_id, manager);
712  Emitter em2("emitter 2", barrier_id, manager);
713 
714  RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
715 
716  const uint num_waiter_threads = 50;
717  const uint num_wait_calls = 1;
718  pthread_t waiter_threads[num_waiter_threads];
719  waiter_thread_params *params[num_waiter_threads];
720  for (uint i = 0; i < num_waiter_threads; i++) {
721  params[i] = new waiter_thread_params();
722  params[i]->component = "component " + to_string(i);
723  params[i]->type = SyncPoint::WAIT_FOR_ALL;
724  params[i]->manager = manager;
725  params[i]->thread_nr = i;
726  params[i]->num_wait_calls = num_wait_calls;
727  params[i]->sp_identifier = barrier_id;
728  pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
729  }
730 
731  for (uint i = 0; i < num_waiter_threads; i++) {
732  EXPECT_TRUE(wait_for_running(params[i]));
733  }
734 
735  em1.emit();
736 
737  for (uint i = 0; i < num_waiter_threads; i++) {
738  EXPECT_EQ(RUNNING, params[i]->status);
739  }
740 
741  em1.emit();
742  em2.emit();
743 
744  for (uint i = 0; i < num_waiter_threads; i++) {
745  ASSERT_TRUE(wait_for_finished(params[i]));
746  pthread_join(waiter_threads[i], NULL);
747  delete params[i];
748  }
749 }
750 
751 /** two barriers, emit the first one. Only the threads waiting on the first
752  * barrier should unblock
753  */
754 TEST_F(SyncBarrierTest, BarriersAreIndependent)
755 {
756  string barrier1_id = "/test/barrier1";
757  string barrier2_id = "/test/barrier2";
758  Emitter em1("em1", barrier1_id, manager);
759  Emitter em2("em2", barrier2_id, manager);
760 
761  RefPtr<SyncPoint> barrier1 = manager->get_syncpoint("m1", barrier1_id);
762 
763  RefPtr<SyncPoint> barrier2 = manager->get_syncpoint("m2", barrier2_id);
764 
765  const uint num_waiter_threads = 50;
766  const uint num_wait_calls = 1;
767  pthread_t waiter_threads1[num_waiter_threads];
768  waiter_thread_params *params1[num_waiter_threads];
769  for (uint i = 0; i < num_waiter_threads; i++) {
770  params1[i] = new waiter_thread_params();
771  params1[i]->component = "component " + to_string(i);
772  params1[i]->type = SyncPoint::WAIT_FOR_ALL;
773  params1[i]->manager = manager;
774  params1[i]->thread_nr = i;
775  params1[i]->num_wait_calls = num_wait_calls;
776  params1[i]->sp_identifier = barrier1_id;
777  pthread_create(&waiter_threads1[i], &attrs, start_waiter_thread, params1[i]);
778  }
779 
780  pthread_t waiter_threads2[num_waiter_threads];
781  waiter_thread_params *params2[num_waiter_threads];
782  for (uint i = 0; i < num_waiter_threads; i++) {
783  params2[i] = new waiter_thread_params();
784  params2[i]->component = "component " + to_string(i);
785  params2[i]->type = SyncPoint::WAIT_FOR_ALL;
786  params2[i]->manager = manager;
787  params2[i]->thread_nr = num_waiter_threads + i;
788  params2[i]->num_wait_calls = num_wait_calls;
789  params2[i]->sp_identifier = barrier2_id;
790  pthread_create(&waiter_threads2[i], &attrs, start_waiter_thread, params2[i]);
791  }
792 
793  for (uint i = 0; i < num_waiter_threads; i++) {
794  EXPECT_TRUE(wait_for_running(params1[i]));
795  }
796 
797  for (uint i = 0; i < num_waiter_threads; i++) {
798  EXPECT_TRUE(wait_for_running(params2[i]));
799  }
800 
801  em1.emit();
802 
803  for (uint i = 0; i < num_waiter_threads; i++) {
804  ASSERT_TRUE(wait_for_finished(params1[i]));
805  pthread_join(waiter_threads1[i], NULL);
806  delete params1[i];
807  }
808 
809  for (uint i = 0; i < num_waiter_threads; i++) {
810  EXPECT_EQ(RUNNING, params2[i]->status);
811  }
812 
813  em2.emit();
814 
815  for (uint i = 0; i < num_waiter_threads; i++) {
816  ASSERT_TRUE(wait_for_finished(params2[i]));
817  pthread_join(waiter_threads2[i], NULL);
818  delete params2[i];
819  }
820 }
821 
822 /**
823  * Test the SyncBarrier hierarchy, similar to the SyncPoint hierarchy test.
824  * This creates a SyncBarrier, an emitter and waiters which wait for the
825  * SyncBarrier's predecessor, the predecessor's predecessor (grandparent),
826  * and the root SyncBarrier ("/").
827  */
828 TEST_F(SyncBarrierTest, SyncBarrierHierarchy)
829 {
830  Emitter em1("emitter 1", "/test/topic/b1", manager);
831  Emitter em2("emitter 2", "/test/topic/b2", manager);
832  Emitter em3("emitter 3", "/other/topic", manager);
833 
834  vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
835  uint num_threads = identifiers.size();
836  pthread_t threads[num_threads];
837  waiter_thread_params *params[num_threads];
838  Barrier * barrier = new Barrier(num_threads + 1);
839  for (uint i = 0; i < num_threads; i++) {
840  params[i] = new waiter_thread_params();
841  params[i]->component = "component " + to_string(i);
842  params[i]->type = SyncPoint::WAIT_FOR_ALL;
843  params[i]->manager = manager;
844  params[i]->thread_nr = i;
845  params[i]->num_wait_calls = 1;
846  params[i]->sp_identifier = identifiers.at(i);
847  params[i]->start_barrier = barrier;
848  pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
849  }
850 
851  barrier->wait();
852  delete barrier;
853 
854  for (uint i = 0; i < num_threads; i++) {
855  EXPECT_TRUE(wait_for_running(params[i]));
856  }
857 
858  em1.emit();
859  for (uint i = 0; i < num_threads; i++) {
860  ASSERT_EQ(RUNNING, params[i]->status);
861  }
862  em2.emit();
863  /* The first waiters should be unblocked */
864  for (uint i = 0; i < num_threads - 2; i++) {
865  ASSERT_TRUE(wait_for_finished(params[i]));
866  pthread_join(threads[i], NULL);
867  delete params[i];
868  }
869  /* The last two waiters should still be waiting */
870  for (uint i = num_threads - 2; i < num_threads; i++) {
871  EXPECT_EQ(RUNNING, params[i]->status);
872  pthread_cancel(threads[i]);
873  ASSERT_EQ(0, pthread_join(threads[i], NULL));
874  delete params[i];
875  }
876 }
877 
878 /** One component registers as emitter for two syncpoints, two other components
879  * wait for the first and second syncpoint respectively.
880  * Then, the first component unregisters for the first syncpoint.
881  * Test whether it is still registered for the second syncpoint.
882  * A third waiter waits for the predecessor syncpoint and should also still be
883  * waiting after the emitter has unregistered for the first syncpoint.
884  */
885 TEST_F(SyncPointManagerTest, OneEmitterRegistersForMultipleSyncPointsHierarchyTest)
886 {
887  string id_sp1 = "/test/sp1";
888  string id_sp2 = "/test/sp2";
889  string id_sp_pred = "/test";
890  string id_emitter = "component_emitter";
891  string id_waiter1 = "component_waiter1";
892  string id_waiter2 = "component_waiter2";
893  string id_waiter3 = "component_waiter_on_predecessor";
894 
895  RefPtr<SyncPoint> sp1 = manager->get_syncpoint(id_emitter, id_sp1);
896  RefPtr<SyncPoint> sp2 = manager->get_syncpoint(id_emitter, id_sp2);
897  manager->get_syncpoint(id_waiter1, id_sp1);
898  manager->get_syncpoint(id_waiter2, id_sp2);
899  RefPtr<SyncPoint> pred = manager->get_syncpoint(id_waiter3, id_sp_pred);
900  sp1->register_emitter(id_emitter);
901  sp2->register_emitter(id_emitter);
902  EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
903  EXPECT_EQ(1, sp2->get_emitters().count(id_emitter));
904  // this should be 2 as the emitter has registered twice
905  EXPECT_EQ(2, pred->get_emitters().count(id_emitter));
906 
908  params1->manager = manager;
909  params1->component = id_waiter1;
910  params1->type = SyncPoint::WAIT_FOR_ALL;
911  params1->num_wait_calls = 1;
912  params1->sp_identifier = id_sp1;
913 
915  params2->manager = manager;
916  params2->component = id_waiter2;
917  params2->type = SyncPoint::WAIT_FOR_ALL;
918  params2->num_wait_calls = 1;
919  params2->sp_identifier = id_sp2;
920 
922  params3->manager = manager;
923  params3->component = id_waiter3;
924  params3->type = SyncPoint::WAIT_FOR_ALL;
925  params3->num_wait_calls = 1;
926  params3->sp_identifier = id_sp_pred;
927 
928  pthread_t pthread1;
929  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
930  pthread_t pthread2;
931  pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
932  pthread_t pthread3;
933  pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
934  EXPECT_TRUE(wait_for_running(params1));
935  EXPECT_TRUE(wait_for_running(params2));
936  EXPECT_TRUE(wait_for_running(params3));
937 
938  sp1->emit(id_emitter);
939 
940  ASSERT_TRUE(wait_for_finished(params1));
941  ASSERT_FALSE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
942  // this should be waiting as the component has registered twice for '/test'
943  // and thus should emit '/test' also twice (by hierarchical emit calls)
944  ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
945  sp2->emit(id_emitter);
946  ASSERT_TRUE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
947  ASSERT_TRUE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
948 
949  pthread_join(pthread1, NULL);
950  pthread_join(pthread2, NULL);
951  pthread_join(pthread3, NULL);
952 
953  sp2->unregister_emitter(id_emitter);
954  EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
955  EXPECT_EQ(0, sp2->get_emitters().count(id_emitter));
956  EXPECT_EQ(1, pred->get_emitters().count(id_emitter));
957 
958  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
959  pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
960  pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
961 
962  ASSERT_TRUE(wait_for_running(params1));
963  ASSERT_TRUE(wait_for_running(params3));
964 
965  ASSERT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
966  ASSERT_TRUE(wait_for_finished(params2));
967  ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
968 
969  sp1->emit(id_emitter);
970  ASSERT_TRUE(wait_for_finished(params1));
971  ASSERT_TRUE(wait_for_finished(params3));
972  pthread_join(pthread1, NULL);
973  pthread_join(pthread2, NULL);
974  pthread_join(pthread3, NULL);
975  delete params1;
976  delete params2;
977  delete params3;
978 }
979 
980 /** Test if an exception is thrown if a registered emitter is currently not
981  * pending
982  */
983 TEST_F(SyncBarrierTest, NonPendingEmitterEmits)
984 {
985  Emitter em1("em1", "/barrier", manager);
986  // register a second emitter to avoid immediate reset after emit
987  Emitter em2("em2", "/barrier", manager);
988  EXPECT_NO_THROW(em1.emit());
989  EXPECT_NO_THROW(em1.emit());
990 }
991 
992 /** Test if a component waiting for a syncpoint is woken up
993  * if an emitter is registered for two successor syncpoints and the emitter
994  * emits the same syncpoint twice
995  */
996 TEST_F(SyncPointManagerTest, EmitterEmitsSameSyncPointTwiceTest)
997 {
998  RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
999  RefPtr<SyncPoint> sp2 = manager->get_syncpoint("emitter", "/test/sp2");
1000  RefPtr<SyncPoint> sp_pred = manager->get_syncpoint("waiter", "/test");
1001 
1002  sp1->register_emitter("emitter");
1003  sp2->register_emitter("emitter");
1004 
1005  waiter_thread_params *params1 = new waiter_thread_params();
1006  params1->manager = manager;
1007  params1->component = "waiter";
1008  params1->type = SyncPoint::WAIT_FOR_ALL;
1009  params1->num_wait_calls = 1;
1010  params1->sp_identifier = "/test";
1011 
1012  pthread_t pthread1;
1013  pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
1014 
1015  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1016 
1017  sp1->emit("emitter");
1018 
1019  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1020 
1021  sp1->emit("emitter");
1022  EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1023 
1024  sp2->emit("emitter");
1025  ASSERT_TRUE(wait_for_finished(params1));
1026  pthread_join(pthread1, NULL);
1027 
1028  delete params1;
1029 }
1030 
1031 /** Test if the component returns when using reltime_wait */
1032 TEST_F(SyncPointManagerTest, RelTimeWaitTest)
1033 {
1034  RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
1035  manager->get_syncpoint("waiter", "/test/sp1");
1036  sp1->register_emitter("emitter");
1037  pthread_t thread;
1038  waiter_thread_params params;
1039  params.manager = manager;
1040  params.type = SyncPoint::WAIT_FOR_ALL;
1041  params.num_wait_calls = 1;
1042  params.timeout_sec = 0;
1043  params.timeout_nsec = 100000;
1044  params.component = "waiter";
1045  params.sp_identifier = "/test/sp1";
1046  pthread_create(&thread, NULL, start_waiter_thread, &params);
1047  ASSERT_TRUE(wait_for_finished(&params));
1048  /* The SyncPoint should have logged the error */
1049  ASSERT_GT(cache_logger_->get_messages().size(), 0);
1050 }
1051 
1052 /// @cond INTERNALS
1053 struct emitter_thread_data
1054 {
1055  RefPtr<SyncPointManager> manager;
1056  std::string name;
1057  std::string sp_name;
1058  atomic<ThreadStatus> status;
1059  Mutex mutex_running;
1060  WaitCondition cond_running = WaitCondition(&mutex_running);
1061  Mutex mutex_finished;
1062  WaitCondition cond_finished = WaitCondition(&mutex_finished);
1063 };
1064 /// @endcond
1065 
1066 /** helper function to call emit in a thread */
1067 void *
1068 call_emit(void *data)
1069 {
1070  emitter_thread_data *tdata = (emitter_thread_data *)data;
1071  tdata->status = RUNNING;
1072  tdata->mutex_running.lock();
1073  tdata->cond_running.wake_all();
1074  tdata->mutex_running.unlock();
1075  RefPtr<SyncPoint> sp = tdata->manager->get_syncpoint(tdata->name, tdata->sp_name);
1076  sp->register_emitter(tdata->name);
1077  sp->emit(tdata->name);
1078  tdata->status = FINISHED;
1079  tdata->mutex_finished.lock();
1080  tdata->cond_finished.wake_all();
1081  tdata->mutex_finished.unlock();
1082  return NULL;
1083 }
1084 
1085 /** Test the functionality of lock_until_next_wait */
1086 TEST_F(SyncPointManagerTest, LockUntilNextWaitTest)
1087 {
1088  RefPtr<SyncPoint> sp = manager->get_syncpoint("component", "/test");
1089 
1090  sp->lock_until_next_wait("component");
1091  pthread_t thread;
1092  emitter_thread_data *emitter_params = new emitter_thread_data();
1093  emitter_params->manager = manager;
1094  emitter_params->name = "emitter";
1095  emitter_params->sp_name = "/test";
1096  pthread_create(&thread, NULL, call_emit, (void *)emitter_params);
1097 
1098  emitter_params->mutex_running.lock();
1099  if (emitter_params->status != RUNNING) {
1100  ASSERT_TRUE(emitter_params->cond_running.reltimed_wait(1, 0));
1101  }
1102  emitter_params->mutex_running.unlock();
1103  emitter_params->mutex_finished.lock();
1104  EXPECT_FALSE(emitter_params->cond_finished.reltimed_wait(0, 100000));
1105  emitter_params->mutex_finished.unlock();
1106 
1107  pthread_t waiter_thread;
1108  waiter_thread_params waiter_params;
1109  waiter_params.manager = manager;
1110  waiter_params.component = "component";
1111  waiter_params.num_wait_calls = 1;
1112  waiter_params.sp_identifier = "/test";
1113  pthread_create(&waiter_thread, NULL, start_waiter_thread, &waiter_params);
1114 
1115  emitter_params->mutex_finished.lock();
1116  ASSERT_TRUE(emitter_params->status == FINISHED
1117  || emitter_params->cond_finished.reltimed_wait(1, 0));
1118  emitter_params->mutex_finished.unlock();
1119  pthread_join(thread, NULL);
1120  pthread_join(waiter_thread, NULL);
1121  delete emitter_params;
1122 }
1123 
1124 /** helper function used for testing wait() */
1125 void *
1126 call_wait_for_all(void *data)
1127 {
1128  SyncPoint *sp = (SyncPoint *)(data);
1129  sp->wait_for_all("waiter");
1130  return NULL;
1131 }
1132 
1133 /** Test the functionality of lock_until_next_wait
1134  * Test whether the waiter really calls wait before ALL emitters call emit
1135  * This tests a potential race condition between wait() and emit() */
1136 TEST_F(SyncPointManagerTest, LockUntilNextWaitWaiterComesFirstTest)
1137 {
1138  RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1139 
1140  sp->lock_until_next_wait("waiter");
1141 
1142  uint num_emitters = 100;
1143  pthread_t emitter_thread[num_emitters];
1144  emitter_thread_data *params[num_emitters];
1145  for (uint i = 0; i < num_emitters; i++) {
1146  params[i] = new emitter_thread_data();
1147  params[i]->manager = manager;
1148  string emitter_name = "emitter" + to_string(i);
1149  params[i]->name = emitter_name;
1150  params[i]->sp_name = "/test";
1151  pthread_create(&emitter_thread[i], NULL, call_emit, (void *)params[i]);
1152  }
1153 
1154  for (uint i = 0; i < num_emitters; i++) {
1155  params[i]->mutex_running.lock();
1156  if (params[i]->status != RUNNING) {
1157  ASSERT_TRUE(params[i]->cond_running.reltimed_wait(1, 0));
1158  }
1159  params[i]->mutex_running.unlock();
1160  }
1161 
1162  pthread_t waiter_thread;
1164  thread_params.component = "waiter";
1165  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1166  thread_params.manager = manager;
1167  thread_params.thread_nr = 1;
1168  thread_params.num_wait_calls = 1;
1169  thread_params.sp_identifier = "/test";
1170  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1171 
1172  for (uint i = 0; i < num_emitters; i++) {
1173  params[i]->mutex_finished.lock();
1174  ASSERT_TRUE(params[i]->status == FINISHED || params[i]->cond_finished.reltimed_wait(1, 0));
1175  params[i]->mutex_finished.unlock();
1176  pthread_join(emitter_thread[i], NULL);
1177  delete params[i];
1178  }
1179 
1180  ASSERT_TRUE(wait_for_finished(&thread_params));
1181  pthread_join(waiter_thread, NULL);
1182 }
1183 
1184 /** Test whether all waiters are always released at the same time, even if one
1185  * waiter called wait after one emitter already emitted. In particular, this
1186  * tests the following scenario:
1187  * 1. waiter1: wait
1188  * 2. emitter1: emit
1189  * 3. waiter2: wait
1190  * 4. emitter2: emit
1191  * 5. both waiter1 and waiter2 are released
1192  */
1193 TEST_F(SyncPointManagerTest, WaitersAreAlwaysReleasedSimultaneouslyTest)
1194 {
1195  string sp_identifier = "/test";
1196  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", sp_identifier);
1197  manager->get_syncpoint("emitter2", sp_identifier);
1198  sp->register_emitter("emitter1");
1199  sp->register_emitter("emitter2");
1200  uint num_threads = 2;
1201  pthread_t threads[num_threads];
1202  waiter_thread_params params[num_threads];
1203  for (uint i = 0; i < num_threads; i++) {
1204  params[i].component = "component " + to_string(i);
1205  params[i].manager = manager;
1206  params[i].type = SyncPoint::WAIT_FOR_ALL;
1207  params[i].thread_nr = i;
1208  params[i].num_wait_calls = 1;
1209  params[i].sp_identifier = sp_identifier;
1210  }
1211  pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1212  ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1213  sp->emit("emitter1");
1214  ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1215  pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1216  for (uint i = 0; i < num_threads; i++) {
1217  ASSERT_FALSE(wait_for_finished(&params[i], 0, 10 * pow(10, 6)));
1218  }
1219  sp->emit("emitter2");
1220  for (uint i = 0; i < num_threads; i++) {
1221  ASSERT_TRUE(wait_for_finished(&params[i]));
1222  pthread_join(threads[i], NULL);
1223  }
1224 }
1225 
1226 /** Test whether all syncpoints are released simultaneously if a timeout occurs;
1227  * i.e. make sure that only the first waiter's timeout matters and all
1228  * subsequent waiters are released when the first waiter times out.
1229  */
1230 TEST_F(SyncPointManagerTest, WaitersTimeoutSimultaneousReleaseTest)
1231 {
1232  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1233  sp->register_emitter("emitter1");
1234  uint num_threads = 2;
1235  pthread_t threads[num_threads];
1236  string sp_identifier = "/test";
1237  waiter_thread_params params[num_threads];
1238  for (uint i = 0; i < num_threads; i++) {
1239  params[i].component = "component " + to_string(i);
1240  params[i].type = SyncPoint::WAIT_FOR_ALL;
1241  params[i].manager = manager;
1242  params[i].thread_nr = i;
1243  params[i].num_wait_calls = 1;
1244  params[i].timeout_sec = 0;
1245  params[i].timeout_nsec = 100 * pow(10, 6);
1246  params[i].sp_identifier = sp_identifier;
1247  }
1248  pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1249  EXPECT_TRUE(wait_for_running(&params[0]));
1250  params[1].timeout_sec = 5;
1251  params[1].timeout_nsec = 0;
1252  pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1253  for (uint i = 0; i < num_threads; i++) {
1254  EXPECT_TRUE(wait_for_running(&params[i]));
1255  }
1256  wait_for_finished(&params[0], params[0].timeout_sec, params[0].timeout_nsec);
1257  wait_for_finished(&params[1], 0, pow(10, 6));
1258  for (uint i = 0; i < num_threads; i++) {
1259  pthread_join(threads[i], NULL);
1260  }
1261 }
1262 
1263 /** Similar as before, test if the timeout is handled properly. This time, let
1264  * a wait_for_one with a short timeout step by. The other waiters should not be
1265  * affected, i.e. they should still be waiting even when the timeout for the
1266  * wait_for_one occurred.
1267  * In other words, wait_for_one waiters are handled completeley separately.
1268  */
1269 TEST_F(SyncPointManagerTest, WaitForOneSeparateTimeoutTest)
1270 {
1271  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1272  sp->register_emitter("emitter1");
1273  string sp_identifier = "/test";
1274  uint num_threads = 2;
1275  Barrier * barrier = new Barrier(num_threads + 2);
1276  pthread_t wait_for_one_thread;
1277  waiter_thread_params wait_for_one_params;
1278  wait_for_one_params.component = "wait_for_one";
1279  wait_for_one_params.type = SyncPoint::WAIT_FOR_ONE;
1280  wait_for_one_params.manager = manager;
1281  wait_for_one_params.thread_nr = 2;
1282  wait_for_one_params.num_wait_calls = 1;
1283  wait_for_one_params.timeout_sec = 0;
1284  wait_for_one_params.timeout_nsec = 100 * pow(10, 6);
1285  wait_for_one_params.status = PENDING;
1286  wait_for_one_params.sp_identifier = sp_identifier;
1287  wait_for_one_params.start_barrier = barrier;
1288  pthread_create(&wait_for_one_thread, &attrs, start_waiter_thread, &wait_for_one_params);
1289  pthread_t threads[num_threads];
1290  waiter_thread_params params[num_threads];
1291  for (uint i = 0; i < num_threads; i++) {
1292  params[i].component = "component " + to_string(i);
1293  params[i].type = SyncPoint::WAIT_FOR_ALL;
1294  params[i].manager = manager;
1295  params[i].thread_nr = i;
1296  params[i].num_wait_calls = 1;
1297  params[i].timeout_sec = 1;
1298  params[i].timeout_nsec = 0;
1299  params[i].sp_identifier = sp_identifier;
1300  params[i].start_barrier = barrier;
1301  pthread_create(&threads[i], &attrs, start_waiter_thread, &params[i]);
1302  }
1303  barrier->wait();
1304  EXPECT_TRUE(wait_for_running(&wait_for_one_params));
1305  for (uint i = 0; i < num_threads; i++) {
1306  EXPECT_TRUE(wait_for_running(&params[i]));
1307  }
1308  EXPECT_TRUE(wait_for_finished(&wait_for_one_params));
1309  for (uint i = 0; i < num_threads; i++) {
1310  EXPECT_EQ(RUNNING, params[i].status);
1311  }
1312  for (uint i = 0; i < num_threads; i++) {
1313  EXPECT_TRUE(wait_for_finished(&params[i], params[i].timeout_sec, params[i].timeout_nsec));
1314  pthread_join(threads[i], NULL);
1315  }
1316  pthread_join(wait_for_one_thread, NULL);
1317 }
1318 
1319 TEST_F(SyncPointManagerTest, MultipleWaitsWithoutEmitters)
1320 {
1321  RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1322  pthread_t waiter_thread;
1324  thread_params.component = "waiter";
1325  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1326  thread_params.manager = manager;
1327  thread_params.thread_nr = 1;
1328  thread_params.num_wait_calls = 2;
1329  thread_params.sp_identifier = "/test";
1330  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1331  ASSERT_TRUE(wait_for_finished(&thread_params));
1332  pthread_join(waiter_thread, NULL);
1333 }
1334 
1335 TEST_F(SyncPointManagerTest, ReleaseOfEmitterThrowsException)
1336 {
1337  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1338  sp->register_emitter("emitter");
1339  ASSERT_THROW(manager->release_syncpoint("emitter", sp), SyncPointCannotReleaseEmitter);
1340 }
1341 
1342 TEST_F(SyncPointManagerTest, UnregisterNonEmitter)
1343 {
1344  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1345  // "emitter" is a watcher but not an emitter
1346  EXPECT_NO_THROW(sp->unregister_emitter("emitter"));
1347  // "foo" is not known to the syncpoint
1348  EXPECT_NO_THROW(sp->unregister_emitter("foo"));
1349 }
1350 
1351 TEST_F(SyncPointManagerTest, ReleaseBarrierWaiter)
1352 {
1353  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1354  sp->register_emitter("emitter");
1355  pthread_t waiter_thread;
1357  thread_params.component = "component 1";
1358  thread_params.type = SyncPoint::WAIT_FOR_ALL;
1359  thread_params.manager = manager;
1360  thread_params.thread_nr = 1;
1361  thread_params.num_wait_calls = 1;
1362  thread_params.sp_identifier = "/test";
1363  thread_params.timeout_sec = 2;
1364  pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1365  EXPECT_TRUE(wait_for_running(&thread_params));
1366  ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1367  pthread_cancel(waiter_thread);
1368  pthread_join(waiter_thread, NULL);
1369  ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1370  manager->release_syncpoint("component 1", sp);
1371  sp = manager->get_syncpoint("component 1", "/test");
1372  EXPECT_NO_THROW(sp->reltime_wait_for_all("component 1", 0, pow(10, 6)));
1373 }
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:93
waiter_thread_params::manager
RefPtr< SyncPointManager > manager
SyncPointManager passed to the thread.
Definition: test_syncpoint.cpp:334
fawkes::SyncPoint::WakeupType
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Definition: syncpoint.h:62
waiter_thread_params::sp_identifier
string sp_identifier
Name of the SyncPoint.
Definition: test_syncpoint.cpp:342
fawkes::SyncPointCannotReleaseEmitter
The component called release but is still registered as emitter.
Definition: exceptions.h:209
fawkes::SyncPoint::watcher_is_waiting
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
Definition: syncpoint.cpp:565
SyncPointManagerTest
Definition: test_syncpoint.cpp:93
fawkes::SyncPointManager
Definition: syncpoint_manager.h:43
fawkes::SyncPointInvalidIdentifierException
Invalid identifier used (i.e.
Definition: exceptions.h:128
SyncPointTest::sp2
RefPtr< SyncPoint > sp2
A syncpoint for testing.
Definition: test_syncpoint.cpp:81
SyncPointManagerTest::SyncPointManagerTest
SyncPointManagerTest()
Initialize the test class.
Definition: test_syncpoint.cpp:99
fawkes::Mutex
Definition: mutex.h:38
fawkes::SyncPoint::get_watchers
std::set< std::string > get_watchers() const
Definition: syncpoint.cpp:516
waiter_thread_params
struct used for multithreading tests
Definition: test_syncpoint.cpp:331
fawkes::SyncPoint::emit
virtual void emit(const std::string &component)
send a signal to all waiting threads
Definition: syncpoint.cpp:148
fawkes::MultiLogger
Definition: multi.h:40
fawkes::WaitCondition
Definition: wait_condition.h:42
fawkes::RefPtr< SyncPoint >
SyncPointTest::SetUp
virtual void SetUp()
Initialize the test class.
Definition: test_syncpoint.cpp:61
SyncPointTest::TearDown
virtual void TearDown()
Clean up.
Definition: test_syncpoint.cpp:73
waiter_thread_params::cond_finished
WaitCondition cond_finished
WaitCondition to indicate that the thread has finished.
Definition: test_syncpoint.cpp:358
SyncBarrierTest::SyncBarrierTest
SyncBarrierTest()
Constructor.
Definition: test_syncpoint.cpp:141
SyncPointManagerTest::attrs
pthread_attr_t attrs
Thread attributes.
Definition: test_syncpoint.cpp:131
fawkes::SyncPoint
Definition: syncpoint.h:55
fawkes::SyncPoint::reltime_wait_for_all
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
Definition: syncpoint.cpp:385
SyncPointManagerTest::logger_
MultiLogger * logger_
Logger used to initialize SyncPoints.
Definition: test_syncpoint.cpp:125
fawkes::MutexLocker
Definition: mutex_locker.h:39
waiter_thread_params::cond_running
WaitCondition cond_running
WaitCondition to indicate that the thread is running.
Definition: test_syncpoint.cpp:354
fawkes::SyncPoint::get_identifier
std::string get_identifier() const
Definition: syncpoint.cpp:105
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:137
fawkes::CacheLogger
Definition: cache.h:43
waiter_thread_params::status
atomic< ThreadStatus > status
current status of the thread
Definition: test_syncpoint.cpp:350
SyncPointManagerTest::~SyncPointManagerTest
virtual ~SyncPointManagerTest()
Deinitialize the test class.
Definition: test_syncpoint.cpp:112
thread_params
The parameters passed to the threads.
Definition: test_wait_condition.cpp:36
SyncPointTest::sp3
RefPtr< SyncPoint > sp3
A syncpoint for testing.
Definition: test_syncpoint.cpp:83
Emitter::emit
void emit()
emit the SyncBarrier
Definition: test_syncpoint.cpp:662
fawkes::SyncPointInvalidComponentException
Invalid component name used (i.e.
Definition: exceptions.h:146
SyncPointManagerTest::cache_logger_
CacheLogger * cache_logger_
Cache Logger used for testing.
Definition: test_syncpoint.cpp:128
waiter_thread_params::mutex_finished
Mutex mutex_finished
Mutex to protect cond_finished.
Definition: test_syncpoint.cpp:356
fawkes::SyncPointManager::get_syncpoint
RefPtr< SyncPoint > get_syncpoint(const std::string &component, const std::string &identifier)
Get a SyncPoint.
Definition: syncpoint_manager.cpp:74
Emitter::Emitter
Emitter(string identifier, string syncbarrier, RefPtr< SyncPointManager > manager)
Constructor.
Definition: test_syncpoint.cpp:646
SyncPointTest
Definition: test_syncpoint.cpp:54
waiter_thread_params::timeout_sec
uint timeout_sec
timeout in sec
Definition: test_syncpoint.cpp:346
SyncPointTest::sp1
RefPtr< SyncPoint > sp1
A syncpoint for testing.
Definition: test_syncpoint.cpp:79
fawkes::SyncPoint::unregister_emitter
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
Definition: syncpoint.cpp:454
fawkes
SyncBarrierTest
Definition: test_syncpoint.cpp:137
Emitter
Helper class which registers and emits a given SyncBarrier.
Definition: test_syncpoint.cpp:638
Emitter::~Emitter
virtual ~Emitter()
Destructor.
Definition: test_syncpoint.cpp:654
waiter_thread_params::num_wait_calls
uint num_wait_calls
Number of wait calls the thread should make.
Definition: test_syncpoint.cpp:340
fawkes::WaitCondition::reltimed_wait
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Definition: wait_condition.cpp:212
fawkes::WaitCondition::wake_all
void wake_all()
Wake up all waiting threads.
Definition: wait_condition.cpp:293
waiter_thread_params::mutex_running
Mutex mutex_running
Mutex to protect cond_running.
Definition: test_syncpoint.cpp:352
waiter_thread_params::start_barrier
Barrier * start_barrier
Barrier for startup synchronization.
Definition: test_syncpoint.cpp:360
fawkes::SyncPointMultipleWaitCallsException
A component called wait() but is already waiting.
Definition: exceptions.h:162
waiter_thread_params::type
SyncPoint::WakeupType type
Wait type.
Definition: test_syncpoint.cpp:338
fawkes::SyncPointSetLessThan
Definition: syncpoint.h:49
SyncPointTest::logger_
MultiLogger * logger_
Logger for testing.
Definition: test_syncpoint.cpp:86
fawkes::SyncPoint::register_emitter
virtual void register_emitter(const std::string &component)
register as emitter
Definition: syncpoint.cpp:437
waiter_thread_params::thread_nr
uint thread_nr
Thread number.
Definition: test_syncpoint.cpp:336
fawkes::SyncPointNonEmitterCalledEmitException
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Definition: exceptions.h:180
waiter_thread_params::timeout_nsec
uint timeout_nsec
timeout in nsec
Definition: test_syncpoint.cpp:348
fawkes::SyncPoint::get_emitters
std::multiset< std::string > get_emitters() const
Definition: syncpoint.cpp:543
fawkes::Barrier
Definition: barrier.h:37
fawkes::SyncPoint::wait
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
Definition: syncpoint.cpp:239
SyncPointManagerTest::manager
RefPtr< SyncPointManager > manager
A Pointer to a SyncPointManager.
Definition: test_syncpoint.cpp:122
fawkes::Barrier::wait
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:159
fawkes::SyncPoint::lock_until_next_wait
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
Definition: syncpoint.cpp:416
waiter_thread_params::component
string component
Name of the component.
Definition: test_syncpoint.cpp:344
fawkes::SyncPoint::wait_for_all
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
Definition: syncpoint.cpp:363