- qitem_t is the queue element (holds a void* pointer to the data)
- queue_t is the main struct
- qerror_t is the enumeration of error codes
- create_queue() creates a queue with a limit on the maximum number of elements
- delete_queue() is the destructor
- clear_queue() deletes all elements (the last argument determines how the elements' data is freed)
- put_to_queue() puts an element into the queue
- get_from_queue() gets (and removes) one element, if available
- get_all_from_queue() gets (and removes) all elements from the queue
See the code:
#include <assert.h> #include <stdlib.h> #include "mtqueue.h" #ifdef HAVE_LEAKDEBUG # include "cMemDbg.h" #endif extern void panic(const char *msg); queue_t* create_queue(int maxlen) { queue_t* q = (queue_t*)malloc(sizeof (queue_t)); if (!q) goto alloc_error; SIMPLEQ_INIT(&q->head); q->len = 0; pthread_mutex_init(&q->qlock, NULL); q->maxlen = maxlen; return (q); alloc_error: panic("Can not allocate queue"); return (NULL); } void delete_queue(queue_t *q, void (*delete_data)(void*)) { if (q) { clear_queue(q, delete_data); // delete all items from queue pthread_mutex_destroy(&q->qlock); free(q); // free queue } } void clear_queue(queue_t *q, void (*delete_data)(void*)) { qitem_t *itemp = NULL; assert(q); if (lock_queue(q)) { panic("Can not lock the queue qlock"); return; } while (!SIMPLEQ_EMPTY(&q->head)) { itemp = SIMPLEQ_FIRST(&q->head); if (itemp) { if (delete_data && itemp->data) delete_data(itemp->data); free(itemp); } SIMPLEQ_REMOVE_HEAD(&q->head, items); } q->len = 0; unlock_queue(q); } qerror_t put_to_queue(queue_t *q, void *data) { qerror_t result = (qerror_t)0; qitem_t *item = NULL; lock_queue(q); if (q->len == q->maxlen) { result = EQUEUEFULL; goto unlock; } item = (qitem_t*)malloc(sizeof (qitem_t)); if (!item) { result = EQUEUEMEMORY; goto unlock; } item->data = data; SIMPLEQ_INSERT_TAIL(&q->head, item, items); q->len++; unlock: unlock_queue(q); return (result); } qerror_t get_from_queue(queue_t *q, void **dataptr) { qerror_t result = (qerror_t)0; qitem_t *item = NULL; lock_queue(q); if (SIMPLEQ_EMPTY(&q->head)) { result = EQUEUEEMPTY; goto unlock; } item = SIMPLEQ_FIRST(&q->head); *dataptr = item->data; SIMPLEQ_REMOVE_HEAD(&q->head, items); q->len--; free(item); unlock: unlock_queue(q); return (result); } qerror_t get_all_from_queue(queue_t *q, void **dataptr, int *num) { int i; qerror_t result = (qerror_t)0; qitem_t *item = NULL; void **all = NULL; lock_queue(q); if (SIMPLEQ_EMPTY(&q->head)) { result = EQUEUEEMPTY; goto unlock; } assert(q->len); all = 
malloc(q->len * sizeof (void*)); if (!all) { result = EQUEUEMEMORY; panic("Can not allocate result items for get_all_from_queue()"); goto unlock; } for (i=0; i<q->len; i++) { item = SIMPLEQ_FIRST(&q->head); //assert(item); if (!item) break; all[i] = item->data; SIMPLEQ_REMOVE_HEAD(&q->head, items); free(item); } *num = q->len; *dataptr = all; q->len = 0; unlock: unlock_queue(q); return (result); }
Here is the .h file:
#ifndef _MTQUEUE_H #define _MTQUEUE_H #include <pthread.h> #include <queue.h> typedef struct qitem_t { void *data; SIMPLEQ_ENTRY(qitem_t) items; /* links */ } qitem_t; typedef SIMPLEQ_HEAD(qhead_t, qitem_t) qhead_t; typedef struct queue_t { qhead_t head; int len; int maxlen; pthread_mutex_t qlock; } queue_t; typedef enum qerror_t { EQUEUEFULL = 1, EQUEUEEMPTY, EQUEUEMEMORY } qerror_t; #define FREE_QDATA free #define lock_queue(q) pthread_mutex_lock(&(q)->qlock) #define unlock_queue(q) pthread_mutex_unlock(&(q)->qlock) queue_t* create_queue(int maxlen); void delete_queue(queue_t *q, void (*delete_data)(void*)); qerror_t put_to_queue(queue_t *q, void *data); qerror_t get_from_queue(queue_t *q, void **dataptr); qerror_t get_all_from_queue(queue_t *q, void **dataptr, int *num); void clear_queue(queue_t *q, void (*delete_data)(void*)); #endif /*!_MTQUEUE_H*/
And a simple test:
#include <assert.h> #include <stdio.h> #include <stdlib.h> #include "mtqueue.h" #include "delays.h" #define NCICLES 200 queue_t *q = NULL; void panic(char *msg){ printf("PANIC: %s\n", msg); exit(1); } /* Thread of putting an elements to queue. */ void* thrput(void *arg) { int n; int *data; int i = *(int*)arg; while (i--) { printf("put #%d\n", i); n = NCICLES; while (n--) { data = (int*)malloc(sizeof (int)); /* msg in heap! */ if (!data) { panic("Can not allocate data for put"); } *data = i*NCICLES + n; while (put_to_queue(q, (void*)data)) { printf("*** ERROR: queue is full!\n"); // wait 1 sec and try again pthread_delay(1000); } } } } void thrget1() { int i; int num = 0; void *data = NULL; int** idata; int prev = -1; while (1) { data = NULL; if (get_all_from_queue(q, &data, &num)) { pthread_delay(2000); if (!q->len) break; else continue; } printf("get_all_from_queue num=%d\n", num); idata = (int**)data; for (i=0; i<num; i++) { printf("got %d (prev=%d)\n", *idata[i], prev); assert(prev==-1 || prev-*idata[i]==1); prev = *idata[i]; free(idata[i]); } free(data); } end: return; } void thrget2() { int prev = -1; int *data; while (1) { if (get_from_queue(q, (void**)&data)) { pthread_delay(2000); if (!q>len) break; else continue; } printf("got %d (prev=%d)\n", *data, prev); assert(prev==-1 || prev-*data==1); prev = *data; free(data); } } void *thrget(void *arg) { thrget2(); // first, get all until NULL thrget1(); // then get one by one until NULL } void main() { int n = 2; pthread_t thrput_id, thrget_id; q = create_queue(200); // size of queue pthread_create(&thrput_id, NULL, thrput, (void*)&n); pthread_create(&thrget_id, NULL, thrget, NULL); // wait finishing of the threads pthread_join(thrget_id, NULL); pthread_join(thrput_id, NULL); delete_queue(q, FREE_QDATA); #ifdef HAVE_LEAKDEBUG PrintMemoryLeakInfo(); #endif }Tested on Linux, BSD, Windows succesfully.