I have a very time-critical simulator that is supposed to run as fast as humanly possible. I tried to increase data throughput by distributing every data frame across multiple worker threads. To avoid the overhead of creating new threads for every frame, the worker threads are detached, never exit, and wait on a condition variable that signals when their data is ready.
It is important to note that none of the threads access the same data. They all work on their own (fixed) non-overlapping memory addresses.

Every worker thread is assigned its own thread_data_t struct and runs the following function:

void worker(thread_data_t* thread_data) {
    
    thread_data->status = THREAD_STATUS_WAITING;

    while (thread_data->status != THREAD_STATUS_CLOSEING) {

        std::unique_lock<std::mutex> lock(thread_data->mutex);

        // thread_data->status set by calling thread, along with .notify_one()
        thread_data->condition.wait(lock, [thread_data]() { return thread_data->status != THREAD_STATUS_WAITING; });
        
        if (thread_data->status == THREAD_STATUS_ASSIGNED) {
            // work and set thread_data->result
        }

        thread_data->status = THREAD_STATUS_WAITING;
    }
}
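
The definition of thread_data_t is not shown above. For readers following along, here is a minimal sketch of what it might look like, inferred only from the members that worker() uses. The enum name thread_status_t, the member types, and the default values are assumptions, not the actual definitions.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical reconstruction: the real definitions are not part of the post.
enum thread_status_t {
    THREAD_STATUS_WAITING,   // idle, waiting for the next frame
    THREAD_STATUS_ASSIGNED,  // data is ready, the worker should run
    THREAD_STATUS_CLOSEING   // worker should leave its loop (spelling as in the post)
};

struct thread_data_t {
    std::mutex mutex;                    // taken by worker() and by the caller when assigning
    std::condition_variable condition;   // notified when status changes
    thread_status_t status = THREAD_STATUS_WAITING;  // assumed type and default
    std::size_t result = 0;              // assumed type, summed up by the caller
};
```

With this layout the struct cannot be copied or moved, because std::mutex and std::condition_variable cannot, so an array or vector of them has to be sized once up front.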

The simulator function decides when all data for a thread is present and the thread should start working. It does this by setting thread_data->status = THREAD_STATUS_ASSIGNED; on that thread's thread_data.

size_t simulate(simulator_t self) {

    size_t sum = 0;

    while (sum < self->done_count) {
    
        // Notify all threads that their data is present and they should start working
        for (size_t i = 0; i < self->num_threads; i++) {

            self->calculation_threads[i].result = 0;

            {
                std::lock_guard<std::mutex> guard(self->calculation_threads[i].mutex);
                self->calculation_threads[i].status = THREAD_STATUS_ASSIGNED;
            }
    
            self->calculation_threads[i].condition.notify_one();
        }

        // All threads should have started working now

        // Wait for all threads to finish
        for (size_t i = 0; i < self->num_threads; i++) {

            while (self->calculation_threads[i].status != THREAD_STATUS_WAITING) { 
                // This is the endless loop occurring
                self->calculation_threads[i].condition.notify_one(); // Notify again in case it was missed the first time

            }

            sum += self->calculation_threads[i].result;
        }

    }
}

Problem

  1. The first time simulate() is called, the function runs correctly. However, on the second call the simulator gets stuck in the while loop that waits for all threads to finish. It seems that in the second run the threads don't start working, and I don't understand why.
  2. The program works with up to 2 threads, but not with 3 or more. The thread_data of each thread isn't changed anywhere outside of the worker and simulate functions.

The question is: WHY?

My guess is that I'm doing something wrong with the mutexes that I haven't understood yet.

Any help on this is much appreciated!
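
For comparison, here is a minimal sketch of the same assign/wait handshake in which every read and write of the status happens while holding the mutex, and the calling thread sleeps on the condition variable instead of polling. It is not a verified fix for the code above. All names (Status, Slot, locked_worker, run_frames) are hypothetical, and the "work" is a placeholder multiplication.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-ins for the types in the post.
enum class Status { Waiting, Assigned, Closing };

struct Slot {
    std::mutex mutex;
    std::condition_variable condition;
    Status status = Status::Waiting;  // only touched while holding mutex
    std::size_t input = 0;
    std::size_t result = 0;
};

void locked_worker(Slot* slot) {
    std::unique_lock<std::mutex> lock(slot->mutex);
    for (;;) {
        // Sleep until work is handed over or shutdown is requested.
        slot->condition.wait(lock, [slot] { return slot->status != Status::Waiting; });
        if (slot->status == Status::Closing) return;
        slot->result = slot->input * 2;   // placeholder for the real work
        slot->status = Status::Waiting;
        slot->condition.notify_all();     // wake the thread waiting for completion
    }
}

std::size_t run_frames(std::size_t num_threads, std::size_t frames) {
    std::vector<Slot> slots(num_threads);  // sized once, slots never move
    std::vector<std::thread> threads;
    for (auto& s : slots) threads.emplace_back(locked_worker, &s);

    std::size_t sum = 0;
    for (std::size_t f = 0; f < frames; ++f) {
        // Hand one unit of work to every worker.
        for (std::size_t i = 0; i < num_threads; ++i) {
            {
                std::lock_guard<std::mutex> guard(slots[i].mutex);
                slots[i].input = i + 1;
                slots[i].status = Status::Assigned;
            }
            slots[i].condition.notify_all();
        }
        // Wait for each worker; the predicate is evaluated under the lock.
        for (auto& s : slots) {
            std::unique_lock<std::mutex> lock(s.mutex);
            s.condition.wait(lock, [&s] { return s.status == Status::Waiting; });
            sum += s.result;
        }
    }
    // Ask the workers to exit and join them (joined here, not detached).
    for (auto& s : slots) {
        {
            std::lock_guard<std::mutex> guard(s.mutex);
            s.status = Status::Closing;
        }
        s.condition.notify_all();
    }
    for (auto& t : threads) t.join();
    return sum;
}
```

Calling run_frames(4, 3) runs three frames on four workers. The sketch keeps the lock held while working, as the original worker does, which is harmless here because the caller only waits in the meantime.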

  • Looks like you read the status outside of any lock whilst another thread could be changing it -> ub Commented Dec 2, 2023 at 22:46
  • See if the outcome changes if you give thread_data_t::status a type of std::atomic<int>. Commented Dec 2, 2023 at 22:55
  • Don't reinvent the wheel. Use asio::executor_work_guard and run an array of std::jthread on the asio::io_context. You just need stranded execution. asio performs the house-keeping chore of keeping threads alive and dispatching jobs. Commented Dec 2, 2023 at 22:58
  • "as fast as humanly possible" -- shouldn't that be "as fast as machinely possible"? Commented Dec 2, 2023 at 23:09
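
The std::atomic<int> suggestion from the comments could be tried in isolation along these lines. This is a minimal sketch, not a patch for the question's code: the status is an atomic flag, both sides poll it, and the condition variable is dropped entirely, which trades CPU time for wake-up latency. The names kIdle, kAssigned, kClosing, AtomicSlot, spinning_worker and run_atomic_frames are hypothetical, and the "work" is a placeholder addition.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical status values, standing in for the THREAD_STATUS_* constants.
constexpr int kIdle = 0;
constexpr int kAssigned = 1;
constexpr int kClosing = 2;

struct AtomicSlot {
    std::atomic<int> status{kIdle};
    std::size_t input = 0;   // written by the caller before the release store
    std::size_t result = 0;  // written by the worker before the release store
};

void spinning_worker(AtomicSlot* slot) {
    for (;;) {
        int s = slot->status.load(std::memory_order_acquire);
        if (s == kClosing) return;
        if (s != kAssigned) {
            std::this_thread::yield();  // nothing to do yet
            continue;
        }
        slot->result = slot->input + 1;  // placeholder for the real work
        // Release: publishes result before the caller can observe kIdle.
        slot->status.store(kIdle, std::memory_order_release);
    }
}

std::size_t run_atomic_frames(std::size_t num_threads, std::size_t frames) {
    std::vector<AtomicSlot> slots(num_threads);  // sized once, slots never move
    std::vector<std::thread> threads;
    for (auto& s : slots) threads.emplace_back(spinning_worker, &s);

    std::size_t sum = 0;
    for (std::size_t f = 0; f < frames; ++f) {
        for (std::size_t i = 0; i < num_threads; ++i) {
            slots[i].input = i;
            // Release: publishes input before the worker can observe kAssigned.
            slots[i].status.store(kAssigned, std::memory_order_release);
        }
        for (auto& s : slots) {
            while (s.status.load(std::memory_order_acquire) != kIdle) {
                std::this_thread::yield();
            }
            sum += s.result;
        }
    }
    for (auto& s : slots) s.status.store(kClosing, std::memory_order_release);
    for (auto& t : threads) t.join();
    return sum;
}
```

The acquire/release pairs are what make the plain input and result members safe to read on the other side. Whether polling beats a condition variable for a given frame size is something only measurement on the target machine can answer.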
