
Steven Watanabe-4 wrote:
AMDG
gabe_rosser wrote:
Apologies for my ignorance but I would be extremely grateful if someone would kindly point me in the right direction.
I am trying to parallelize my Monte Carlo simulation. The bottleneck in the process is the generation of Normally distributed random numbers (using boost.random libraries), so I was hoping to substantially improve the efficiency of the process by setting 2 or 3 cores of my quad core processor to the the task of generating these numbers while one core gets on with the rest of the algorithm.
The simplest way I can envisage doing this is by allocating shared memory that is simultaneously written (by rand number generating threads) and read (by main thread), with some method of ensuring that the main thread doesn't read too far ahead (perhaps locking or just a simple while loop?).
Boost.thread alone doesn't seem to be enough - an attempt to do exactly this resulted in a substantial slow down (with or without locking - without may result in undefined behaviour but I tried it anyway to see if it was causing the slowdown). Do I need to investigate shared pointers? I really need some large container in shared memory which I can fill in any order, much as would be the case if I were appending to and reading from a file, but hopefully faster!
Any suggestions please? Sorry for the lack of understanding!
The synchronization is too fine-grained. Even without the overhead of locking, the cache is probably unhappy because of the false-sharing.
The following is slightly faster for me using the multi threaded version.
#include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <boost/spirit/home/phoenix/stl/container.hpp> #include <boost/spirit/home/phoenix/operator/logical.hpp> #include <boost/spirit/home/phoenix/core/reference.hpp> #include <boost/random.hpp> #include <boost/timer.hpp> #include <boost/noncopyable.hpp> #include <deque>
template<class T> class queue_writer;
template<class T> class simple_queue { public: simple_queue(T* buffer) : data(buffer), index(0), size_(0) {} void push_back(const T& arg) { data[size_++] = arg; } T& front() { return(data[index]); } void pop_front() { ++index; } bool empty() const { return(index == size_); } std::size_t size() const { return(size_ - index); } private: T* data; std::size_t index; std::size_t size_; };
template<class T> class queue_reader : boost::noncopyable { template<class T> friend class queue_writer; public: queue_reader() {} T pop() { if(local_data.empty()) { boost::unique_lock<boost::mutex> l(mutex); condition.wait(l, !boost::phoenix::empty(boost::phoenix::ref(shared_data))); std::swap(shared_data, local_data); } T result = local_data.front().front(); local_data.front().pop_front(); if(local_data.front().empty()) { local_data.pop_front(); } return(result); } private: // strong guarantee void push_impl(simple_queue<T>& arg) { if(!arg.empty()) { boost::unique_lock<boost::mutex> l(mutex); shared_data.push_back(arg); condition.notify_all(); } }
boost::mutex mutex; boost::condition condition; std::deque<simple_queue<T> > shared_data;
std::deque<simple_queue<T> > local_data; };
template<class T> class queue_writer { public: queue_writer(queue_reader<T>& q, std::size_t s) : data(new T[s]), index(0), impl(&q), block_size(s) {} void push(const T& arg) { data.push_back(arg); if(data.size() == block_size) { impl->push_impl(data); data = simple_queue<T>(new T[block_size]); } } ~queue_writer() { impl->push_impl(data); } private: simple_queue<T> data; std::size_t index; queue_reader<T>* impl; std::size_t block_size; };
static const int N = 100000000;
void reader_thread(queue_reader<double>& q) { for(int i = 0; i < N; ++i) { q.pop(); } }
void writer_thread(queue_reader<double>& qr, boost::mt19937& prng) { boost::variate_generator<boost::mt19937&, boost::exponential_distribution<> > gen(prng, boost::exponential_distribution<>()); queue_writer<double> q(qr, 1024);
for(int i = 0; i < N / 2; ++i) { q.push(gen()); } }
int main() { boost::timer timer; #if 0 boost::mt19937 prng; boost::variate_generator<boost::mt19937&, boost::exponential_distribution<> > gen(prng, boost::exponential_distribution<>()); for(int i = 0; i < N; ++i) { gen(); } #else boost::mt19937 prng1(57); boost::mt19937 prng2(38); queue_reader<double> queue; boost::thread t1(&writer_thread, boost::ref(queue), prng1); boost::thread t2(&writer_thread, boost::ref(queue), prng2); reader_thread(queue); t1.join(); t2.join(); #endif std::cout << timer.elapsed() << std::endl; }
In Christ, Steven Watanabe
_______________________________________________ Boost-users mailing list Boost-users@lists.boost.org http://lists.boost.org/mailman/listinfo.cgi/boost-users
Hi Steven, Thanks for the suggestion. I've run into a couple of problems compiling this. Firstly I got an error: shadows template parm `class T' from the line I've placed in bold type above (line 33). Fixed that by renaming template<class U>. Now I have an error from main() (also shown in bold above): main.cpp:123: error: variable ‘boost::thread t1’ has initialiser but incomplete type main.cpp:124: error: variable ‘boost::thread t2’ has initialiser but incomplete type Sorry but I'm not sure how to fix this. Gabe -- View this message in context: http://www.nabble.com/-Thread--%28Newb%29-shared-memory-in-multithread-proce... Sent from the Boost - Users mailing list archive at Nabble.com.