首页 » 开发 » Linux线程

Linux线程基础

使用Linux线程库需要包含头文件:

#include <pthread.h>

编译时需要添加-lpthread选项:

$ gcc thread.c -lpthread

Linux线程API

创建线程:

int pthread_create( pthread_t* thread_id, 
                    const pthread_attr_t* attr,
                    void* (*func)(void*),
                    void* arg);

退出线程:

int pthread_exit(void* ret);

合并线程。通常是主线程等待工作线程退出时调用pthread_join(),若工作线程尚未退出,则主线程将阻塞在pthread_join():

int pthread_join(pthread_t thread_id, void** ret);

一段简单的线程使用示例:

#include <stdio.h>
#include <pthread.h>

void* thread1(void* arg)
{
    printf("start thread1\n");
    return ((void*)1);
}

void* thread2(void* arg)
{
    printf("start thread2\n");
    pthread_exit((void*)2);
}

int main()
{
    pthread_t tid1, tid2;
    void *tret;

    pthread_create(&tid1, NULL, thread1, NULL);
    pthread_create(&tid2, NULL, thread2, NULL);
    pthread_join(tid1, &tret);
    printf("join thread1, return: %d\n", (int)tret);
    pthread_join(tid2, &tret);
    printf("join thread2, return: %d\n", (int)tret);

    return 0;
}

获取线程ID。在主进程中可以通过pthread_create()获取新创建线程的ID,在线程内部,可以通过pthread_self()获取线程ID。在Linux上,pthread_create()通过clone()实现,因此创建线程等同于创建进程,但pthread_self()返回的线程ID,不是我们通过ps/top查看到的进程号,要获取外部所见的进程号,需要用syscall()调用:

pthread_t pthread_self(void);

int syscall(SYS_gettid);

测试:

#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <pthread.h>

void* thread(void* arg)
{
    printf("new thread, pthread_self() return: %u\n", (unsigned int)pthread_self());
    printf("new thread, getpid() return: %u\n", (unsigned int)getpid());
    printf("new thread, syscall() return: %u\n", (unsigned int)syscall(SYS_gettid));
    sleep(60);
    pthread_exit((void*)0);
}

int main()
{
    pthread_t tid;

    pthread_create(&tid, NULL, thread, NULL);
    printf("main get pthread id: %u\n", (unsigned int)tid);
    printf("main getpid: %u\n", (unsigned int)getpid());
    pthread_join(tid, NULL);

    return 0;
}

编译运行,并用ps查看:

$ gcc tid.c -lpthread
$ ./a.out &
[3] 3113
main get pthread id: 3086818192
main getpid: 3113
new thread, pthread_self() return: 3086818192
new thread, getpid() return: 3113
new thread, syscall() return: 3114
$ ps -eLf
UID        PID  PPID   LWP  C NLWP STIME TTY          TIME CMD
bailing   3113  2787  3113  0    2 20:32 pts/1    00:00:00 ./a.out
bailing   3113  2787  3114  0    2 20:32 pts/1    00:00:00 ./a.out

从输出可见,getpid()在主线程和新线程中都获得同样的值(3113)。在新线程中,syscall()获取的线程号(3114)可通过ps查看(在LWP即Light Weight Process轻量级进程一列)。相对而言,pthread_self()返回的线程ID似乎没那么重要(可以在日志输出区分不同线程等)。

线程同步

Linux线程同步的几种基本方式包括:互斥锁(mutex)、读写锁(read-write lock)、条件变量(condition vaiables)。

互斥锁

int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr);
int pthread_mutex_destory(pthread_mutex_t* mutex);

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);

如果mutex已锁,pthread_mutex_lock()将阻塞调用线程,pthread_mutex_trylock()则返回EBUSY。若mutex未锁,则调用进程锁住mutex。

读写锁

int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr);
int pthread_rwlock-destory(pthread_rwlock_t* rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);

int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock);

条件变量。mutex的本质是锁,而条件变量的本质是等待。

int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t* attr);
int pthread_cond_destory(pthread_cond_t* cond);

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timewait(  pthread_cond_t* cond, 
                            pthread_mutex_t* mutex,
                            const struct timespec* timeout);

int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);

pthread_cond_wait()将原子性地执行2个操作:

  • 将mutex解锁。
  • 线程休眠。当有其他线程调用pthread_cond_signal()/pthread_cond_broadcast()时,从pthread_cond_wait()返回。

注意,在pthread_cond_wait()返回之时,线程又再次对mutex加锁。我们可以这样来理解pthread_cond_wait():

pthread_cond_wait(cond, mutex):
    unlock(mutex)   // unlock first

    // sleep until pthread_cond_signal() is called by other thread

    lock(mutex);    // lock again
    return ;

条件变量的典型应用场景是生产者-消费者问题:

生产者

pthread_mutex_lock(&mutex);
queue.push(item);       // set condition true
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

消费者。注意这里的while循环:如果队列为空,所有消费者线程都会陷入pthread_cond_wait(),进入等待;从等待返回时,还要再次检查队列是否为空,因为线程可能被错误地唤醒(spurious wakeups)。

pthread_mutex_lock(&mutex);
while(queue.empty())    // condition is false
    pthread_cond_wait(&cond, &mutex);
queue.pop();
pthread_mutex_unlock(&mutex);

记录锁(Record Locking)

记录锁常用于进程间同步。记录(Record)通常是对应用而言的,这里的记录指文件中的一个字节区间(byte range)。POSIX记录锁的接口是fcntl函数:

#include <fnctl.h>

int fcntl(int fd, int cmd, ... /* struct flock *arg */);

flock的定义如下:

struct flock {
    short   l_type;     /* F_RDLCK, F_WRLCK, F_UNLCK */
    short   l_whence;   /* SEEK_SET, SEEK_CUR, SEEK_END */
    off_t   l_start;
    off_t   l_len;      /* 字节数,0表示整个文件长 */
    pid_t   l_pid;      /* PID returned by F_GETLK */
};

cmd的取值可以是:

  • F_SETLK - 若l_type是F_RDLCK/F_WRLCK则加锁,若l_type是F_UNLCK则解锁,取决于flock参数的设置。若加锁失败,返回-1,errno被置为EACCES或EAGAIN。
  • F_SETLKW - 阻塞版的F_SETLK(W为wait之意)。
  • F_GETLK - 获取锁信息。

多线程程序设计的经验

写锁是互斥锁(exclusive locks),读锁是共享锁(shared locks)。根据锁的粒度(granularity),分为粗粒度的锁(coarse-grained locks)和细粒度的锁(fined-grained locks)。还有一种分类是建议锁(advisory locks)和强制锁(mandatory locks)。

陈硕曾提到过线程池的阻抗匹配原则。设系统的CPU个数为C,程序密集计算的比重为P,则线程池的大小可设置为:

ThreadNumber = C / P

若有1个线程可跑满1个CPU,则8核的机子启动8个线程即可跑满(P=100%,C=8);若1个线程跑满占50%的CPU,则8核的机子启动16个线程即可跑满(P=50%,C=8)。若P<20%这个公式不再成立,可将线程个数设置为一个固定值,如N * C

多线程下单例模式的使用。单例模式的大体框架:

static Instance* Instance::get() 
{
    if(!_inst) {
        _inst = new Instance();
    }
    return _inst;
}

在多线程环境下可能出现的问题是,创建对象(new Instance())这步并非一个原子操作,它可能的实现是:

_inst = (Instance*)malloc(sz);  // 分配内存,并赋值给_inst
_inst->Instance();           // 在_inst所指内存构建对象

也就是分为3个步骤:

  • 申请一块内存。
  • 将内存地址赋予_inst。
  • 在_inst所指的地址上构建Instance对象。

所谓的Double-Checked Locking也未必能解决这个问题:

static Instance* Instance::get() 
{
    if(!_inst) {
        scoped_lock lock(_guard);   // 锁在这里而不是外层,避免低效
        if(!_inst) {
            _inst = new Instance();
        }
    }
    return _inst;
}

即使两次检测也未必保证正确创建对象,因为申请内存并赋值给_inst后,另一个线程检查_inst != NULL,就返回使用,但此时对象尚未构建完成。更好的实现是:

static Instance* Instance::get() 
{
    if(!_inst) {
        scoped_lock lock(_guard);   // 锁在这里而不是外层,避免低效
        if(!_inst) {
            Instance* _tmp = new Instance();
            _inst = _tmp;           // 对象已构建完毕,在这里赋值
        }
    }
    return _inst;
}

分享

0