diff options
author | John Fultz <jfultz@wolfram.com> | 2019-03-19 11:21:46 -0500 |
---|---|---|
committer | John Fultz <jfultz@wolfram.com> | 2019-07-21 02:02:55 -0700 |
commit | f2e42335d294128591fb123baddf7182ca390d40 (patch) | |
tree | 779931ce5c0963862fd4c870516045549149073c | |
parent | e7f645d4f5030e52b1c9f27dc3fbf2c20fd47cd5 (diff) |
Sims are now threaded.
Defaults to two threads right now. This
can be changed in the Simulator constructor
for the time being. An interface will be
added for this soon.
For an example I tried, I got the following times on 500 simulations in a
macOS release build:
* Quackle 1.0.3 - 26 seconds
* master branch, 1 thread - 27 seconds
* master branch, 2 threads - 19 seconds
* master branch, 4 threads - 15 seconds
This isn't the most efficient use of threads.
It distributes all of the plays being simmed
into a thread pool, but only for one simulation.
Then it comes up for air and tries again.
This was the easiest to implement robustly
given the current architecture.
-rw-r--r-- | sim.cpp | 101 | ||||
-rw-r--r-- | sim.h | 39 |
2 files changed, 139 insertions, 1 deletions
@@ -38,10 +38,12 @@ Simulator::Simulator() : m_logfileIsOpen(false), m_hasHeader(false), m_dispatch(0), m_iterations(0), m_ignoreOppos(false) { m_originalGame.addPosition(); + setThreadCount(2); } Simulator::~Simulator() { + setThreadCount(0); closeLogfile(); } @@ -214,6 +216,42 @@ void Simulator::resetNumbers() m_iterations = 0; } +void Simulator::simThreadFunc(SimmedMoveMessageQueue& incoming, SimmedMoveMessageQueue& outgoing) +{ + while (true) + { + auto result = incoming.pop_or_terminate(); + if (result.second) + break; + Simulator::simulateOnePosition(result.first, incoming.constants()); + outgoing.push(result.first); + } +} + +void Simulator::setThreadCount(size_t count) +{ + if (count == 0 && m_threadPool.size() != 0) + { + m_sendQueue.send_terminate_all(); + for (auto& t : m_threadPool) + t.join(); + m_threadPool.clear(); + } + + while (count > m_threadPool.size()) + { + m_threadPool.emplace_back(Simulator::simThreadFunc, std::ref(m_sendQueue), std::ref(m_receiveQueue)); + } + + while (count < m_threadPool.size()) + { + m_sendQueue.send_terminate_one(m_threadPool.back().get_id()); + m_threadPool.back().join(); + m_threadPool.pop_back(); + } +} + + void Simulator::simulate(int plies, int iterations) { for (int i = 0; i < iterations; ++i) @@ -252,6 +290,8 @@ void Simulator::simulate(int plies) constants.ignoreOppos = m_ignoreOppos; constants.isLogging = isLogging(); + m_sendQueue.setConstants(constants); + if (isLogging()) { if (!m_hasHeader) @@ -261,6 +301,8 @@ void Simulator::simulate(int plies) m_xmlIndent += MARK_UV('\t'); } + int messageCount = 0; + for (auto &moveIt : m_simmedMoves) { if (!moveIt.includeInSimulation()) @@ -279,7 +321,13 @@ void Simulator::simulate(int plies) message.levels = moveIt.levels; message.xmlIndent = m_xmlIndent; - simulateOnePosition(message, constants); + m_sendQueue.push(message); + messageCount++; + } + + while (messageCount-- > 0) + { + SimmedMoveMessage message(m_receiveQueue.pop()); incorporateMessage(message); } @@ -561,6 +609,57 @@ void AveragedValue::clear() //////////// +void SimmedMoveMessageQueue::push(SimmedMoveMessage& msg) +{ + std::lock_guard<std::mutex> lk(m_queueMutex); + m_queue.push(std::move(msg)); + m_condition.notify_one(); +} + +std::pair<SimmedMoveMessage, bool> SimmedMoveMessageQueue::pop_or_terminate() +{ + std::unique_lock<std::mutex> lk(m_queueMutex); + if (m_queue.empty() && !m_terminateAll && m_terminateOne == std::thread::id()) + m_condition.wait(lk); + std::pair<SimmedMoveMessage, bool> result; + result.second = m_terminateAll || m_terminateOne == std::this_thread::get_id(); + if (result.second) + m_terminateOne = std::thread::id(); + else + { + result.first = std::move(m_queue.front()); + m_queue.pop(); + } + return result; +} + +SimmedMoveMessage SimmedMoveMessageQueue::pop() +{ + std::unique_lock<std::mutex> lk(m_queueMutex); + if (m_queue.empty()) + m_condition.wait(lk); + SimmedMoveMessage result = std::move(m_queue.front()); + m_queue.pop(); + return result; +} + +void SimmedMoveMessageQueue::send_terminate_all() +{ + std::lock_guard<std::mutex> lk(m_queueMutex); + m_terminateAll = true; + m_condition.notify_all(); +} + +void SimmedMoveMessageQueue::send_terminate_one(const std::thread::id& id) +{ + std::lock_guard<std::mutex> lk(m_queueMutex); + m_terminateOne = id; + m_condition.notify_all(); +} + + +//////////// + double SimmedMove::calculateEquity() const { if (levels.empty()) @@ -20,7 +20,11 @@ #define QUACKLE_SIM_H #include <atomic> +#include <condition_variable> +#include <mutex> +#include <queue> #include <sstream> +#include <thread> #include <vector> #include "alphabetparameters.h" @@ -201,11 +205,38 @@ public: bool isLogging; }; +class SimmedMoveMessageQueue +{ +public: + SimmedMoveMessageQueue() = default; + SimmedMoveMessageQueue(SimmedMoveMessageQueue&) = delete; + SimmedMoveMessageQueue(SimmedMoveMessageQueue&&) = delete; + + void push(SimmedMoveMessage& msg); + SimmedMoveMessage pop(); + std::pair<SimmedMoveMessage, bool> pop_or_terminate(); + void send_terminate_all(); + void send_terminate_one(const std::thread::id& id); + + const SimmedMoveConstants& constants() { return m_constants; }; + void setConstants(const SimmedMoveConstants& constants) { m_constants = constants; }; + +private: + SimmedMoveConstants m_constants; + std::queue<SimmedMoveMessage> m_queue; + std::condition_variable m_condition; + std::mutex m_queueMutex; + bool m_terminateAll = false; + std::thread::id m_terminateOne; +}; + class Simulator { public: // constructs a new simulator Simulator(); + Simulator(const Simulator&) = delete; + Simulator(Simulator&&) = delete; ~Simulator(); // Simulate moves on this position. Also initializes the @@ -263,6 +294,9 @@ public: void setIgnoreOppos(bool ignore); bool ignoreOppos() const; + static void simThreadFunc(SimmedMoveMessageQueue& incoming, SimmedMoveMessageQueue& outgoing); + void setThreadCount(size_t count); + // set values for all levels of all moves to zero void resetNumbers(); @@ -337,6 +371,11 @@ protected: int m_iterations; bool m_ignoreOppos; + + // Pair of thread and bool requesting to terminate + std::vector<std::thread> m_threadPool; + SimmedMoveMessageQueue m_sendQueue; + SimmedMoveMessageQueue m_receiveQueue; }; inline GamePosition &Simulator::currentPosition() |