`
airu
  • 浏览: 267318 次
  • 性别: Icon_minigender_1
  • 来自: 云南
社区版块
存档分类
最新评论

线程队列

 
阅读更多
队列很常见,但是如果我们考虑在多线程环境中,那么可能就要注意同步互斥了。
这里使用读写锁,可以在读的时候不需要锁住整个队列。但是添加到队列就必须用互斥的锁了。
这里就介绍一下读写锁。pthread_rwlock_t lock定义一个读写锁lock,同样注意要初始化。
用 pthread_rwlock_init(&lock) 来初始化 lock。

下面来看一个列子,使用双向链表实现队列。可以从头添加或者从尾添加。
th_queue.h
#ifndef TH_QUEUE_H
#define TH_QUEUE_H
#endif

struct task{
 struct task *prev;
 struct task *next;
 pthread_t pid;
 void *(*handle)(void *);
 void *arg;
 int task_id;
};

struct th_queue{
        struct task *tail;
        struct task *head;
        pthread_rwlock_t q_lock;
        unsigned int count;
};

/*
 * insert into the head of the queue
 */
void ins_th_queue(struct th_queue*, struct task*);

/*
 * remove the task from the queue
 * return: the task which comes from the queue.if the task no
 *         int the queue ,return NULL
 */
struct task* rm_f_th_queue(struct th_queue*, struct task*);

/*
 * find a task from the queue with the task_id
 * return: if we find the task, NULL if we don't find it
 */
struct task* q_f_th_queue(struct th_queue*, int task_id);

/*
 * initialize the queue
 */
int init_th_queue(struct th_queue*);

/*
 * add the task to the tail of the queue
 */
void app_th_queue(struct th_queue*, struct task*);


头文件定义了一个task,是放入队列的任务。我们可以执行它,现在暂时不考虑太多。
接下来是实现 th_queue.c

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/types.h>
#include<assert.h>
#include"th_queue.h"

int init_th_queue(struct th_queue * queue)
{
        int err;

        assert(queue!=NULL);

        queue->head = NULL;
        queue->tail = NULL;
        err = pthread_rwlock_init(&queue->q_lock,NULL);
        if(err!= 0)
                return err;
        queue->count = 0;
        return 0;
}


void app_th_queue(struct th_queue* queue, struct task *task)
{
        assert(task != NULL);
        assert(queue != NULL);

        if(q_f_th_queue(queue,task->task_id)!=NULL){
                printf("You have appended!\n");
                return;
        }
        pthread_rwlock_wrlock(&queue->q_lock);
        task->next = NULL;
        task->prev = queue->tail;
        if(queue->tail != NULL){
                queue->tail->next = task;
	}else{
                queue->head = task;
        }
        queue->tail = task;
        queue->count++;
        pthread_rwlock_unlock(&queue->q_lock);

}

void ins_th_queue(struct th_queue* queue, struct task *task)
{
        assert(queue != NULL);
        assert(task != NULL);
        if(q_f_th_queue(queue,task->task_id)!=NULL){
                printf("You have inserted!\n");
                return;
        }    
        pthread_rwlock_wrlock(&queue->q_lock);  
        task->next = queue->head;
        task->prev = NULL;
        if(queue->head != NULL)
                queue->head->prev = task;
        else
                queue->tail = task;
        queue->head = task;
        queue->count++;
        pthread_rwlock_unlock(&queue->q_lock);
}

struct task* rm_f_th_queue(struct th_queue* queue, struct task *task)
{
        assert(queue != NULL);
        assert(task != NULL);
        struct task *iter;
        int flag = 0;
        pthread_rwlock_rdlock(&queue->q_lock);
        for(iter = queue->head;iter != NULL; iter = iter->next){
                if(iter == task){
                        flag = 1;
                        break;
                }
        }
        pthread_rwlock_unlock(&queue->q_lock);
        if(!flag){
                 printf("task %d not in the queue!\n",task->task_id);
                 return task;
        }

        pthread_rwlock_wrlock(&queue->q_lock);
        if(queue->head == task){
                queue->head = task->next;
                if(queue->tail == task)
                        queue->tail = NULL;
        }else if(queue->tail == task){
                queue->tail = task->prev;
 		if(queue->head == task)
                        queue->head = NULL;
        }else{
                task->prev->next = task->next;
                task->next->prev = task->prev;
        }
        queue->count--;
        pthread_rwlock_unlock(&queue->q_lock);
        return task;

}

struct task* q_f_th_queue(struct th_queue* queue, int id)
{
        assert(queue != NULL);

        struct task *iter;
        if(pthread_rwlock_rdlock(&queue->q_lock)!=0)
                return NULL;
        for(iter = queue->head; iter!=NULL;iter = iter->next){
                if(iter->task_id == id){
                        pthread_rwlock_unlock(&queue->q_lock);
                        return (iter);
                }
        }
        pthread_rwlock_unlock(&queue->q_lock);
        return NULL;
}



怎么样很简单吧。接下来,我们要测试一下我们的queue是否是线程同步的呢?
写个测试文件吧。test_th_queue.c
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<sys/types.h>
#include<pthread.h>
#include"th_queue.h"

#define MAX 100
//define the operation type
enum ops{
	APP,
	INS,
	RM
};
// we use it to simplify the test
struct op_queue{
	struct task *task;
	enum ops op;
};
//global queue
struct th_queue *gl_queue;

//the real function to do the test
void *th_process_task(void* arg)
{
	struct op_queue *oq = (struct op_queue *)arg;
	struct task *t = oq->task;
	switch(oq->op){
	case APP: app_th_queue(gl_queue,t);
		  printf("add task %d to tail, queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	case INS: ins_th_queue(gl_queue,t);
		  printf("add task %d to head, queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	case RM:  t = rm_f_th_queue(gl_queue,t);
		  printf("rm task %d ,queue count=%d\n",t->task_id,gl_queue->count);
		  break;
	default: app_th_queue(gl_queue,t);
		 printf("add task %d to tail, queue count=%d\n",t->task_id,gl_queue->count);
	}
	return ((void *)0);
}
int main(int argc, char **argv)
{
	pthread_t pid[MAX];
	struct task taskset[MAX];
	struct op_queue q_op[MAX];
	int i,d,err;
	d = 0;
	gl_queue = (struct th_queue*)malloc(sizeof(struct th_queue));
	init_th_queue(gl_queue);
        // just for the test
	for(i = 0; i < MAX; i++){
		taskset[i].next = NULL;
		taskset[i].prev = NULL;
		taskset[i].handle = NULL;
		taskset[i].arg = NULL;
		taskset[i].task_id = i;
		taskset[i].status = 0;
		taskset[i].pri = 0;
		taskset[i].pos = 0;
	}	
	for(i = 0; i < MAX; i++){
		q_op[i].task = &taskset[i];	
		if( i < (MAX/2)){
			q_op[i].op = APP;
		}else if( i>= (MAX/2) && (i <= (MAX/2 + MAX/4))){
			q_op[i].op = INS;
		}else{
                       //actually we can omit following row
			q_op[i].op = RM;
		}
	}	
   //we will create 100 threads to operate the queue
	for(i = 0; i < MAX ; i++){
		if(i<= (MAX/2 + MAX/4)){
			err = pthread_create(&pid[i],NULL,th_process_task,&q_op[i]);
			if(err != 0){
				fprintf(stderr,"can't create thread ");
				exit(1);
			}	
			if(pthread_join(pid[i],NULL)!= 0){
				fprintf(stderr,"can't join thread");
				exit(1);
			}
		}else{
			q_op[d].op=RM;
			
			err = pthread_create(&pid[i],NULL,th_process_task,&q_op[d]);
			if(err != 0){
				fprintf(stderr,"can't create thread ");
				exit(1);
			}	
			if(pthread_join(pid[i],NULL)!= 0){
				fprintf(stderr,"can't join thread");
				exit(1);
			}
			d++;

		}	
	}
	//please free the queue ,if we don't use it again
        //but, we will exit completely, so i did not write free here
}


编译,链接:
$gcc -g -c th_queue.c
$gcc -g -c test_th_queue.c
$gcc -g -lpthread -o test_th_queue th_queue.o test_th_queue.o


来看看结果吧。

add task 0 to tail, queue count=1
add task 1 to tail, queue count=2
add task 2 to tail, queue count=3
add task 3 to tail, queue count=4
add task 4 to tail, queue count=5
add task 5 to tail, queue count=6
add task 6 to tail, queue count=7
add task 7 to tail, queue count=8
add task 8 to tail, queue count=9
add task 9 to tail, queue count=10
add task 10 to tail, queue count=11
add task 11 to tail, queue count=12
add task 12 to tail, queue count=13
add task 13 to tail, queue count=14
add task 14 to tail, queue count=15
add task 15 to tail, queue count=16
add task 16 to tail, queue count=17
add task 17 to tail, queue count=18
add task 18 to tail, queue count=19
add task 19 to tail, queue count=20
add task 20 to tail, queue count=21
add task 21 to tail, queue count=22
add task 22 to tail, queue count=23
add task 23 to tail, queue count=24
add task 24 to tail, queue count=25
add task 25 to tail, queue count=26
add task 26 to tail, queue count=27
add task 27 to tail, queue count=28
add task 28 to tail, queue count=29
add task 29 to tail, queue count=30
add task 30 to tail, queue count=31
add task 31 to tail, queue count=32
add task 32 to tail, queue count=33
add task 33 to tail, queue count=34
add task 34 to tail, queue count=35
add task 35 to tail, queue count=36
add task 36 to tail, queue count=37
add task 37 to tail, queue count=38
add task 38 to tail, queue count=39
add task 39 to tail, queue count=40
add task 40 to tail, queue count=41
add task 41 to tail, queue count=42
add task 42 to tail, queue count=43
add task 43 to tail, queue count=44
add task 44 to tail, queue count=45
add task 45 to tail, queue count=46
add task 46 to tail, queue count=47
add task 47 to tail, queue count=48
add task 48 to tail, queue count=49
add task 49 to tail, queue count=50
add task 50 to head, queue count=51
add task 51 to head, queue count=52
add task 52 to head, queue count=53
add task 53 to head, queue count=54
add task 54 to head, queue count=55
add task 55 to head, queue count=56
add task 56 to head, queue count=57
add task 57 to head, queue count=58
add task 58 to head, queue count=59
add task 59 to head, queue count=60
add task 60 to head, queue count=61
add task 61 to head, queue count=62
add task 62 to head, queue count=63
add task 63 to head, queue count=64
add task 64 to head, queue count=65
add task 65 to head, queue count=66
add task 66 to head, queue count=67
add task 67 to head, queue count=68
add task 68 to head, queue count=69
add task 69 to head, queue count=70
add task 70 to head, queue count=71
add task 71 to head, queue count=72
add task 72 to head, queue count=73
add task 73 to head, queue count=74
add task 74 to head, queue count=75
add task 75 to head, queue count=76
rm task 0 ,queue count=75
rm task 1 ,queue count=74
rm task 2 ,queue count=73
rm task 3 ,queue count=72
rm task 4 ,queue count=71
rm task 5 ,queue count=70
rm task 6 ,queue count=69
rm task 7 ,queue count=68
rm task 8 ,queue count=67
rm task 9 ,queue count=66
rm task 10 ,queue count=65
rm task 11 ,queue count=64
rm task 12 ,queue count=63
rm task 13 ,queue count=62
rm task 14 ,queue count=61
rm task 15 ,queue count=60
rm task 16 ,queue count=59
rm task 17 ,queue count=58
rm task 18 ,queue count=57
rm task 19 ,queue count=56
rm task 20 ,queue count=55
rm task 21 ,queue count=54
rm task 22 ,queue count=53
rm task 23 ,queue count=52

简单吧。。。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics