multithreading - C++ UNIX threading
I am doing a project with threading in C++ on Unix. There is one producer thread and five consumer threads. The producer thread adds incrementing numbers to a queue at random times, and the consumer threads poll the queue, trying to remove items from it. For some reason q.size() keeps going negative, and I can't figure out why.
#include <queue>
#include <list>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

using namespace std;

#define num_consumer_threads 5
#define num_producer_threads 1
#define buffer_size 20

void *c_thread_function(void *arg);
void *p_thread_function(void *arg);

queue<int> q;

int produce(int cur) {
    int temp = cur + 1;
    return temp;
}

void append(int num) {
    if ( q.size() < buffer_size ) {
        q.push(num);
    }
}

int take() {
    int removed = q.front();
    q.pop();
    sleep(1);
    return removed;
}

void consume(int num, int thread) {
    printf("%d consumed %d \n", thread, num);
}

int main() {
    int result;
    pthread_t cthreads[num_consumer_threads];
    pthread_t pthreads[num_producer_threads];
    void *thread_result;

    // build array of consumer threads
    for(int num_of_cthreads = 0; num_of_cthreads < num_consumer_threads; num_of_cthreads++) {
        result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function, (void *)num_of_cthreads);
        if ( result != 0 ) {
            perror( "thread creation failed");
            exit(EXIT_FAILURE);
        }
        //sleep(1);
    }

    // build array of producer threads
    for(int num_of_pthreads = 0; num_of_pthreads < num_producer_threads; num_of_pthreads++) {
        result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
        if ( result != 0 ) {
            perror( "thread creation failed");
            exit(EXIT_FAILURE);
        }
        //sleep(1);
    }

    printf("all threads created\n");
    while ( true ) {
        // nothing
    }
}

void *c_thread_function(void *arg) {
    int temp = (long)arg;
    printf("consumer thread %d created \n", temp);
    while ( true ) {
        while ( q.size() > 0 ) {
            int w = take();
            consume(w, temp);
            printf(" q size %d \n", q.size());
        }
    }
}

void *p_thread_function(void *arg) {
    printf("producer thread created \n");
    int itemsadded = 0;
    int temp;
    int sleeptime;
    while ( true ) {
        while ( q.size() < buffer_size ) {
            temp = produce(itemsadded);
            sleeptime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
            sleep(sleeptime);
            append(temp);
            printf("producer adds: %d \n", temp);
            printf(" q size %d \n", q.size());
            itemsadded++;
        }
    }
}
output:
producer adds: 1
q size -1
0 consumed 1
q size -2
1 consumed 1
q size -3
3 consumed 1
q size -4
4 consumed 0
q size -5
0 consumed 0
You need to learn the concepts of race conditions and mutual exclusion. Your std::queue object is a shared resource, meaning that more than one thread operates on it, potentially at the same time. This means you have to protect it using locks (known as mutexes), so that each access is synchronized. Otherwise you'll get what's known as a race condition, where one thread modifies data while another thread is accessing or modifying the same data, leading to inconsistent or corrupt program state.
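To see how the size can end up below zero, here is a small single-threaded model of one unlucky interleaving. It is only a sketch: a plain signed counter stands in for the queue, and the function name is made up for illustration. In the real program, calling pop() on an empty std::queue is undefined behavior, so the exact numbers printed there can differ.

```cpp
#include <cassert>
#include <vector>

// Model only: `size` stands in for q.size(). Every consumer evaluates the
// loop condition `size > 0` before any of them removes an item, which is
// what can happen when the check and the pop are not done under one lock.
int size_after_unlucky_interleaving(int items, int consumers) {
    assert(consumers >= 0);
    int size = items;
    std::vector<bool> saw_item(consumers);

    // Phase 1: all consumers run the check against the same, unchanged size.
    for (int c = 0; c < consumers; ++c) {
        saw_item[c] = size > 0;
    }
    // Phase 2: every consumer that passed the check removes one item.
    for (int c = 0; c < consumers; ++c) {
        if (saw_item[c]) {
            --size;
        }
    }
    return size;
}
```

With one item and five consumers, all five pass the check and the model ends at -4. The same consumers are harmless when there is an item for each of them.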
To prevent race conditions, you need to lock a pthread_mutex_t object before every queue access.
First, you need to create a mutex object and initialize it.
pthread_mutex_t mymutex;
pthread_mutex_init(&mymutex, 0);
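As a side note on initialization, here is a minimal sketch of two ways to get a usable mutex. The helper names init_mutex and lock_roundtrip are made up for illustration. PTHREAD_MUTEX_INITIALIZER and the pthread_mutex_* calls are standard POSIX, and each call returns 0 on success.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>

// Option 1: a statically allocated mutex with default attributes can use
// the initializer macro and needs no pthread_mutex_init call.
static pthread_mutex_t static_mutex = PTHREAD_MUTEX_INITIALIZER;

// Option 2 (hypothetical helper): initialize at run time with default
// attributes and report whether the call succeeded.
bool init_mutex(pthread_mutex_t *m) {
    return pthread_mutex_init(m, NULL) == 0;
}

// Hypothetical self-check: lock and unlock once, asserting both succeed.
void lock_roundtrip(pthread_mutex_t *m) {
    int rc = pthread_mutex_lock(m);
    assert(rc == 0);
    rc = pthread_mutex_unlock(m);
    assert(rc == 0);
    (void)rc;  // avoids an unused-variable warning when asserts are disabled
}
```

A mutex created with pthread_mutex_init should be released with pthread_mutex_destroy once no thread uses it any more.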
Your application code should then look like this:
pthread_mutex_lock(&mymutex);
// access the queue here
pthread_mutex_unlock(&mymutex);
When one thread acquires the lock, no other thread can acquire it. A thread that tries to acquire a lock that has already been acquired by another thread will wait until the lock is released. This synchronizes access to the queue, ensuring that only one thread modifies it at a time.
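Putting this together for the queue in the question, one possible sketch looks like the following. The names try_append, try_take, consumer, run_demo and the counters are made up for illustration, and the producer runs on the calling thread to keep the example short. The important part is that the size check and the push or pop happen under the same lock, so no thread can act on a stale check.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <queue>
#include <sched.h>

static const std::size_t kBufferSize = 20;  // same limit as buffer_size above
static const int kConsumers = 5;

static std::queue<int> q;                                    // shared queue
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;  // guards all state below
static bool producer_done = false;
static int consumed_count = 0;   // demo bookkeeping only
static long consumed_sum = 0;    // demo bookkeeping only

// Check for room and push under one lock.
bool try_append(int num) {
    pthread_mutex_lock(&q_mutex);
    bool pushed = q.size() < kBufferSize;
    if (pushed) q.push(num);
    pthread_mutex_unlock(&q_mutex);
    return pushed;
}

// Check for an item and pop it under one lock; never pops an empty queue.
bool try_take(int *out) {
    pthread_mutex_lock(&q_mutex);
    bool took = !q.empty();
    if (took) {
        *out = q.front();
        q.pop();
        ++consumed_count;
        consumed_sum += *out;
    }
    pthread_mutex_unlock(&q_mutex);
    return took;
}

void *consumer(void *) {
    for (;;) {
        int value;
        if (try_take(&value)) continue;  // got an item, look for the next one
        pthread_mutex_lock(&q_mutex);
        bool finished = producer_done && q.empty();
        pthread_mutex_unlock(&q_mutex);
        if (finished) return NULL;
        sched_yield();                   // still polling, as in the question
    }
}

// Produces 1..items on the calling thread; returns how many were consumed.
int run_demo(int items) {
    pthread_mutex_lock(&q_mutex);
    producer_done = false;
    consumed_count = 0;
    consumed_sum = 0;
    pthread_mutex_unlock(&q_mutex);

    pthread_t threads[kConsumers];
    for (int i = 0; i < kConsumers; ++i) {
        int rc = pthread_create(&threads[i], NULL, consumer, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int n = 1; n <= items; ++n) {
        while (!try_append(n)) sched_yield();  // queue full, wait for consumers
    }
    pthread_mutex_lock(&q_mutex);
    producer_done = true;
    pthread_mutex_unlock(&q_mutex);

    for (int i = 0; i < kConsumers; ++i) pthread_join(threads[i], NULL);
    return consumed_count;
}
```

Calling run_demo(1000) should report exactly 1000 consumed items, each taken by exactly one consumer. Note that any sleep() or printf() work is best done after the lock is released, so other threads are not blocked while one thread waits.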