- qitem_t is the queue element (holds void* of data)
- queue_t is the main struct
- qerror_t is the enumeration of error codes
- create_queue() creates a queue with a limit on the maximum number of elements
- delete_queue() is the destructor
- clear_queue() deletes all elements (the last argument determines how the elements' data is freed)
- put_to_queue() puts an element into the queue
- get_from_queue() gets (and removes) one element, if available
- get_all_from_queue() gets (and removes) all elements from the queue
See the code:
#include <assert.h>
#include <stdlib.h>
#include "mtqueue.h"
#ifdef HAVE_LEAKDEBUG
# include "cMemDbg.h"
#endif
extern void panic(const char *msg);
/*
 * Allocate and initialise an empty queue.
 *
 * maxlen is stored as the element-count limit that put_to_queue() compares
 * the current length against.
 *
 * Returns the new queue. If the allocation or the mutex initialisation
 * fails, panic() is called and, should panic() return, NULL is returned.
 * The caller releases the queue with delete_queue().
 */
queue_t*
create_queue(int maxlen) {
    queue_t *q = malloc(sizeof *q);
    if (!q)
        goto alloc_error;
    SIMPLEQ_INIT(&q->head);
    q->len = 0;
    q->maxlen = maxlen;
    /* pthread_mutex_init() can fail; a queue whose lock is unusable must
       not be handed to callers. */
    if (pthread_mutex_init(&q->qlock, NULL) != 0)
        goto mutex_error;
    return (q);
mutex_error:
    free(q);
    panic("Can not initialize queue qlock");
    return (NULL);
alloc_error:
    panic("Can not allocate queue");
    return (NULL);
}
/*
 * Destroy a queue: remove every remaining element through clear_queue(),
 * destroy the mutex and release the queue structure itself.
 *
 * delete_data is forwarded unchanged to clear_queue(). A NULL q is ignored.
 */
void
delete_queue(queue_t *q, void (*delete_data)(void*)) {
    if (!q)
        return;

    /* Drop whatever is still queued before tearing the lock down. */
    clear_queue(q, delete_data);
    pthread_mutex_destroy(&q->qlock);
    free(q);
}
/*
 * Remove and free every element of the queue while holding the queue lock.
 *
 * delete_data, when non-NULL, is called on each element's data pointer
 * that is itself non-NULL; pass NULL to leave the data untouched.
 * q must not be NULL (asserted). If the lock cannot be taken, panic() is
 * called and the queue is left unchanged.
 */
void
clear_queue(queue_t *q, void (*delete_data)(void*)) {
    qitem_t *itemp = NULL;
    assert(q);
    if (lock_queue(q)) {
        panic("Can not lock the queue qlock");
        return;
    }
    while ((itemp = SIMPLEQ_FIRST(&q->head)) != NULL) {
        /* Unlink first: SIMPLEQ_REMOVE_HEAD reads the head element's next
           link, so the element must still be alive at that point. */
        SIMPLEQ_REMOVE_HEAD(&q->head, items);
        if (delete_data && itemp->data)
            delete_data(itemp->data);
        free(itemp);
    }
    q->len = 0;
    unlock_queue(q);
}
/*
 * Append data to the tail of the queue under the queue lock.
 *
 * Returns 0 on success, EQUEUEFULL when the current length equals maxlen,
 * or EQUEUEMEMORY when the element node cannot be allocated. The queue
 * stores only the pointer; it does not copy what data points to.
 */
qerror_t
put_to_queue(queue_t *q, void *data) {
    qerror_t status = (qerror_t)0;
    qitem_t *node = NULL;

    lock_queue(q);
    if (q->len == q->maxlen) {
        status = EQUEUEFULL;
    } else {
        node = malloc(sizeof *node);
        if (node) {
            node->data = data;
            SIMPLEQ_INSERT_TAIL(&q->head, node, items);
            q->len++;
        } else {
            status = EQUEUEMEMORY;
        }
    }
    unlock_queue(q);
    return (status);
}
/*
 * Detach the element at the head of the queue, under the queue lock.
 *
 * On success the element's data pointer is stored in *dataptr, the node is
 * freed, the length is decremented and 0 is returned. When the queue is
 * empty, EQUEUEEMPTY is returned and *dataptr is not written.
 */
qerror_t
get_from_queue(queue_t *q, void **dataptr) {
    qerror_t status;
    qitem_t *front = NULL;

    lock_queue(q);
    if (!SIMPLEQ_EMPTY(&q->head)) {
        front = SIMPLEQ_FIRST(&q->head);
        *dataptr = front->data;
        SIMPLEQ_REMOVE_HEAD(&q->head, items);
        q->len--;
        free(front);
        status = (qerror_t)0;
    } else {
        status = EQUEUEEMPTY;
    }
    unlock_queue(q);
    return (status);
}
/*
 * Detach all queued elements at once, under the queue lock.
 *
 * On success a malloc()ed array of data pointers, in queue order, is
 * stored in *dataptr and the number of valid entries in *num; the caller
 * frees the array (and the data it points to). Returns 0 on success,
 * EQUEUEEMPTY when nothing is queued (outputs untouched), or EQUEUEMEMORY
 * after calling panic() when the array cannot be allocated.
 */
qerror_t
get_all_from_queue(queue_t *q, void **dataptr, int *num) {
    int copied = 0;
    qerror_t result = (qerror_t)0;
    qitem_t *item = NULL;
    void **all = NULL;

    lock_queue(q);
    if (SIMPLEQ_EMPTY(&q->head)) {
        result = EQUEUEEMPTY;
        goto unlock;
    }
    assert(q->len > 0);
    all = malloc((size_t)q->len * sizeof *all);
    if (!all) {
        result = EQUEUEMEMORY;
        panic("Can not allocate result items for get_all_from_queue()");
        goto unlock;
    }
    while (copied < q->len) {
        item = SIMPLEQ_FIRST(&q->head);
        if (!item)
            break; /* list shorter than len claims; stop at what exists */
        all[copied++] = item->data;
        SIMPLEQ_REMOVE_HEAD(&q->head, items);
        free(item);
    }
    /* Report the slots actually filled, so the caller never reads an
       uninitialised entry if the loop above stopped early. */
    *num = copied;
    *dataptr = all;
    q->len = 0;
unlock:
    unlock_queue(q);
    return (result);
}
Here is the .h file:
#ifndef _MTQUEUE_H
#define _MTQUEUE_H
#include <pthread.h>
#include <queue.h>
/* One queue element: the caller's data pointer plus the list linkage. */
typedef struct qitem_t {
void *data; /* pointer handed to put_to_queue(); stored, not copied */
SIMPLEQ_ENTRY(qitem_t) items; /* links */
} qitem_t;
/* Head type for the SIMPLEQ list of qitem_t elements. */
typedef SIMPLEQ_HEAD(qhead_t, qitem_t) qhead_t;
/* The queue: its element list, length bookkeeping and the protecting mutex. */
typedef struct queue_t {
qhead_t head; /* list of queued elements */
int len; /* current number of elements */
int maxlen; /* limit set by create_queue(); put_to_queue() reports full when len equals it */
pthread_mutex_t qlock; /* taken and released through lock_queue()/unlock_queue() */
} queue_t;
/* Error codes returned by the queue operations; they return 0 on success. */
typedef enum qerror_t {
EQUEUEFULL = 1, /* put_to_queue(): length has reached maxlen */
EQUEUEEMPTY, /* get_from_queue()/get_all_from_queue(): nothing queued */
EQUEUEMEMORY /* a malloc() inside the queue code failed */
} qerror_t;
/* Deleter that is plain free(); usable as the delete_data argument when
   the queued data pointers were obtained from malloc(). */
#define FREE_QDATA free
/* Lock / unlock the queue's mutex; each expands to the pthread call and
   therefore yields its return value (0 on success). */
#define lock_queue(q) pthread_mutex_lock(&(q)->qlock)
#define unlock_queue(q) pthread_mutex_unlock(&(q)->qlock)
/* Create an empty queue whose element-count limit is maxlen. */
queue_t* create_queue(int maxlen);
/* Clear the queue (see clear_queue()), destroy its mutex and free it; NULL q is ignored. */
void delete_queue(queue_t *q, void (*delete_data)(void*));
/* Append data at the tail; returns 0, EQUEUEFULL or EQUEUEMEMORY. */
qerror_t put_to_queue(queue_t *q, void *data);
/* Remove the head element and store its data in *dataptr; returns 0 or EQUEUEEMPTY. */
qerror_t get_from_queue(queue_t *q, void **dataptr);
/* Remove all elements; *dataptr receives a malloc()ed array of data pointers
   and *num its length; returns 0, EQUEUEEMPTY or EQUEUEMEMORY. */
qerror_t get_all_from_queue(queue_t *q, void **dataptr, int *num);
/* Remove all elements, calling delete_data (if non-NULL) on each non-NULL data pointer. */
void clear_queue(queue_t *q, void (*delete_data)(void*));
#endif /*!_MTQUEUE_H*/
And a simple test:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include "mtqueue.h"
#include "delays.h"
/* Number of items thrput() enqueues per outer iteration. */
#define NCICLES 200
/* Queue shared by the producer and consumer threads; created in main(). */
queue_t *q = NULL;
/*
 * Print a fatal error message to stdout and terminate the process with
 * exit status 1; never returns.
 *
 * msg is const-qualified so that this definition has the same type as the
 * `extern void panic(const char *msg)` declaration the queue code uses.
 */
void panic(const char *msg){
    printf("PANIC: %s\n", msg);
    exit(1);
}
/* Producer thread: puts elements into the global queue.
 *
 * arg points to an int giving the number of batches. Each batch enqueues
 * NCICLES heap-allocated ints with values i*NCICLES + n, produced in
 * strictly descending order. While put_to_queue() reports an error, the
 * thread waits and retries the same element. Always returns NULL.
 */
void* thrput(void *arg) {
    int n;
    int *data;
    int i = *(int*)arg;
    while (i--) {
        printf("put #%d\n", i);
        n = NCICLES;
        while (n--) {
            data = malloc(sizeof *data); /* msg in heap! */
            if (!data) {
                panic("Can not allocate data for put");
            }
            *data = i*NCICLES + n;
            while (put_to_queue(q, data)) {
                printf("*** ERROR: queue is full!\n");
                // wait 1 sec and try again
                pthread_delay(1000);
            }
        }
    }
    /* A pthread start routine must return a value. */
    return NULL;
}
/*
 * Consumer loop that drains the global queue in bulk with
 * get_all_from_queue().
 *
 * Every received value is printed, checked to be exactly one less than the
 * previous one, and freed together with the result array. When the call
 * reports an error, the loop waits and then stops if the queue is still
 * empty, otherwise retries.
 */
void thrget1(void) {
    int i;
    int num = 0;
    int queued;
    void *data = NULL;
    int **idata;
    int prev = -1;
    while (1) {
        data = NULL;
        if (get_all_from_queue(q, &data, &num)) {
            pthread_delay(2000);
            /* The producer updates len under the lock, so read it the
               same way instead of racing with it. */
            lock_queue(q);
            queued = q->len;
            unlock_queue(q);
            if (!queued)
                break;
            else
                continue;
        }
        printf("get_all_from_queue num=%d\n", num);
        idata = (int**)data;
        for (i=0; i<num; i++) {
            printf("got %d (prev=%d)\n", *idata[i], prev);
            assert(prev==-1 || prev-*idata[i]==1);
            prev = *idata[i];
            free(idata[i]);
        }
        free(data);
    }
}
/*
 * Consumer loop that takes elements from the global queue one at a time
 * with get_from_queue().
 *
 * Every received value is printed, checked to be exactly one less than the
 * previous one, and freed. When the call reports an error, the loop waits
 * and then stops if the queue is still empty, otherwise retries.
 */
void thrget2(void) {
    int prev = -1;
    int queued;
    void *raw = NULL;
    int *data;
    while (1) {
        /* Receive through a real void* object; casting &data (an int**)
           to void** would store through an incompatible pointer type. */
        if (get_from_queue(q, &raw)) {
            pthread_delay(2000);
            /* Read len under the lock; the producer modifies it. */
            lock_queue(q);
            queued = q->len;
            unlock_queue(q);
            if (!queued)
                break;
            else
                continue;
        }
        data = raw;
        printf("got %d (prev=%d)\n", *data, prev);
        assert(prev==-1 || prev-*data==1);
        prev = *data;
        free(data);
    }
}
/*
 * Consumer thread entry point: runs both consumer loops in sequence.
 * arg is unused. Always returns NULL.
 */
void *thrget(void *arg) {
    (void)arg;
    thrget2(); // first, get one by one until the queue stays empty
    thrget1(); // then get all at once until the queue stays empty
    return NULL;
}
/*
 * Test driver: creates the shared queue, starts one producer and one
 * consumer thread, waits for both, then destroys the queue (freeing any
 * leftover data with FREE_QDATA). Any setup failure ends in panic().
 */
int
main(void) {
    int n = 2;
    pthread_t thrput_id, thrget_id;
    q = create_queue(200); // size of queue
    if (!q)
        panic("Can not create queue");
    if (pthread_create(&thrput_id, NULL, thrput, &n))
        panic("Can not create put thread");
    if (pthread_create(&thrget_id, NULL, thrget, NULL))
        panic("Can not create get thread");
    // wait finishing of the threads
    pthread_join(thrget_id, NULL);
    pthread_join(thrput_id, NULL);
    delete_queue(q, FREE_QDATA);
#ifdef HAVE_LEAKDEBUG
    PrintMemoryLeakInfo();
#endif
    return 0;
}
Tested successfully on Linux, BSD, and Windows.