parallelcxx11.hh
/* -*- mia-c++ -*-
 *
 * This file is part of MIA - a toolbox for medical image analysis
 * Copyright (c) Leipzig, Madrid 1999-2016 Gert Wollny
 *
 * MIA is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with MIA; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#ifndef mia_core_parallelcxx11_hh
#define mia_core_parallelcxx11_hh

#include <mia/core/defines.hh>

#include <thread>
#include <atomic>
#include <mutex>
#include <cassert>
#include <vector>

NS_MIA_BEGIN

typedef std::mutex CMutex;
typedef std::recursive_mutex CRecursiveMutex;


class EXPORT_CORE CMaxTasks {
public:
    static int get_max_tasks();
    static void set_max_tasks(int mt);
private:
    static int max_tasks;
};
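
/* Usage sketch (illustrative, not from the original header): capping the
   number of worker threads before calling the parallel helpers below.
   This assumes NS_MIA_BEGIN places the class in namespace mia.

   \code
   #include <thread>

   unsigned hw = std::thread::hardware_concurrency();
   mia::CMaxTasks::set_max_tasks(hw > 0 ? static_cast<int>(hw) : 1);
   \endcode
*/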

#define ATOMIC std::atomic

template <typename Mutex>
class TScopedLock {
public:
    TScopedLock(Mutex& m): m_mutex(m) {
        m_mutex.lock();
        own_lock = true;
    };

    TScopedLock(const TScopedLock<Mutex>& other) = delete;
    TScopedLock& operator = (const TScopedLock<Mutex>& other) = delete;

    ~TScopedLock() {
        if (own_lock)
            m_mutex.unlock();
    };

    void release() {
        if (own_lock) {
            own_lock = false;
            m_mutex.unlock();
        }
    }
private:
    Mutex& m_mutex;
    bool own_lock;
};

typedef TScopedLock<CMutex> CScopedLock;
typedef TScopedLock<CRecursiveMutex> CRecursiveScopedLock;

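/* Usage sketch (illustrative): guarding a shared counter with CScopedLock.
   release() drops the lock early; the destructor then does nothing. The
   counter and mutex names are hypothetical.

   \code
   CMutex counter_mutex;
   long counter = 0;

   void increment()
   {
       CScopedLock lock(counter_mutex);
       ++counter;
   }   // lock released by the destructor
   \endcode
*/
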
class EXPORT_CORE C1DParallelRange {
public:
    C1DParallelRange(int begin, int end, int block = 1):
        m_begin(begin),
        m_end(end),
        m_block(block),
        m_current_wp(0)
    {
        assert(begin <= end);
    }

    C1DParallelRange(const C1DParallelRange& orig):
        m_begin(orig.m_begin),
        m_end(orig.m_end),
        m_block(orig.m_block)
    {
        m_current_wp = orig.m_current_wp.load();
    }

    C1DParallelRange get_next_workpackage() {
        int wp = m_current_wp++;
        int begin = m_begin + wp * m_block;
        int end = begin + m_block;
        if (begin > m_end) {
            return C1DParallelRange(m_end, m_end, 0);
        }
        if (end > m_end) {
            return C1DParallelRange(begin, m_end, 1);
        }
        return C1DParallelRange(begin, end, 1);
    }

    bool empty() const {
        return m_begin >= m_end;
    }

    int begin() const {
        return m_begin;
    }

    int end() const {
        return m_end;
    }

private:
    int m_begin;
    int m_end;
    int m_block;
    std::atomic<int> m_current_wp;
};
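
/* Behaviour sketch (illustrative): a range [0,10) with block size 4 hands
   out the work packages [0,4), [4,8), and [8,10); once exhausted, every
   further call returns an empty range.

   \code
   C1DParallelRange range(0, 10, 4);
   while (true) {
       C1DParallelRange wp = range.get_next_workpackage();
       if (wp.empty())
           break;
       // process indices wp.begin() .. wp.end() - 1
   }
   \endcode
*/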

template <typename Range, typename Func>
void pfor_callback(Range& range, Func f)
{
    while (true) {
        Range wp = range.get_next_workpackage();
        if (!wp.empty())
            f(wp);
        else
            break;
    }
}

template <typename Range, typename Func>
void pfor(Range range, Func f)
{
    int max_threads = CMaxTasks::get_max_tasks();

    std::vector<std::thread> threads;
    for (int i = 0; i < max_threads; ++i) {
        threads.push_back(std::thread(pfor_callback<Range, Func>, std::ref(range), f));
    }

    for (int i = 0; i < max_threads; ++i) {
        threads[i].join();
    }
}
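
/* Usage sketch (illustrative): squaring the elements of a vector in
   parallel. Each worker thread repeatedly pulls a work package from the
   shared range and processes the indices it covers. The vector name is
   hypothetical.

   \code
   #include <vector>

   std::vector<double> data(1000, 2.0);
   pfor(C1DParallelRange(0, static_cast<int>(data.size()), 64),
        [&data](const C1DParallelRange& wp) {
            for (int i = wp.begin(); i < wp.end(); ++i)
                data[i] *= data[i];
        });
   \endcode
*/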

template <typename V>
class ReduceValue {
public:
    typedef V Value;
    ReduceValue(const Value& i): identity(i), value(i) {
    }

    template <typename Reduce>
    void reduce(const Value& v, Reduce r)
    {
        CScopedLock sl(mutex);
        value = r(v, value);
    }
    const Value& get_identity() const {
        return identity;
    }
    const Value& get_reduced() const {
        return value;
    }
private:
    mutable CMutex mutex;
    Value identity;
    Value value;
};
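
/* Usage sketch (illustrative): merging per-thread partial results into one
   value. The reduction operation should be associative and commutative,
   because the threads call reduce() in an unspecified order.

   \code
   #include <algorithm>
   #include <limits>

   ReduceValue<float> maximum(std::numeric_limits<float>::lowest());
   maximum.reduce(3.0f, [](float a, float b) { return std::max(a, b); });
   maximum.reduce(7.0f, [](float a, float b) { return std::max(a, b); });
   // maximum.get_reduced() is now 7.0f
   \endcode
*/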

template <typename Range, typename Value, typename Func, typename Reduce>
void preduce_callback(Range& range, ReduceValue<Value>& v, Func f, Reduce r)
{
    Value value = v.get_identity();
    while (true) {
        Range wp = range.get_next_workpackage();
        if (!wp.empty())
            value = f(wp, value);
        else
            break;
    }
    v.reduce(value, r);
}

template <typename Range, typename Value, typename Func, typename Reduce>
Value preduce(Range range, Value identity, Func f, Reduce r)
{
    int max_threads = CMaxTasks::get_max_tasks();

    ReduceValue<Value> value(identity);

    std::vector<std::thread> threads;
    for (int i = 0; i < max_threads; ++i) {
        threads.push_back(std::thread(preduce_callback<Range, Value, Func, Reduce>,
                                      std::ref(range), std::ref(value), f, r));
    }

    for (int i = 0; i < max_threads; ++i) {
        threads[i].join();
    }
    return value.get_reduced();
}
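
/* Usage sketch (illustrative): a parallel sum over a vector. The first
   functor folds one work package into the running per-thread value; the
   second merges the per-thread results. All names are hypothetical.

   \code
   #include <vector>

   std::vector<double> data(1000, 1.0);
   double sum = preduce(C1DParallelRange(0, static_cast<int>(data.size()), 64),
                        0.0,
                        [&data](const C1DParallelRange& wp, double v) {
                            for (int i = wp.begin(); i < wp.end(); ++i)
                                v += data[i];
                            return v;
                        },
                        [](double a, double b) { return a + b; });
   \endcode
*/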

NS_MIA_END

#endif