线程同步的三种方式

Published On 五月 31, 2017

category linux | tags thread synchronize


本文非原创,大部分内容翻译自advanced-linux-programming这本书的chapter-04: threads。本文在原文的基础上作了部分删减,并加入了一些自己的理解,最重要的是将示例代码补充完整,可以直接编译执行。 ALP这本书很薄,内容浅显易懂,每个知识点都附有hello world级别的示例代码,是一本不错的入门书籍。当然也有很多国内的读者吐槽这本书深度不够,这方面确实不能和Advanced Programming in the Unix Environment比。

本文使用C编程展示linux上线程同步(synchronization)的三种方式:互斥锁,信号量和条件变量。

竞争条件

多线程的使用场景很多,比如IO密集型程序(爬虫),交互式程序(android应用),多核编程等。线程是操作系统进行独立调度的基本单位,在多核的机器上多个线程可以同时执行,即真正的并行(parallel),在核数不够的情况下操作系统会进行线程切换,使多个线程交替使用同一个处理器,也即并发(conncurrent),问题是我们并不知道一个线程何时会被挂起或恢复。而且系统进行调度的次序无法控制,同一个程序多次运行的结果也会不同。

多线程能共享数据是它的一大优势,同时也很危险。多线程编程最棘手的问题就是竞争条件(race condition),也就是两个线程同时修改同一个数据。

举一个例子: 有多个线程访问一个任务队列,任务队列使用一个链表表示。每个线程在循环中检查队列是否为空,如果非空则弹出头结点进行处理。

#include <malloc.h>
struct job {
    /* Link field for linked list. */
    struct job* next;
    /* Other fields describing work to be done... */
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
    while (job_queue != NULL)
    {
        /* Get the next available job. */
        struct job* next_job = job_queue;
        /* Remove this job from the list. */
        job_queue = job_queue->next;
        /* Carry out the work. */
        process_job (next_job);
        /* Clean up. */
        free (next_job);
    }
    return NULL;
}

假设现在队列不为空,有两个线程同时进入循环,第一个线程将队列头结点的指针保存在next_job中,此时,假设系统中断了第一个线程并调度第二个线程,第二个线程将next_job指向了同一个结点,因此两个线程将执行同一个任务。更严重的问题是,假设第一个线程移除头结点后队列变为空,第二个线程再次移除头结点将会导致段错误。这就是一个竞争条件的例子,幸运的话,这种特定的调度永远也不会发生,也就发现不了bug。

为了消除竞争条件,需要一种方法使得操作变成原子的。原子操作(atomic operation)是一组不可再分和不可中断的指令。不可再分是指一旦开始,直到执行完都不能被暂停;不可中断是指不能被中断,并且中途不能发生其他操作。对于这个例子,我们希望将检查队列是否为空和弹出头结点作为一个原子操作,也就是同一时间只允许一个线程访问队列(读或者写)。

线程同步的三种方式

互斥锁(Mutexes)

mutexes(MUTual EXclusion locks),即互斥锁,是GNU/Linux提供的一种锁机制。它是一种排他锁,同一时间只有一个线程可以获取锁。如果一个线程获取了互斥锁,第二个线程试图获取同一个互斥锁,后者将被阻塞。只有当第一个线程释放锁,第二个线程才能恢复执行。好奇的读者会想,两个线程同时获取锁,也就是同时访问一个数据,会发生竞争条件吗?答案是不会,这是由操作系统确保的。

创建互斥锁

创建一个互斥锁的代码片段如下:

pthread_mutex_t mutex;
pthread_mutex_init (&mutex, NULL);
pthread_mutex_init的第二个参数用来指定互斥锁的属性,NULL表示使用默认属性。

也可以使用PTHREAD_MUTEX_INITIALIZER宏来创建具有默认属性的互斥锁,上面的代码片段可以等价地写成:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

获取与释放锁

一个线程通过调用pthread_mutex_lock来加锁,如果互斥锁处于unlocked的状态,它会变为locked并且该函数立刻返回。如果互斥锁已经被另一个线程加锁,pthread_mutex_lock会阻塞,直到该锁被另一个线程释放才返回。多个线程可能被同一个互斥锁阻塞,当该互斥锁被释放的时候,只有一个被阻塞的线程会变成非阻塞并对互斥锁加锁,选择哪一个线程是不可预知的。

调用pthread_mutex_unlock释放锁,这个函数应该总是被对互斥锁进行加锁的线程调用。

非阻塞互斥锁

有时候我们想判断一个互斥锁是否被锁住,而又不想被阻塞。比如一个线程想要获取互斥锁,但如果互斥锁已经被锁住,该线程不等待而是去做其他事情。对一个处于unlocked的互斥锁调用pthread_mutex_trylock,结果是对它进行加锁,就像调用pthread_mutex_lock一样。然而,如果这个互斥锁已经被另一个线程锁住,pthread_mutex_trylock不会阻塞,相反,它会立刻返回,错误码置为EBUSY

互斥锁与死锁

互斥锁提供了一种允许一个线程阻塞其他线程的机制。这同时也引入了另一类bug,叫做死锁。当一个或多个线程卡在了等待永远也不会发生的事情上,就发生了死锁。一种简单的死锁类型是一个线程连续两次获取同一个互斥锁。在这种情况下的行为由所使用的互斥锁类型决定。一共有三种互斥锁:

  • fast mutex(默认): 对fast mutex连续两次加锁会导致死锁。对互斥锁进行加锁会被阻塞,直到该互斥锁被释放。因为第二次加锁的互斥锁已经被该线程锁住,锁永远也不会被释放。
  • recursive mutex: 这种锁不会导致死锁。同一个线程可以安全地对recursive mutex多次加锁。该锁会记住持有锁的线程调用了多少次pthread_mutex_lock,那个线程必须调用相同次数pthread_mutex_unlock才能真正释放该锁。
  • error-checking mutex: GNU/Linux会对这种互斥锁检测并标记双锁,当第二次调用pthread_mutex_lock会返回失败码EDEADLK

默认是第一种锁,要创建后两种锁,需要先通过申明pthread_mutexattr_t变量创建一个互斥锁的属性对象,并对它的指针调用pthread_mutexattr_init。然后调用pthread_mutexattr_setkind_np,第一个参数是互斥锁属性对象的指针,第二个参数是互斥锁的类型:PTHREAD_MUTEX_RECURSIVE_NP表示recursive mutex,PTHREAD_MUTEX_ERRORCHECK_NP表示error-checking mutex。将这个互斥锁属性对象的指针传给pthread_mutex_init的第二个参数就可以创建一个这种类型的互斥锁,然后使用pthread_mutexattr_destroy销毁属性对象。创建一个error-checking 互斥锁的代码如下:

pthread_mutexattr_t attr;
pthread_mutex_t mutex;
pthread_mutexattr_init (&attr);
pthread_mutexattr_setkind_np (&attr, PTHREAD_MUTEX_ERRORCHECK_NP);
pthread_mutex_init (&mutex, &attr);
pthread_mutexattr_destroy (&attr);

示例

下面展示了任务队列例子的另一个版本,现在队列被一个互斥锁保护起来。在访问队列之前(不管读还是写),每个线程先获取互斥锁,只有当检查队列、移除一个任务的流程完成后才释放该锁。

#include <malloc.h>
#include <pthread.h>
struct job
{
    /* Link field for linked list. */
    struct job* next;
    /* Other fields describing work to be done... */
    int val;
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

void process_job(struct job* new_job)
{
    printf("%u got %d\n", pthread_self(), new_job->val);
}
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
    while (1)
    {
        struct job* next_job;
        /* Lock the mutex on the job queue. */
        pthread_mutex_lock (&job_queue_mutex);
        /* Now it’s safe to check if the queue is empty. */
        if (job_queue == NULL)
            next_job = NULL;
        else
        {
            /* Get the next available job. */
            next_job = job_queue;
            /* Remove this job from the list. */
            job_queue = job_queue->next;
        }
        /* Unlock the mutex on the job queue because we’re done with the
        queue for now. */
        pthread_mutex_unlock (&job_queue_mutex);
        /* Was the queue empty? If so, end the thread. */
        if (next_job == NULL)
            break;
        /* Carry out the work. */
        process_job (next_job);
        /* Clean up. */
        free (next_job);
    }
    return NULL;
}
void enqueue_job (struct job* new_job)
{
    pthread_mutex_lock (&job_queue_mutex);
    new_job->next = job_queue;
    job_queue = new_job;
    pthread_mutex_unlock (&job_queue_mutex);
}

int main()
{
    // enqueue all the jobs in advance
    struct job* new_job;
    for(int n=1; n<=10; n++)
    {
        new_job = (struct job*) malloc (sizeof (struct job));
        new_job->val = n;
        enqueue_job(new_job);
    }
    // create two threads processing the jobs
    pthread_t thread_id1;
    pthread_create (&thread_id1, NULL, &thread_function, NULL);
    pthread_t thread_id2;
    pthread_create (&thread_id2, NULL, &thread_function, NULL);

    pthread_join (thread_id1, NULL);
    pthread_join (thread_id2, NULL);
    return 0;
}
因为使用了POSIX thread,编译时需要指定连接库pthread,下面的例子也一样:
gcc job-queue2.c -o job-queue2 -lpthread

所有对job_queue的访问操作都位于pthread_mutex_lockpthread_mutex_unlock之间,在这个区间之外访问保存在next_job中的job对象,因为此时该job已经被从队列中移除,所以其他线程无法访问。 注意,不能在判断队列为空后立刻跳出循环,那样的话会使得这个互斥锁永远无法释放,导致其他线程再也无法访问任务队列。

任务入队也需要加锁,这里为了简单起见,所以只在一个线程(主线程)中调用了enqueue_job

互斥锁是后面两种同步方式的基础,也就是说线程同步离不开互斥锁

信号量(semaphores)

在前面的例子中,工作线程从任务队列中取出下一个任务进行处理,直到队列中没有任务就退出了。这种模式只在任务被提前放到队列或者新任务以更快的速度加入队列的情况下才能正常工作。我们更希望队列为空的时候线程被阻塞,直到有新的任务可以处理。

信号量提供了一种方便的方式来实现这个目的。信号量是一种计数器,可以用来同步多个线程。与互斥锁一样,GNU/Linux确保检查和修改信号量的值是安全的,不会发生竞争条件。

每一个信号量有一个非负的整数计数器,支持以下两种基本的操作:

  • wait操作对信号量的值减1,如果这个值已经是0,该操作会被阻塞,直到信号量的值变为正(由于其他线程的动作)。当信号量的值变成正数后,它的值会被减1,然后这个wait操作才返回。
  • post操作对信号量的值加1,如果这个值之前是0,并且其他线程由于wait操作被阻塞在这个信号量上,其中一个线程会被唤醒,完成wait操作。

GNU/Linux提供了两种稍微不同的信号量,我们这里使用的是POSIX标准的信号量,使用信号量来进行多线程间通信。另一种实现用来实现进程间的通信。

信号量的使用

创建

一个信号量用一个sem_t变量表示,使用之前必须调用sem_init初始化,并且传一个sem_t变量的指针作为第一个参数,第二个参数固定为0,第三个参数是信号量的初始值。如果用完信号量,最好使用sem_destroy销毁。

wait&post

要wait一个信号量,使用sem_wait,post一个信号量,使用sem_post。与pthread_mutex_trylock类似,也有一个非阻塞的wait函数sem_trywait,如果wait操作因为信号量的值是0而阻塞,这个函数会立刻返回,错误码是EAGAIN

获取信号量的值

GNU/Linux也提供了一个获取信号量当前值的函数sem_getvalue,它将信号量的值放到第二个参数指向的int变量中。你不应该使用这个值来决定是否post或者wait这个信号量。这么做会导致竞争条件:因为另一个线程可能会在调用sem_getvalue和另一个信号量操作之间改变信号量的值。应当使用原子的post和wait函数。

示例

回到任务队列的例子来,我们使用一个信号量对队列中的任务计数。下面的代码使用信号量来控制队列:

#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
struct job
{
    /* Link field for linked list. */
    struct job* next;
    /* Other fields describing work to be done... */
    int val;
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

/* A semaphore counting the number of jobs in the queue. */
sem_t job_queue_count;

/* Perform one-time initialization of the job queue. */
void initialize_job_queue ()
{
    /* The queue is initially empty. */
    job_queue = NULL;
    /* Initialize the semaphore which counts jobs in the queue. Its
    initial value should be zero. */
    sem_init (&job_queue_count, 0, 0);
}
void process_job(struct job* new_job)
{
    printf("%u got %d\n", pthread_self(), new_job->val);
}
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
    while (1)
    {
        struct job* next_job;
        /* Wait on the job queue semaphore. If its value is positive,
        indicating that the queue is not empty, decrement the count by
        1. If the queue is empty, block until a new job is enqueued. */
        sem_wait (&job_queue_count);
        /* Lock the mutex on the job queue. */
        pthread_mutex_lock (&job_queue_mutex);
        /* Because of the semaphore, we know the queue is not empty. Get
        the next available job. */
        next_job = job_queue;
        /* Remove this job from the list. */
        job_queue = job_queue->next;
        /* Unlock the mutex on the job queue because we’re done with the
        queue for now. */
        pthread_mutex_unlock (&job_queue_mutex);
        /* Carry out the work. */
        process_job (next_job);
        /* Clean up. */
        free (next_job);
    }
    return NULL;
}
/* Add a new job to the front of the job queue. */
void enqueue_job (/* Pass job-specific data here... */int val)
{
    struct job* new_job;
    /* Allocate a new job object. */
    new_job = (struct job*) malloc (sizeof (struct job));
    /* Set the other fields of the job struct here... */
    new_job->val = val;
    /* Lock the mutex on the job queue before accessing it. */
    pthread_mutex_lock (&job_queue_mutex);
    /* Place the new job at the head of the queue. */
    new_job->next = job_queue;
    job_queue = new_job;
    /* Post to the semaphore to indicate that another job is available. If
    threads are blocked, waiting on the semaphore, one will become
    unblocked so it can process the job. */
    sem_post (&job_queue_count);
    /* Unlock the job queue mutex. */
    pthread_mutex_unlock (&job_queue_mutex);
}

int main()
{
    int input;
    initialize_job_queue();
    pthread_t thread_id1;
    pthread_create (&thread_id1, NULL, &thread_function, NULL);
    pthread_t thread_id2;
    pthread_create (&thread_id2, NULL, &thread_function, NULL);
    while(scanf("%d", &input) && input!=EOF)
    {
        enqueue_job(input);
    };
    return 0;
}
在从队列取出一个任务之前,每个线程会先等待信号量,如果信号量的值是0,表示队列是空的,线程会被阻塞直到信号量的值变为正,也就是有任务被加到了队列中。enqueue_job函数用来往队列中添加一个任务,与thread_function一样,它需要在修改队列之前先加锁。添加一个任务之后,它对信号量执行一次post,表示有新的任务来了。 这个版本的线程永远也不会退出,如果没有任务可以处理,所有的线程都会阻塞在sem_wait

条件变量(condition variable)

前面展示了如何使用互斥锁保护一个被两个线程同时访问的共享变量,以及如何使用信号量实现一个共享计数器。条件变量是GNU/Linux提供的第三种同步机制,使用它,你可以实现更多复杂的条件来控制线程执行。

应用场景

假设你写了一个线程,该线程会在一个无限循环的每一次迭代中做某样事情。循环受一个flag的控制,只有当这个flag被设置了循环才执行,否则循环就暂停。

下面的代码通过轮询来实现,在循环每一次迭代,线程检查flag是否被设置。因为flag被多个线程访问,所以它被互斥锁保护起来。这种实现虽然是正确的,但并不高效。线程会消耗大量的CPU,只要flag没有被设置,它就会不断检查,每次都要获取互斥锁。我们希望的方式是:当flag没有被设置的时候线程被置为睡眠状态,当flag被设置后将线程唤醒。

#include <pthread.h>

int thread_flag;
pthread_mutex_t thread_flag_mutex;
void initialize_flag ()
{
    pthread_mutex_init (&thread_flag_mutex, NULL);
    thread_flag = 0;
}
/* Calls do_work repeatedly while the thread flag is set; otherwise spins. */
void* thread_function (void* thread_arg)
{
    while (1)
    {
        int flag_is_set;
        /* Protect the flag with a mutex lock. */
        pthread_mutex_lock (&thread_flag_mutex); flag_is_set = thread_flag; pthread_mutex_unlock (&thread_flag_mutex);
        if (flag_is_set)
            do_work ();
        /* Else don’t do anything. Just loop again. */
    }
    return NULL;
}
/* Sets the value of the thread flag to FLAG_VALUE. */
void set_thread_flag (int flag_value)
{
    /* Protect the flag with a mutex lock. */
    pthread_mutex_lock (&thread_flag_mutex);
    thread_flag = flag_value;
    pthread_mutex_unlock (&thread_flag_mutex);
}
条件变量让你能够实现在某种条件下线程得以执行,否则线程会被阻塞,当条件 改变 时,被阻塞的线程自动恢复执行。

使用条件变量可以让前面的例子变得更加高效:

  • 在循环里检查flag,如果flag没有被设置,线程就等待这个条件变量
  • set_thread_flag函数修改flag的值后发出信号。阻塞的线程会被唤醒,然后再次检查条件

结合互斥锁

这有一个问题:检查flag的值和发送信号或等待条件之间会有竞争条件。假设一个线程检查flag发现没有被设置,此时linux调度器将这个线程暂停,恢复执行主线程,恰好主线程在执行set_thread_flag函数。它设置flag的值,并发出信号。因为没有线程在等待这个条件,这个信号就丢失了。然后,linux调度器接着执行前一个线程,它等待在条件变量上,可能会一直阻塞下去。

为了解决这个问题,我们需要使用互斥锁将flag和条件变量锁在一起。每个条件变量必须和互斥锁结合使用,从而避免竞争条件。使用这种模式,线程的主函数包括这三步:

  • 获取互斥锁,然后读取flag的值
  • 如果flag被设置了,释放互斥锁,然后继续执行
  • 如果没有被设置,以原子方式释放锁并等待条件变量

关键就是第三步,GNU/Linux允许你以原子方式释放锁和等待条件变量,防止另一个线程介入。

条件变量的使用

一个条件变量用一个pthread_cond_t的实例表示,记住每个条件变量需要与一个互斥锁同时使用,下面是操作条件变量的函数:

  • int pthread_cond_init初始化一个条件变量,第一个参数是指向pthread_cond_t的指针,第二个参数是指向条件变量属性对象的指针,在GNU/Linux下会被忽略,因此该值通常为NULL。互斥锁必须单独初始化。
  • pthread_cond_signal释放条件满足的信号,阻塞在该条件变量上的一个线程会被唤醒,如果没有线程阻塞,该信号将被忽略。参数是指向pthread_cond_t的指针。类似的pthread_cond_broadcast会唤醒阻塞在该条件变量上的所有线程
  • pthread_cond_wait阻塞调用这个函数的线程,直到条件满足释放信号,第一个参数是pthread_cond_t对象的指针,第二个参数是pthread_mutex_t锁对象。pthread_cond_wait被调用的时候,当前的线程必须获得了锁,该函数会以原子方式释放锁并且阻塞在条件变量上。当条件变量释放出信号,被阻塞的线程会被唤醒,该函数会自动获取锁

与信号量比较

与信号量一样,线程会睡眠等待条件满足。当一个线程A等待一个条件变量,它会被阻塞,直到另一个线程B修改同一个条件变量。

与信号量不同,条件变量没有计数器或内存,线程A必须在线程B发出信号之前等待条件。如果线程B先释放信号,该信号会丢失,此时线程A等待条件变量将会被阻塞,直到另一个线程再次发出信号。 即使没有线程在等待,信号量也会记住唤醒的次数,而条件变量则会直接丢弃,除非此时有其他线程被它阻塞。另一方面,信号量的每个post操作只会唤醒一个线程,而使用pthread_cond_broadcast,条件变量可以同时唤醒任意多个线程。

对条件修改的流程

可能改变条件变量所保护的条件的任何操作,都应该按照下面的流程执行。(在我们的例子中,条件是flag的状态,因此每当flag被改变时都应该按照这些步骤进行。)

  1. 获取互斥锁
  2. 修改条件(在我们的例子中,就是设置flag的值)
  3. 对条件变量发送信号或广播消息
  4. 释放互斥锁

示例

下面的代码再次展示了前面的例子,现在使用条件变量保护flag。 注意,检查thread_flag之前先获取了锁,这个锁pthread_cond_wait阻塞前自动释放,唤醒后又自动获取

#include <stdio.h>
#include <pthread.h>

int thread_flag;
pthread_cond_t thread_flag_cv;
pthread_mutex_t thread_flag_mutex;

void initialize_flag ()
{
    /* Initialize the mutex and condition variable. */
    pthread_mutex_init (&thread_flag_mutex, NULL);
    pthread_cond_init (&thread_flag_cv, NULL);
    /* Initialize the flag value. */
    thread_flag = 0;
}

/* Sets the value of the thread flag to FLAG_VALUE. */
void set_thread_flag (int flag_value)
{
    /* Lock the mutex before accessing the flag value. */
    pthread_mutex_lock (&thread_flag_mutex);
    /* Set the flag value, and then signal in case thread_function is
    blocked, waiting for the flag to become set. However,
    thread_function can’t actually check the flag until the mutex is
    unlocked. */
    thread_flag = flag_value;
    pthread_cond_signal (&thread_flag_cv);
    /* Unlock the mutex. */
    pthread_mutex_unlock (&thread_flag_mutex);
}

void do_work(int flag_value)
{
    printf("%u got %d\n", pthread_self(), flag_value);
}

/* Calls do_work repeatedly while the thread flag is set; blocks if
the flag is clear. */
void* thread_function (void* thread_arg)
{
    int flag;
    /* Loop infinitely. */
    while (1) {
        /* Lock the mutex before accessing the flag value. */
        pthread_mutex_lock (&thread_flag_mutex);
        while (!thread_flag)
            /* The flag is clear. Wait for a signal on the condition
            variable, indicating that the flag value has changed. When the
            signal arrives and this thread unblocks, loop and check the
            flag again. */
            pthread_cond_wait (&thread_flag_cv, &thread_flag_mutex);
        /* When we’ve gotten here, we know the flag must be set. Unlock
        the mutex. */
        flag = thread_flag;
        thread_flag = 0;
        pthread_mutex_unlock (&thread_flag_mutex);
        /* Do some work. */
        do_work (flag);
    }
    return NULL;
}

int main()
{
    int flag;
    pthread_t thread_id1;
    pthread_create (&thread_id1, NULL, &thread_function, NULL);
    pthread_t thread_id2;
    pthread_create (&thread_id2, NULL, &thread_function, NULL);
    initialize_flag();
    while(scanf("%d", &flag) && flag!=EOF) {
        set_thread_flag(flag);
    };
    return 0;
}
使用条件变量保护的条件可以是任意复杂,甚至可以没有条件,仅仅作为一个线程唤醒另一个线程的机制。


qq email facebook github
© 2024 - Xurui Yan. All rights reserved
Built using pelican