
I have a thread that calculates data in an endless loop. It produces three results:

  1. a std::vector<float*> (each element points to a float array of 3 elements on the heap)
  2. an int array (int*)
  3. an int that indicates the size of the int array.

Two other threads should then process this data (also in endless loops). Because the three threads do not take the same amount of processing time, data is sometimes "skipped".

std::vector<float*> a_vector;
int* a_Array;
int a_Array_size;
std::mutex a_mutex;


void thread_A() 
{
    while (true) 
    { 
         calculate(); 
         a_mutex.lock();
         a_vector = getvector();
         a_Array = getArray();
         a_Array_size = getArraysize();
         a_mutex.unlock();
    }
}

void thread_B() 
{
    while (true) 
    { 
         a_mutex.lock();
         std::vector<float*> b_vector = a_vector;
         int* b_Array = a_Array;
         int b_array_Size = a_Array_size; 
         a_mutex.unlock();
         calculate_b();
    }
}

void thread_C() 
{
    while (true) 
    { 
         a_mutex.lock();
         std::vector<float*> c_vector = a_vector;
         int* c_Array = a_Array;
         int c_array_Size = a_Array_size; 
         a_mutex.unlock();
         calculate_c();
    }
}

My problem is: how can I pass this data from one thread to another? I would make a copy of the three variables in the consuming thread, but can I simply write:

std::vector <float*> b = calculate();
std::vector <float*> a = b;

Is this a copy or a reference? What about the vector elements, which are only pointers? What is the fastest way to copy a vector and an array? Doesn't a loop run internally here too? The flow of information is only in one direction: one thread generates the data and the next only reads it. Can I take advantage of this? Is there an alternative to the mutex?

  • std::vector <float*> a = b; is a copy, but elements of a might become invalid when producer recomputes b. @Metatron Commented May 17, 2020 at 11:30
  • The vector holds several thousand elements, but each element is an array (new float[3]) of 3 elements. Commented May 17, 2020 at 11:46
  • Probably you should fix that first. Allocations are a very costly operation. Commented May 17, 2020 at 17:35

2 Answers


So you want one thread to write data and another to read it? - a standard way to do this is with a mutex. You really will need a mutex because the value you are reading could change (e.g. the size of your vector) even as you try to access it - causing undefined behaviour (maybe a segfault!).

Also with vector it is not guaranteed to be in the same location once you add more elements to it - it could be that more memory is needed and it is moved elsewhere! - if you are in the middle of a read this is bad.
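To make the reallocation point concrete, here is a minimal sketch. The function name storage_moved_after_growth is made up for illustration, and it assumes a typical standard library where growing past the capacity allocates a new buffer before releasing the old one:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical demo: returns true if growing the vector past its
// capacity moved its element storage to a different address.
bool storage_moved_after_growth() {
    std::vector<int> v;
    v.reserve(4);
    v.push_back(1);

    // Remember where the elements live before the vector grows.
    const auto before = reinterpret_cast<std::uintptr_t>(v.data());
    const std::size_t old_capacity = v.capacity();

    // Keep appending until the capacity changes, i.e. a reallocation happened.
    while (v.capacity() == old_capacity) {
        v.push_back(0);
    }

    const auto after = reinterpret_cast<std::uintptr_t>(v.data());
    return before != after;
}
```

Any pointer, reference, or iterator a reader obtained before the growth would refer to the old storage, which is why unsynchronized reads during a push_back are unsafe.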

So I really don't see what choice you have - unless you make a copy - but that is more wasteful. If your vector is fixed size you might be able to get away with using a vector of std::atomic<int/float>, but then if your vector is not changing in size then use a std::array.

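You could use a std::deque (double-ended queue, typically implemented as a sequence of fixed-size blocks rather than one contiguous buffer). Inserting at either end leaves the existing elements where they are in memory, so references to them stay valid as long as your writer does not delete items from the deque. The container itself still must not be modified and read at the same time without synchronization.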

So I would start with a mutex - which is very easy to use, and some shared data e.g.:

// NOTE: uses the gate class from the link in the comments for convenience
// pseudo code only

std::vector<int> shared_data;
std::atomic<int> workers_finished{0};
gate writer_gate;
gate worker_gate;


// writer thread
// ... write to shared_data ...
// Now reset workers finished count and signal workers to start
workers_finished = 0;
worker_gate.open();
// Wait for workers to finish
while (workers_finished < num_workers)
{
    writer_gate.wait_at_gate();
}
// All workers finish - Loop back to start or whatever...


// Worker thread:
// Note - all workers can look like this:
// Wait to be signalled by the writer
worker_gate.wait_at_gate();
// Writer signalled - do work here
// Finished work, increment workers finished count
workers_finished++;
// Signal the writer thread
writer_gate.open();
// loop back to start or whatever...

It depends on how you want to do this, though. Do you want to wait for the writer to finish writing, or do you want to read at the same time as the writer?

Another approach would be:

  1. writer does work
  2. When the writer is finished, spawn multiple threads to do work
  3. Wait for the threads to finish (std::thread::join, or use std::async / std::future).
  4. loop

I would say this is a simpler approach.
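The four steps above could be sketched roughly as follows. The names sum_values, count_even, RoundResult, and run_round are hypothetical stand-ins (the question does not show what calculate_b() and calculate_c() do), and the "writer" here just fills a vector with 1..size:

```cpp
#include <future>
#include <numeric>
#include <vector>

// Hypothetical stand-ins for calculate_b() / calculate_c(): read-only work.
long sum_values(const std::vector<int>& data) {
    return std::accumulate(data.begin(), data.end(), 0L);
}

long count_even(const std::vector<int>& data) {
    long n = 0;
    for (int v : data) {
        if (v % 2 == 0) ++n;
    }
    return n;
}

struct RoundResult {
    long sum;
    long evens;
};

// One loop iteration: the writer produces data, two tasks read it,
// and the writer waits for both before it would start the next round.
RoundResult run_round(int size) {
    std::vector<int> data(size);
    std::iota(data.begin(), data.end(), 1);  // step 1: writer fills data

    // Step 2: start the workers. They only read, and the writer does not
    // touch data until both are done, so no mutex is needed here.
    auto fb = std::async(std::launch::async, [&data] { return sum_values(data); });
    auto fc = std::async(std::launch::async, [&data] { return count_even(data); });

    // Step 3: get() blocks until the corresponding worker has finished.
    const long sum = fb.get();
    const long evens = fc.get();
    return {sum, evens};
}
```

Calling run_round in a loop gives step 4. The trade-off is that the writer is idle while the workers run, so this fits best when serial rounds are acceptable.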


12 Comments

single producer multiple consumer, lockless queues are also quite performant here. @code_fodder
@v78 I think it would really depend on exactly what OP wants to do. Some operations are OK with atomic behaviour and others are not. I do not think you would want to use a std::vector without any mutex if you had one thread adding elements to it, for example (since the whole vector can be re-allocated in memory and OP suggests he wants std::vector (AFAIK)).
If you see my edit, I currently have it, but the "copying process" takes almost as long as the actual processing. Maybe I get it wrong, but if I only pass a reference, thread A can change the data while B / C is accessing it. If I hold the mutex over the entire access, I might as well work serially ...
@Metatron since you are using mutex - there is no need to be copying the data around. Just access it directly using the a_vector global variable - no need to use whatever getvector() does (where is that defined?). I'll try to add a simple e.g.
@Metatron in fact I can't add a sensible example because I don't know what you are doing with the data! What does calculate_a() do? If you want to optimise for speed, you could pre-allocate your container (use std::array instead of a vector) so you do not need to add / remove elements, and use std::atomic (as mentioned); then you won't need mutexes... but without seeing what all the code does, or what you are trying to achieve, it is difficult to write a good example :)

std::vector <float*> a = b; makes a copy of b, but because it's a vector of pointers, only the pointers are copied, not the data they're pointing to.

Instead, store the computed data by value so that it's easy to copy:

struct Data {
    std::vector<std::array<float, 3>> dataA;
    std::vector<int> dataB;
};

Now Data a = b; will make a copy of the data.
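A small sketch of the difference, using the Data struct from above. The two helper functions are hypothetical names, added only to show the effect of each kind of copy:

```cpp
#include <array>
#include <vector>

// Value-based layout from the answer: copying Data copies every element.
struct Data {
    std::vector<std::array<float, 3>> dataA;
    std::vector<int> dataB;
};

// Returns true if a copied vector of pointers still refers to the
// same floats as the original (a shallow copy).
bool pointer_copy_is_shallow() {
    float storage[3] = {1.0f, 2.0f, 3.0f};
    std::vector<float*> b = {storage};
    std::vector<float*> a = b;   // copies the pointer, not the floats
    b[0][0] = 42.0f;             // write through the original vector
    return a[0][0] == 42.0f;     // the copy observes the change
}

// Returns true if a copy of Data is unaffected by later writes
// to the object it was copied from (a deep copy).
bool value_copy_is_independent() {
    Data b;
    b.dataA.push_back({1.0f, 2.0f, 3.0f});
    b.dataB = {7, 8, 9};
    Data a = b;                  // copies all elements of both vectors
    b.dataA[0][0] = 42.0f;
    b.dataB[0] = -1;
    return a.dataA[0][0] == 1.0f && a.dataB[0] == 7;
}
```

Storing std::array<float, 3> by value also places all elements in one contiguous allocation, which removes the per-element new float[3] mentioned in the comments on the question.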


Additional notes:

  • you might want to add a condition_variable to signal when new data has been produced, in order to avoid consumers processing the same data again:

    std::condition_variable data_produced_cond;
    
  • it's OK to copy the data in the consumers, but a more efficient solution could be to use a ring buffer in order to re-use already allocated memory.
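One possible way to combine the mutex with the condition variable is sketched below. The class LatestData, its member names, and the version counter are assumptions made for illustration, not something from the question. Instead of a ring buffer, it keeps only the newest snapshot behind a std::shared_ptr<const Data>, so consumers copy a pointer under the lock rather than the whole data set:

```cpp
#include <array>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

struct Data {
    std::vector<std::array<float, 3>> dataA;
    std::vector<int> dataB;
};

// Hypothetical helper: holds only the most recent snapshot.
class LatestData {
public:
    // Producer: publish a new snapshot; the previous one is dropped
    // once no consumer holds it any more.
    void publish(Data d) {
        auto snapshot = std::make_shared<const Data>(std::move(d));
        {
            std::lock_guard<std::mutex> lock(mutex_);
            latest_.swap(snapshot);  // snapshot now holds the old data
            ++version_;
        }
        // The old snapshot is released here, outside the lock.
        data_produced_cond_.notify_all();
    }

    // Consumer: block until a snapshot newer than last_seen exists,
    // then return it and update last_seen.
    std::shared_ptr<const Data> wait_for_new(std::uint64_t& last_seen) {
        std::unique_lock<std::mutex> lock(mutex_);
        data_produced_cond_.wait(lock, [&] { return version_ != last_seen; });
        last_seen = version_;
        return latest_;
    }

private:
    std::mutex mutex_;
    std::condition_variable data_produced_cond_;
    std::shared_ptr<const Data> latest_;
    std::uint64_t version_ = 0;
};
```

Each consumer keeps its own last_seen counter starting at 0 and calls wait_for_new in its loop. The mutex is held only for the pointer swap or pointer copy, and a slow consumer simply skips intermediate snapshots.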

3 Comments

Hello, thanks. I know what a ring buffer is, but I think it does not solve my problem. I want thread A to evaluate sensor data all the time, and threads B and C to always process the most recently created data record (all older ones are irrelevant). The problem is that all threads take different lengths of time and therefore do not always process the same data. With the ring buffer I would still have the problem that thread A may write data to the location that thread B or C is currently reading.
I can only think of B and C copying the data for themselves at the beginning (downloading the data set). Only the copying seems to take a long time, because I am currently looping through the vector element by element with an iterator and could actually already run the evaluation here ...
I didn't understand that from the question. Anyway, it's easy to adjust the ring-buffer solution for that by always processing the last element from the buffer and discarding all queued data before it.
