Lab Session # 7 Producer Consumer Problem Solution using pthreads
In this lab session, you will learn use of pthreads for famous synchronization problem
of producer consumer. Please download following files pthread.h, semaphore.h,
sched.h, pthreadVC2.lib, pthreadVC2.dll. The following code creates multiple threads
for producers and consumers and ensures synchronization between them. Visual C++
6.0 IDE is recommended for editing/compiling the program(s).
#include <pthread.h>
#include <semaphore.h>
#include <time.h> /* timeval{} for select() */
#include <stdio.h>
#include <stdlib.h>
#define NBUFF 10
#define MAXNTHREADS 100
int min (int a, int b);
int nitems, nproducers, nconsumers; /* read-only by producer and consumer */
struct
{ /* data shared by producers and consumer */
int buff[NBUFF];
int nput;
int nputval;
25
sem_t mutex, nempty, nstored; /* semaphores, not pointers */
}
shared;
void *produce (void *), *consume (void *);
int
main (int argc, char **argv)
{
int i,ccount[MAXNTHREADS], pcount[MAXNTHREADS];
pthread_t tid_consume[MAXNTHREADS],tid_produce[MAXNTHREADS] ;
if (argc != 4)
{
printf ("Invalid no. of arguments. Need nitems and nproducers. \n");
exit (3);
};
nitems = atoi (argv[1]);
nproducers = min (atoi (argv[2]), MAXNTHREADS);
nconsumers = min (atoi(argv[3]), MAXNTHREADS);
/* initialize three semaphores */
26
sem_init (&shared.mutex, 0, 1);
sem_init (&shared.nempty, 0, NBUFF);
sem_init (&shared.nstored, 0, 0);
/* create all producers */
for (i = 0; i < nproducers; i++)
{
pcount[i] = 0;
pthread_create (&tid_produce[i], NULL, produce, &pcount[i]);
}
/*create multipul consumers*/
for (i = 0; i < nconsumers; i++)
{
ccount[i] = 0;
pthread_create (&tid_consume[i], NULL, consume, &ccount[i]);
}
/* wait for all producers and the consumer */
for (i = 0; i < nproducers; i++)
pthread_join (tid_produce[i], NULL);
for (i = 0; i < nconsumers; i++)
pthread_join (tid_consume[i], NULL);
27
printf ("\n Number of buffer items contributed by each thread: \n");
for (i = 0; i < nproducers; i++)
printf ("pcount[%d] = %d\n", i, pcount[i]);
for (i = 0; i < nconsumers; i++)
printf ("ccount[%d] = %d\n", i, ccount[i]);
sem_destroy (&shared.mutex);
sem_destroy (&shared.nempty);
sem_destroy (&shared.nstored);
exit (0);
}
/* end main */
/* include produce */
void *
produce (void *arg)
{
for (;;)
{
sem_wait (&shared.nempty); /* wait for at least 1 empty slot */
28
sem_wait (&shared.mutex);
if (shared.nput >= nitems)
{
sem_post (&shared.nempty);
sem_post (&shared.mutex);
return (NULL); /* all done */
}
shared.buff[shared.nput % NBUFF] = shared.nputval;
printf ("p:%d ", shared.nputval);
shared.nput++;
shared.nputval++;
sem_post (&shared.mutex);
sem_post (&shared.nstored); /* 1 more stored item */
*((int *) arg) += 1; /*increment count of items contributed by this thread. */
}
}
/* end produce */
/* include consume */
void *consume (void *arg)
{
29
int i;
for (i = 0; i < nitems; i++)
//for(;;)
{
sem_wait (&shared.nstored); /* wait for at least 1 stored item */
sem_wait (&shared.mutex);
if (shared.buff[i % NBUFF] != i)
printf ("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
printf ("c:%d ", shared.buff[i % NBUFF]);
sem_post (&shared.mutex);
sem_post (&shared.nempty); /* 1 more empty slot */
}
return (NULL);
}
/* end consume */
int min (int a, int b)
{
if (a < b)
return a;
else
return b;
}
30
Exercise
Implement the same code for unbounded buffer producer consumer problem.