вторник, 31 мая 2011 г.

Multithread queue

This C module provides a thread-safe queue for multithreaded programs, similar to Python's Queue. It is built on POSIX threads and the BSD queue.h macros.
  • qitem_t is the queue element (it holds a void* pointer to the data)
  • queue_t is the main queue struct
  • qerror_t is the enumeration of error codes
  • create_queue() creates a queue with a limit on the maximum number of elements
  • delete_queue() is the destructor
  • clear_queue() deletes all elements (the last argument determines how each element's data is freed)
  • put_to_queue() puts an element into the queue
  • get_from_queue() gets (and removes) one element, if available
  • get_all_from_queue() gets (and removes) all elements from the queue

See the code:

#include <assert.h>
#include <stdlib.h>
#include "mtqueue.h"
#ifdef HAVE_LEAKDEBUG
#  include "cMemDbg.h"
#endif
extern void panic(const char *msg);
/*
 * Allocate and initialise an empty queue.
 *
 * maxlen is stored as the element-count limit that put_to_queue() compares
 * the current length against.
 *
 * Returns the new queue. If the memory allocation or the mutex
 * initialisation fails, panic() is called and, should it return, NULL is
 * returned. Release the queue with delete_queue().
 */
queue_t*
create_queue(int maxlen) {
        queue_t *q = malloc(sizeof *q);
        if (!q) {
                panic("Can not allocate queue");
                return (NULL);
        }
        SIMPLEQ_INIT(&q->head);
        q->len = 0;
        q->maxlen = maxlen;
        /* pthread_mutex_init() reports failure with a non-zero error number;
         * never hand out a queue whose lock is unusable. */
        if (pthread_mutex_init(&q->qlock, NULL) != 0) {
                free(q);
                panic("Can not initialize the queue qlock");
                return (NULL);
        }
        return (q);
}
/*
 * Destroy a queue: remove every remaining item, destroy the mutex and
 * release the queue structure itself.
 *
 * q            queue to destroy; NULL is accepted and ignored.
 * delete_data  passed through to clear_queue(), which calls it on each
 *              item's non-NULL data pointer; may be NULL.
 */
void
delete_queue(queue_t *q, void (*delete_data)(void*)) {
        if (!q)
                return;
        /* Empty the list first so no item outlives the queue. */
        clear_queue(q, delete_data);
        pthread_mutex_destroy(&q->qlock);
        free(q);
}
/*
 * Remove and free every item in the queue, leaving it empty.
 *
 * q            queue to clear; must not be NULL (asserted).
 * delete_data  if non-NULL, called on each item's data pointer when that
 *              pointer is non-NULL; pass NULL to leave the data untouched.
 *
 * The whole operation runs with the queue lock held. If the lock cannot be
 * taken, panic() is called and the queue is left unchanged.
 */
void
clear_queue(queue_t *q, void (*delete_data)(void*)) {
        qitem_t *itemp = NULL;
        assert(q);
        if (lock_queue(q)) {
                panic("Can not lock the queue qlock");
                return;
        }
        while ((itemp = SIMPLEQ_FIRST(&q->head)) != NULL) {
                /* Unlink before freeing: SIMPLEQ_REMOVE_HEAD reads the head
                 * item's next link, so the item must still be alive here. */
                SIMPLEQ_REMOVE_HEAD(&q->head, items);
                if (delete_data && itemp->data)
                        delete_data(itemp->data);
                free(itemp);
        }
        q->len = 0;
        unlock_queue(q);
}
/*
 * Append one data pointer to the tail of the queue.
 *
 * q     target queue.
 * data  pointer stored in the new item; it is not copied.
 *
 * Returns 0 on success, EQUEUEFULL when the current length equals maxlen,
 * or EQUEUEMEMORY when the item cannot be allocated. The queue lock is held
 * for the duration of the call.
 */
qerror_t
put_to_queue(queue_t *q, void *data) {
        qerror_t status = (qerror_t)0;
        qitem_t *node;

        lock_queue(q);
        if (q->len == q->maxlen) {
                status = EQUEUEFULL;
        } else {
                node = (qitem_t*)malloc(sizeof (qitem_t));
                if (node == NULL) {
                        status = EQUEUEMEMORY;
                } else {
                        node->data = data;
                        SIMPLEQ_INSERT_TAIL(&q->head, node, items);
                        q->len++;
                }
        }
        unlock_queue(q);
        return (status);
}
/*
 * Remove the item at the head of the queue and hand its data pointer to the
 * caller.
 *
 * q        source queue.
 * dataptr  receives the stored data pointer on success; left untouched when
 *          the queue is empty.
 *
 * Returns 0 on success or EQUEUEEMPTY when there is nothing to take. The
 * queue lock is held for the duration of the call.
 */
qerror_t
get_from_queue(queue_t *q, void **dataptr) {
        qitem_t *first;

        lock_queue(q);
        if (SIMPLEQ_EMPTY(&q->head)) {
                unlock_queue(q);
                return (EQUEUEEMPTY);
        }
        first = SIMPLEQ_FIRST(&q->head);
        *dataptr = first->data;
        SIMPLEQ_REMOVE_HEAD(&q->head, items);
        q->len--;
        /* Only the list node is released; the data now belongs to the caller. */
        free(first);
        unlock_queue(q);
        return ((qerror_t)0);
}
/*
 * Remove every item from the queue in one locked operation and return the
 * data pointers as an array, in queue order.
 *
 * q        source queue.
 * dataptr  on success receives a malloc()ed array of void* (one entry per
 *          removed item); the caller must free() the array.
 * num      on success receives the number of valid entries in that array.
 *
 * Returns 0 on success, EQUEUEEMPTY when the queue holds no items, or
 * EQUEUEMEMORY (after calling panic()) when the array cannot be allocated.
 * On failure *dataptr and *num are left untouched.
 */
qerror_t
get_all_from_queue(queue_t *q, void **dataptr, int *num) {
        int i;
        qerror_t result = (qerror_t)0;
        qitem_t *item = NULL;
        void **all = NULL;
        lock_queue(q);
        if (SIMPLEQ_EMPTY(&q->head)) {
                result = EQUEUEEMPTY;
                goto unlock;
        }
        assert(q->len);
        all = malloc((size_t)q->len * sizeof (void*));
        if (!all) {
                result = EQUEUEMEMORY;
                panic("Can not allocate result items for get_all_from_queue()");
                goto unlock;
        }
        for (i=0; i<q->len; i++) {
                item = SIMPLEQ_FIRST(&q->head);
                if (!item) break;
                all[i] = item->data;
                SIMPLEQ_REMOVE_HEAD(&q->head, items);
                free(item);
        }
        /* Report the number of entries actually copied: if the list ran out
         * before len was reached, the remaining slots are uninitialised and
         * must not be exposed to the caller. */
        *num = i;
        *dataptr = all;
        q->len = 0;
unlock:
        unlock_queue(q);
        return (result);
}

Here is the .h file:

/*
 * mtqueue.h - mutex-protected queue of void* items, built on the SIMPLEQ
 * macros from queue.h and a pthread mutex.
 */
#ifndef _MTQUEUE_H
#define _MTQUEUE_H
#include <pthread.h>
#include <queue.h>
/* One list node: the stored data pointer plus the SIMPLEQ linkage. */
typedef struct qitem_t {
        void *data;
        SIMPLEQ_ENTRY(qitem_t) items; /* links */
} qitem_t;
/* List head type for a SIMPLEQ of qitem_t nodes. */
typedef SIMPLEQ_HEAD(qhead_t, qitem_t) qhead_t;
/*
 * The queue object.
 *   head    list of queued items
 *   len     current number of items (updated by the put/get/clear functions)
 *   maxlen  limit set by create_queue(); put_to_queue() reports EQUEUEFULL
 *           when len equals it
 *   qlock   mutex taken by the queue functions through lock_queue()
 */
typedef struct queue_t {
        qhead_t head;
        int len;
        int maxlen;
        pthread_mutex_t qlock;
} queue_t;
/*
 * Error codes returned by the queue functions. Values start at 1; the
 * functions return (qerror_t)0 on success.
 *   EQUEUEFULL    put_to_queue(): the length limit has been reached
 *   EQUEUEEMPTY   get_from_queue()/get_all_from_queue(): nothing queued
 *   EQUEUEMEMORY  a malloc() inside the queue code failed
 */
typedef enum qerror_t {
        EQUEUEFULL = 1,
        EQUEUEEMPTY,
        EQUEUEMEMORY
} qerror_t;
/* Data destructor for delete_data arguments when the data was malloc()ed. */
#define FREE_QDATA free
/* Lock/unlock the queue mutex; each evaluates to the pthread call's result. */
#define lock_queue(q) pthread_mutex_lock(&(q)->qlock)
#define unlock_queue(q) pthread_mutex_unlock(&(q)->qlock)
/* Allocate an empty queue whose length limit is maxlen. */
queue_t* create_queue(int maxlen);
/* Clear the queue (see clear_queue), destroy its mutex and free it; a NULL q is ignored. */
void delete_queue(queue_t *q, void (*delete_data)(void*));
/* Append data at the tail; returns 0, EQUEUEFULL or EQUEUEMEMORY. */
qerror_t put_to_queue(queue_t *q, void *data);
/* Remove the head item and store its data in *dataptr; returns 0 or EQUEUEEMPTY. */
qerror_t get_from_queue(queue_t *q, void **dataptr);
/* Remove all items; *dataptr receives a malloc()ed array of the data
 * pointers and *num its element count. Returns 0, EQUEUEEMPTY or EQUEUEMEMORY. */
qerror_t get_all_from_queue(queue_t *q, void **dataptr, int *num);
/* Remove and free all items; delete_data, when non-NULL, is called on each non-NULL data pointer. */
void clear_queue(queue_t *q, void (*delete_data)(void*));
#endif /*!_MTQUEUE_H*/

And a simple test:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include "mtqueue.h"
#include "delays.h"
#define NCICLES 200
queue_t *q = NULL;
/*
 * Fatal-error handler used by the queue module: print the message prefixed
 * with "PANIC: " and terminate the process with exit status 1.
 *
 * The parameter is const char * so the definition matches the
 * `extern void panic(const char *msg);` declaration in the queue source.
 */
void panic(const char *msg){
        printf("PANIC: %s\n", msg);
        exit(1);
}
/*
 * Producer thread: puts elements into the queue.
 *
 * arg points to an int holding the number of rounds. Each round allocates
 * NCICLES ints on the heap and queues them; the values are generated in
 * strictly descending order (i*NCICLES + n), which is what the consumer
 * threads assert on. When put_to_queue() reports an error the put is retried
 * after a delay.
 *
 * Always returns NULL.
 */
void* thrput(void *arg) {
        int n;
        int *data;
        int i = *(int*)arg;
        while (i--) {
                printf("put #%d\n", i);
                n = NCICLES;
                while (n--) {
                        data = (int*)malloc(sizeof (int)); /* msg in heap! */
                        if (!data) {
                                panic("Can not allocate data for put");
                        }
                        *data = i*NCICLES + n;
                        while (put_to_queue(q, (void*)data)) {
                            printf("*** ERROR: queue is full!\n");
                            // wait 1 sec and try again
                            pthread_delay(1000);
                        }
                }
        }
        /* A pthread start routine must return a value. */
        return NULL;
}
/*
 * Consumer loop that drains the queue in batches with get_all_from_queue().
 *
 * Every received value is printed, checked to be exactly one less than the
 * previous one, and freed; the batch array itself is freed afterwards.
 * When a fetch fails, the loop waits and then stops if the queue is still
 * empty, otherwise it tries again.
 */
void thrget1(void) {
        int i;
        int num = 0;
        int still_empty;
        void *data = NULL;
        int** idata;
        int prev = -1;
        while (1) {
                data = NULL;
                if (get_all_from_queue(q, &data, &num)) {
                        pthread_delay(2000);
                        /* len is written by the producer thread, so read it
                         * under the queue lock rather than racing on it. */
                        lock_queue(q);
                        still_empty = !q->len;
                        unlock_queue(q);
                        if (still_empty)
                                break;
                        else
                                continue;
                }
                printf("get_all_from_queue num=%d\n", num);
                idata = (int**)data;
                for (i=0; i<num; i++) {
                        printf("got %d (prev=%d)\n", *idata[i], prev);
                        assert(prev==-1 || prev-*idata[i]==1);
                        prev = *idata[i];
                        free(idata[i]);
                }
                free(data);
        }
}

/*
 * Consumer loop that takes elements one at a time with get_from_queue().
 *
 * Every received value is printed, checked to be exactly one less than the
 * previous one, and freed. When a fetch fails, the loop waits and then stops
 * if the queue is still empty, otherwise it tries again.
 */
void thrget2(void) {
        int prev = -1;
        int still_empty;
        void *raw = NULL;
        int *data;
        while (1) {
                /* Receive into a real void* instead of casting &data to
                 * void**, which would write an int* through the wrong type. */
                if (get_from_queue(q, &raw)) {
                        pthread_delay(2000);
                        /* len is written by the producer thread, so read it
                         * under the queue lock rather than racing on it. */
                        lock_queue(q);
                        still_empty = !q->len;
                        unlock_queue(q);
                        if (still_empty)
                                break;
                        else
                                continue;
                }
                data = (int*)raw;
                printf("got %d (prev=%d)\n", *data, prev);
                assert(prev==-1 || prev-*data==1);
                prev = *data;
                free(data);
        }
}
/*
 * Consumer thread entry point: runs the one-by-one consumer and then the
 * batch consumer, each until it finds the queue empty after its wait.
 *
 * arg is unused. Always returns NULL.
 */
void *thrget(void *arg) {
        (void)arg;
        thrget2(); // first, get one by one until the queue stays empty
        thrget1(); // then get all at once until the queue stays empty
        return NULL;
}
/*
 * Test driver: creates a queue limited to 200 elements, starts one producer
 * thread (2 rounds) and one consumer thread, waits for both to finish and
 * then destroys the queue, freeing any data still queued.
 *
 * Returns 0 on success; setup failures are reported through panic().
 */
int
main(void) {
        int n = 2;
        pthread_t thrput_id, thrget_id;
        q = create_queue(200); // size of queue
        if (!q) {
                panic("Can not create the queue");
                return 1;
        }
        /* pthread_create() returns a non-zero error number on failure;
         * joining a thread that was never started is undefined. */
        if (pthread_create(&thrput_id, NULL, thrput, (void*)&n)) {
                panic("Can not start the put thread");
                return 1;
        }
        if (pthread_create(&thrget_id, NULL, thrget, NULL)) {
                panic("Can not start the get thread");
                return 1;
        }
        // wait finishing of the threads
        pthread_join(thrget_id, NULL);
        pthread_join(thrput_id, NULL);
        delete_queue(q, FREE_QDATA);
#ifdef HAVE_LEAKDEBUG
        PrintMemoryLeakInfo();
#endif
        return 0;
}
Tested successfully on Linux, BSD, and Windows.

Комментариев нет:

Отправить комментарий

Thanks for your posting!