chapter6 真实 code
6.1 扩展的 同步
(1) Barrier (栅栏)
(2) 读-写 锁
思想
(1) 支持 <=1 个 write 线程 + >=1 个 read 线程 并发
&& 纯 >=1 个 read 线程 并行
(2) read 与 write 不能同时, 必须 串行
// ====== 1 rwlock.h
#ifndef RW_LOCK_H
#define RW_LOCK_H
#include <pthread.h>
/*
 * State of one read-write lock.
 * The four counters are updated by the lock/unlock functions while `mutex`
 * is held. Field order matters: RWL_INITIALIZER fills the struct positionally.
 */
typedef struct RwlockTag
{
pthread_mutex_t mutex; /* protects the counters below */
pthread_cond_t readCv; /* rwl_readlock() waits here; rwl_writeunlock() broadcasts it */
pthread_cond_t writeCv; /* rwl_writelock() waits here; the unlock functions signal it */
int valid; /* RWLOCK_VALID once initialized; rwl_destroy() resets it to 0 */
int rActiveThreadNum; /* readers currently holding the lock */
int wActiveThreadNum; /* 1 while a writer holds the lock, otherwise 0 */
int rWaitThreadNum; /* readers currently blocked in rwl_readlock() */
int wWaitThreadNum; /* writers currently blocked in rwl_writelock() */
} RwLock;
/* Magic value stored in RwLock.valid to mark an initialized lock. */
#define RWLOCK_VALID 0xfacade
/*
 * Static initializer for an RwLock. Values are positional:
 * mutex, readCv, writeCv, valid, then the four counters (all zero).
 */
#define RWL_INITIALIZER \
{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
PTHREAD_COND_INITIALIZER, RWLOCK_VALID, 0, 0, 0, 0}
/* Set up / tear down a lock at run time. */
int rwl_init(RwLock *rwlock);
int rwl_destroy(RwLock *rwlock);
/* Shared (read) access: blocking acquire, non-blocking attempt, release. */
int rwl_readlock(RwLock *rwlock);
int rwl_readtrylock(RwLock *rwlock);
int rwl_readunlock(RwLock *rwlock);
/* Exclusive (write) access: blocking acquire, non-blocking attempt, release. */
int rwl_writelock(RwLock *rwlock);
int rwl_writetrylock(RwLock *rwlock);
int rwl_writeunlock(RwLock *rwlock);
#endif
// ====== 2. rwlock.c
#include "rwlock.h"
#include <errno.h>
/*
 * Acquire the lock for reading (shared access).
 *
 * If a writer is active the caller:
 *   1] is counted in rWaitThreadNum,
 *   2] waits on readCv,
 *   3] is removed from rWaitThreadNum once it stops waiting.
 * Otherwise (or after the wait):
 *   4] rActiveThreadNum is incremented; several readers may be active at once.
 *
 * Returns 0 on success, EINVAL if the lock is not initialized, or the error
 * code reported by pthread_mutex_lock() / pthread_cond_wait().
 */
int rwl_readlock(RwLock *rwl)
{
    int status;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    status = pthread_mutex_lock(&rwl->mutex);
    if (status != 0)
        return status; /* mutex not held: the counters must not be touched */

    if (rwl->wActiveThreadNum)
    {
        /* (1) */
        rwl->rWaitThreadNum++;
        /* Re-test after every wake-up: it may be spurious, or another
         * writer may have taken the lock before this thread ran. */
        while (rwl->wActiveThreadNum)
        {
            /* (2) */
            status = pthread_cond_wait(&rwl->readCv, &rwl->mutex);
            if (status != 0)
                break; /* a failing wait would otherwise spin forever */
        }
        /* (3) */
        rwl->rWaitThreadNum--;
    }

    if (status == 0)
        rwl->rActiveThreadNum++; /* (4) */

    pthread_mutex_unlock(&rwl->mutex);
    return status;
}
/*
 * Acquire the lock for writing (exclusive access).
 *
 * If a writer or any reader is active the caller:
 *   1] is counted in wWaitThreadNum,
 *   2] waits on writeCv,
 *   3] is removed from wWaitThreadNum once it stops waiting.
 * Otherwise (or after the wait):
 *   4] wActiveThreadNum is set to 1; only one writer can be active.
 *
 * Returns 0 on success, EINVAL if the lock is not initialized, or the error
 * code reported by pthread_mutex_lock() / pthread_cond_wait().
 */
int rwl_writelock(RwLock *rwl)
{
    int status;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    status = pthread_mutex_lock(&rwl->mutex);
    if (status != 0)
        return status; /* mutex not held: the counters must not be touched */

    if (rwl->wActiveThreadNum || rwl->rActiveThreadNum > 0)
    {
        /* (1) the unlock functions test this counter to decide whether
         * writeCv needs to be signalled */
        rwl->wWaitThreadNum++;
        while (rwl->wActiveThreadNum || rwl->rActiveThreadNum > 0)
        {
            /* (2) */
            status = pthread_cond_wait(&rwl->writeCv, &rwl->mutex);
            if (status != 0)
                break; /* a failing wait would otherwise spin forever */
        }
        /* (3) */
        rwl->wWaitThreadNum--;
    }

    if (status == 0)
        rwl->wActiveThreadNum = 1; /* (4) */

    pthread_mutex_unlock(&rwl->mutex);
    return status;
}
/*
 * Release a read lock.
 * Drops the active-reader count and, when the last reader leaves while
 * writers are waiting, signals writeCv to wake one of them.
 * Returns EINVAL for an uninitialized lock, otherwise 0.
 */
int rwl_readunlock(RwLock *rwl)
{
    if (rwl->valid == RWLOCK_VALID)
    {
        int readersLeft;

        pthread_mutex_lock(&rwl->mutex);
        readersLeft = --rwl->rActiveThreadNum;
        if (readersLeft == 0 && rwl->wWaitThreadNum > 0)
        {
            pthread_cond_signal(&rwl->writeCv);
        }
        pthread_mutex_unlock(&rwl->mutex);
        return 0;
    }
    return EINVAL;
}
/*
 * Release the write lock.
 * Clears the active-writer flag, then wakes waiters: all waiting readers
 * (broadcast on readCv) if there are any, otherwise one waiting writer
 * (signal on writeCv).
 * Returns EINVAL for an uninitialized lock, otherwise the result of the
 * final pthread_mutex_unlock().
 */
int rwl_writeunlock(RwLock *rwl)
{
    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    pthread_mutex_lock(&rwl->mutex);

    rwl->wActiveThreadNum = 0;

    if (rwl->rWaitThreadNum > 0)
    {
        /* waiting readers take precedence over waiting writers */
        pthread_cond_broadcast(&rwl->readCv);
    }
    else if (rwl->wWaitThreadNum > 0)
    {
        pthread_cond_signal(&rwl->writeCv);
    }

    return pthread_mutex_unlock(&rwl->mutex);
}
/*
 * Try to acquire the lock for reading without blocking.
 *
 * Returns 0 when the read lock was taken, EBUSY when a writer is active,
 * EINVAL if the lock is not initialized, or the error code reported by
 * pthread_mutex_lock().
 */
int rwl_readtrylock(RwLock *rwl)
{
    int status;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    status = pthread_mutex_lock(&rwl->mutex);
    if (status != 0)
        return status;

    if (rwl->wActiveThreadNum)
        status = EBUSY; /* the caller must learn it does NOT hold the lock */
    else
        rwl->rActiveThreadNum++;

    pthread_mutex_unlock(&rwl->mutex);
    return status;
}
/*
 * Try to acquire the lock for writing without blocking.
 *
 * Returns 0 when the write lock was taken, EBUSY when a writer or any
 * reader is active, EINVAL if the lock is not initialized, or the error
 * code reported by pthread_mutex_lock().
 */
int rwl_writetrylock(RwLock *rwl)
{
    int status;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    status = pthread_mutex_lock(&rwl->mutex);
    if (status != 0)
        return status;

    if (rwl->wActiveThreadNum || rwl->rActiveThreadNum > 0)
        status = EBUSY; /* the caller must learn it does NOT hold the lock */
    else
        rwl->wActiveThreadNum = 1;

    pthread_mutex_unlock(&rwl->mutex);
    return status;
}
/*
 * Initialize a read-write lock at run time.
 *
 * Returns 0 on success. If creating the mutex or either condition variable
 * fails, whatever was already created is destroyed again, the lock is left
 * marked invalid, and the pthread error code is returned.
 */
int rwl_init(RwLock *rwl)
{
    int status;

    rwl->valid = 0; /* stays invalid unless every step below succeeds */
    rwl->rActiveThreadNum = rwl->wActiveThreadNum = 0;
    rwl->rWaitThreadNum = rwl->wWaitThreadNum = 0;

    status = pthread_mutex_init(&rwl->mutex, NULL);
    if (status != 0)
        return status;

    status = pthread_cond_init(&rwl->readCv, NULL);
    if (status != 0)
    {
        pthread_mutex_destroy(&rwl->mutex);
        return status;
    }

    status = pthread_cond_init(&rwl->writeCv, NULL);
    if (status != 0)
    {
        pthread_cond_destroy(&rwl->readCv);
        pthread_mutex_destroy(&rwl->mutex);
        return status;
    }

    rwl->valid = RWLOCK_VALID;
    return 0;
}
/*
 * Destroy a read-write lock.
 *
 * Returns EINVAL if the lock is not initialized and EBUSY if any thread is
 * holding it or waiting for it. Otherwise the lock is marked invalid, its
 * mutex and condition variables are destroyed, and the first non-zero
 * destroy status (or 0) is returned.
 */
int rwl_destroy(RwLock *rwl)
{
    int status, statusRead, statusWrite;

    if (rwl->valid != RWLOCK_VALID)
        return EINVAL;

    status = pthread_mutex_lock(&rwl->mutex);
    if (status != 0)
        return status;

    /* Refuse while any thread is active on, or blocked for, the lock. */
    if (rwl->rActiveThreadNum > 0 || rwl->wActiveThreadNum ||
        rwl->rWaitThreadNum > 0 || rwl->wWaitThreadNum > 0)
    {
        pthread_mutex_unlock(&rwl->mutex);
        return EBUSY;
    }

    /* Invalidate first so later callers are turned away with EINVAL. */
    rwl->valid = 0;
    pthread_mutex_unlock(&rwl->mutex);

    status = pthread_mutex_destroy(&rwl->mutex);
    statusRead = pthread_cond_destroy(&rwl->readCv);
    statusWrite = pthread_cond_destroy(&rwl->writeCv);

    if (status != 0)
        return status;
    return statusRead != 0 ? statusRead : statusWrite;
}
/*
static void rwl_readcleanup(void *arg)
{
RwLock *rwl = (RwLock *)arg;
rwl->rWaitThreadNum--;
pthread_mutex_unlock(&rwl->mutex);
}
static void rwl_writecleanup(void *arg)
{
RwLock *rwl = (RwLock *)arg;
rwl->wWaitThreadNum--;
pthread_mutex_unlock(&rwl->mutex);
}
*/
// ====== 3 rwlock_main.c
#include "rwlock.h"
#include "errors.h"
#define THREADS 5
#define DATASIZE 15 // 数据 量/份数
#define ITERATIONS 10000
// (1) ThreadBlock: per-thread bookkeeping passed to thread_routine()
typedef struct ThreadTag
{
pthread_t threadId; // thread identifier filled in by pthread_create()
int threadIndex; // index of this thread; also the value it writes into data elements
int updateNum; // number of writes this thread has performed
int readNum; // number of reads this thread has performed
int interval; // the thread writes when (iteration % interval == 0) and reads otherwise
} ThreadBlock;
// (2) Data: one shared element, protected by its own read-write lock
typedef struct DataTag
{
RwLock rwLock; // each element carries its own lock
int value; // threadIndex of the thread that last wrote this element
int updateNum; // number of writes applied to this element
} Data;
// (3) one control block per thread
ThreadBlock threadBlocks[THREADS];
// (4) the shared data elements
Data data[DATASIZE];
/*
 * (5) Thread body: walk the data array round-robin for ITERATIONS steps.
 * On iterations where (iter % interval == 0) the element is updated under
 * its write lock; on every other iteration it is inspected under its read
 * lock. For an interval above 1, reads are therefore the common case.
 *
 * arg is the thread's ThreadBlock. Always returns NULL.
 */
void *thread_routine(void *arg)
{
    ThreadBlock *self = (ThreadBlock *)arg;
    /* An interval of 0 would make the modulo below divide by zero; anything
     * below 1 is treated as 1, i.e. write on every iteration. */
    int interval = self->interval > 0 ? self->interval : 1;
    int repeats = 0;
    int element = 0;

    for (int iter = 0; iter < ITERATIONS; iter++)
    {
        if (iter % interval == 0) /* write iteration */
        {
            rwl_writelock(&data[element].rwLock);
            data[element].value = self->threadIndex;
            data[element].updateNum++;
            self->updateNum++;
            rwl_writeunlock(&data[element].rwLock);
        }
        else /* read iteration */
        {
            rwl_readlock(&data[element].rwLock);
            self->readNum++;
            /* still holds this thread's own value: nobody overwrote it */
            if (data[element].value == self->threadIndex)
                repeats++;
            rwl_readunlock(&data[element].rwLock);
        }

        element++;
        if (element >= DATASIZE)
            element = 0;
    }

    if (repeats > 0)
        printf("ThreadBlock %d found unchanged elements %d times\n",
               self->threadIndex, repeats);
    return NULL;
}
/*
 * Read-write lock demo driver: initializes the shared data, starts THREADS
 * threads running thread_routine(), waits for them, and prints per-thread
 * and per-element statistics.
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main()
{
    unsigned int seed = 1;
    int thread_updates = 0;
    int data_updates = 0;
    int status;

    // (1) data init
    for (int i = 0; i < DATASIZE; i++)
    {
        data[i].value = 0;
        data[i].updateNum = 0;
        rwl_init(&data[i].rwLock);
    }

    // (2) threadBlock init & thread create
    for (int count = 0; count < THREADS; count++)
    {
        threadBlocks[count].threadIndex = count;
        threadBlocks[count].updateNum = 0;
        threadBlocks[count].readNum = 0;
        /* +1 keeps the interval in 1..7: thread_routine() uses it as a
         * modulo divisor, so it must never be 0. */
        threadBlocks[count].interval = rand_r(&seed) % 7 + 1;
        status = pthread_create(&threadBlocks[count].threadId,
                                NULL, thread_routine, (void *)&threadBlocks[count]);
        if (status != 0)
        {
            /* joining a thread that was never created is undefined */
            fprintf(stderr, "pthread_create failed for thread %d: %d\n", count, status);
            return 1;
        }
    }

    // (3) wait threads
    for (int count = 0; count < THREADS; count++)
    {
        pthread_join(threadBlocks[count].threadId, NULL);
        thread_updates += threadBlocks[count].updateNum;
        printf("%02d: interval %d, updated %d, readNum %d\n",
               count, threadBlocks[count].interval,
               threadBlocks[count].updateNum, threadBlocks[count].readNum);
    }

    // (4) report and tear down every data element
    for (int i = 0; i < DATASIZE; i++)
    {
        data_updates += data[i].updateNum;
        printf("data %02d: value %d, %d updateNum\n",
               i, data[i].value,
               data[i].updateNum);
        rwl_destroy(&data[i].rwLock);
    }

    printf("%d thread updateNum, %d data updateNum\n", thread_updates, data_updates);
    return 0;
}
6.2 工作队列 管理器
worker threads 接受来自 公共 queue 的 work requests, 并(可能) 并行处理 它们
将 work 提供给 一组线程 时,
work queue manager 可视为 work crew manager
可能视为
在 后台 work 的 queue
因为 caller 几乎完全看不到 work crew 的存在
create work queue 时, 可指定 最大 并行数
work queue 可创建的 最大 "engine" threads 数
|
|
处理 requests
work queue manager 据 work 数量, 可 start/stop 线程
thread do nothing 时, 等待 短时间 -> 终止
多段 ?
维持 不做事线程 的 成本
创建 新线程 的 成本
// ====== 1. workq.h
#ifndef WORKQ_H
#define WORKQ_H
#include <pthread.h>
// One queued item inside a WorkQue (singly linked list node)
typedef struct WorkQueElemTag
{
struct WorkQueElemTag *next; // next queued item, NULL at the tail
void *request; // opaque request pointer handed to the engine routine
} WorkQueElem;
// Work queue control block, shared by the queue's callers and its worker threads
/*
workerThreadNum
++
in workqAdd(), when it creates a worker thread
--
1] in a worker thread that finds the queue empty (first == NULL) while quit is set by workqDestroy()
2] in a worker thread whose wait timed out with the queue still empty
*/
typedef struct WorkQueTag
{
pthread_mutex_t mutex; // protects the fields below
pthread_cond_t cv; // workers wait here for work; workqDestroy() waits here for workers to exit
WorkQueElem *first, *last; // 1) head and tail of the pending work/request list
int valid; // 2) queue status: WORKQ_VALID between workqInit() and workqDestroy()
int quit; // set by workqDestroy() to tell workers to exit
int workerThreadNum; // 3) thread related: number of worker threads currently counted as running
int parallelism; // upper bound for workerThreadNum
void (*pEngineRoutine)(void *); // request handler; every worker thread calls the same routine
pthread_attr_t attr; // attributes used when creating workers (set to detached in workqInit())
} WorkQue;
/* Magic value stored in WorkQue.valid by workqInit() to mark an initialized queue. */
#define WORKQ_VALID 0xdec2018
/* Prepare wq: threadNum caps the number of worker threads, pEngineRoutine is called for each request. */
int workqInit(WorkQue *wq, int threadNum, void (*pEngineRoutine)(void *arg));
/* Invalidate wq, wait until no worker thread remains, then destroy its pthread objects. */
int workqDestroy(WorkQue *wq);
/* Append request to the queue, creating a worker thread if fewer than the maximum exist. */
int workqAdd(WorkQue *wq, void *request);
#endif
// ====== 2. workq.c
#include <time.h>
#include "workq.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h> // ETIMEDOUT
/*
 * Initialize a work queue.
 *   wq             queue to set up
 *   threadNum      maximum number of worker threads (stored as parallelism)
 *   pEngineRoutine callback each worker invokes for every request
 * Starts with an empty request list and no workers; worker threads will be
 * created detached. Always returns 0.
 */
int workqInit(WorkQue *wq, int threadNum, void (*pEngineRoutine)(void *arg))
{
    /* plain bookkeeping fields */
    wq->first = NULL;
    wq->last = NULL;
    wq->quit = 0;
    wq->workerThreadNum = 0;
    wq->parallelism = threadNum;
    wq->pEngineRoutine = pEngineRoutine;

    /* synchronization objects */
    pthread_mutex_init(&wq->mutex, NULL);
    pthread_cond_init(&wq->cv, NULL);

    /* every worker thread is created detached */
    pthread_attr_init(&wq->attr);
    pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED);

    /* mark the queue usable only after everything else is in place */
    wq->valid = WORKQ_VALID;
    return 0;
}
/*
 * Shut down a work queue.
 *
 * Marks the queue invalid so workqAdd() rejects new requests, asks the
 * worker threads to quit, waits until none are left (they drain the queue
 * first), then destroys the mutex, condition variable and thread attributes.
 *
 * Returns 1 if the queue is not initialized, otherwise the first non-zero
 * status of the three destroy calls (0 when all succeed).
 */
int workqDestroy(WorkQue *wq)
{
    int status, statusCv, statusAttr;

    if (wq->valid != WORKQ_VALID)
        return 1;

    pthread_mutex_lock(&wq->mutex);
    wq->valid = 0;
    if (wq->workerThreadNum > 0)
    {
        wq->quit = 1;
        /* Wake idle workers right away; without this they would only see
         * `quit` after their timed wait expires. */
        pthread_cond_broadcast(&wq->cv);
        /* The last worker to leave broadcasts cv once the count reaches 0. */
        while (wq->workerThreadNum > 0)
            pthread_cond_wait(&wq->cv, &wq->mutex);
    }
    pthread_mutex_unlock(&wq->mutex);

    status = pthread_mutex_destroy(&wq->mutex);
    statusCv = pthread_cond_destroy(&wq->cv);
    statusAttr = pthread_attr_destroy(&wq->attr);

    if (status != 0)
        return status;
    return statusCv != 0 ? statusCv : statusAttr;
}
/*
 * Worker thread body (internal, not part of the public interface).
 *
 * Loops with wq->mutex held, releasing it only while waiting and while the
 * user's engine routine runs:
 *   (1) wait up to 2 seconds for the queue to become non-empty or for quit,
 *   (2)-(5) pop the head request, run pEngineRoutine on it, free the node,
 *   (6) exit when the queue is empty and quit is set, waking workqDestroy()
 *       if this was the last worker,
 *   (7) exit when the queue is empty and the wait timed out.
 *
 * arg is the WorkQue. Always returns NULL.
 */
static void *workqServer(void *arg)
{
    WorkQue *wq = (WorkQue *)arg;

    printf("=== A worker is starting\n");
    pthread_mutex_lock(&wq->mutex);

    while (1)
    {
        struct timespec timeout;
        WorkQueElem *wqElem;
        int timedout = 0;

        printf("Worker waiting for work\n");
        clock_gettime(CLOCK_REALTIME, &timeout);
        timeout.tv_sec += 2;

        /* (1) idle: the mutex is released while blocked and re-acquired
         * before pthread_cond_timedwait() returns */
        while (wq->first == NULL && !wq->quit)
        {
            int status = pthread_cond_timedwait(&wq->cv, &wq->mutex, &timeout);
            if (status == ETIMEDOUT)
            {
                printf("Worker wait timed out\n");
                timedout = 1;
                break;
            }
            if (status != 0)
            {
                /* Any other error would repeat on the next call and spin
                 * forever, so retire this worker instead. */
                fprintf(stderr, "Worker wait failed: %d\n", status);
                wq->workerThreadNum--;
                if (wq->workerThreadNum == 0)
                    pthread_cond_broadcast(&wq->cv); /* workqDestroy() may be waiting */
                pthread_mutex_unlock(&wq->mutex);
                return NULL;
            }
        }

        /* %p requires a void * argument */
        printf("Work queue: 0x%p, quit: %d\n", (void *)wq->first, wq->quit);

        /* (2) take the head request, if any */
        wqElem = wq->first;
        if (wqElem != NULL)
        {
            /* (3) unlink it from the list */
            wq->first = wqElem->next;
            if (wq->last == wqElem)
                wq->last = NULL;

            pthread_mutex_unlock(&wq->mutex);
            printf("Worker calling pEngineRoutine\n");
            /* (4) run the user-provided routine without holding the mutex */
            wq->pEngineRoutine(wqElem->request);
            /* (5) the node belongs to the queue; the request to the routine */
            free(wqElem);
            pthread_mutex_lock(&wq->mutex);
        }

        /* (6) shutdown requested and nothing left to do */
        if (wq->first == NULL && wq->quit)
        {
            printf("Worker shutting down\n");
            wq->workerThreadNum--;
            if (wq->workerThreadNum == 0)
                pthread_cond_broadcast(&wq->cv); /* wakes workqDestroy() */
            pthread_mutex_unlock(&wq->mutex);
            return NULL;
        }

        /* (7) idle for the whole timeout: let this worker go away */
        if (wq->first == NULL && timedout)
        {
            printf("pEngineRoutine terminating due to timeout.\n");
            wq->workerThreadNum--;
            break;
        }
    }

    pthread_mutex_unlock(&wq->mutex);
    printf("worker exiting\n");
    return NULL;
}
/*
 * Queue one request for asynchronous processing.
 *
 * A new worker thread is created when fewer than `parallelism` workers
 * exist. The request is then appended to the queue and the condition
 * variable is signalled so that an idle worker picks it up at once.
 *
 * Returns 0 when the request was queued. Returns 1 if the queue is not
 * initialized, ENOMEM if the list node cannot be allocated, or the
 * pthread_create() error when no worker exists and none could be started.
 * On a non-zero return the request was NOT queued and still belongs to the
 * caller.
 */
int workqAdd(WorkQue *wq, void *request)
{
    WorkQueElem *wqElem;
    int status;

    if (wq->valid != WORKQ_VALID)
        return 1;

    /* (1) allocate the list node and attach the request */
    wqElem = malloc(sizeof *wqElem);
    if (wqElem == NULL)
        return ENOMEM;
    wqElem->request = request;
    wqElem->next = NULL;

    pthread_mutex_lock(&wq->mutex);

    /* (2) start another worker while below the parallelism limit. The new
     * thread cannot inspect the queue before this function releases the
     * mutex, so creating it before the insert is safe. */
    if (wq->workerThreadNum < wq->parallelism)
    {
        pthread_t id;

        printf("Creating new worker\n");
        status = pthread_create(&id, &wq->attr, workqServer, (void *)wq);
        if (status == 0)
        {
            wq->workerThreadNum++;
        }
        else if (wq->workerThreadNum == 0)
        {
            /* nobody could ever service the request: reject it */
            pthread_mutex_unlock(&wq->mutex);
            free(wqElem);
            return status;
        }
        /* otherwise existing workers will handle the request */
    }

    /* (3) append to the tail of the queue */
    if (wq->first == NULL)
        wq->first = wqElem;
    else
        wq->last->next = wqElem;
    wq->last = wqElem;

    /* (4) wake an idle worker; without this a request arriving while all
     * workers are idle waits until one of their timed waits expires */
    pthread_cond_signal(&wq->cv);

    pthread_mutex_unlock(&wq->mutex);
    return 0;
}
//====== 3. workq_main.c
#include "workq.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
// ====== client func
#define ReqestNum 4
// (1) request struct: asks a worker to compute value raised to power
typedef struct RequestDataTag
{
int value; // base
int power; // exponent (number of multiplications performed by engineRoutineDo())
} Request;
// (2) per-worker-thread usage record, kept as thread-specific data
typedef struct WorkThreadUseInfoTag
{
struct WorkThreadUseInfoTag *next; // next record in the list headed by enginListHead
pthread_t threadId; // pthread_self() of the worker that owns this record
int callNum; // number of requests this worker has processed
} WorkThreadUseInfo;
// (3) thread-specific data plumbing: list head / key / mutex
WorkThreadUseInfo *enginListHead = NULL; // records of terminated workers, filled by engineeThreadDtor()
pthread_key_t engineKey; // TSD key holding each worker's WorkThreadUseInfo
pthread_mutex_t engineeListMutex = PTHREAD_MUTEX_INITIALIZER; // protects enginListHead
WorkQue workq; // the work queue used by this demo
/*
 * Destructor registered for engineKey. Receives the terminating thread's
 * usage record (pTSD), pushes it onto the front of the global record list
 * under engineeListMutex, then logs the thread's termination.
 */
void engineeThreadDtor(void *pTSD)
{
    WorkThreadUseInfo *record = (WorkThreadUseInfo *)pTSD;

    pthread_mutex_lock(&engineeListMutex);
    /* push-front onto the shared list */
    record->next = enginListHead;
    enginListHead = record;
    pthread_mutex_unlock(&engineeListMutex);

    printf("=== thread %d is terminating\n", (int)record->threadId);
}
/*
 * Compute request->value raised to request->power by repeated
 * multiplication and print both the task and the result.
 * A power of zero or less yields 1.
 */
void engineRoutineDo(Request *request)
{
    int product = 1;
    int remaining;

    printf("WorkThreadUseInfo: computing %d^%d\n", request->value, request->power);

    for (remaining = request->power; remaining > 0; remaining--)
    {
        product = product * request->value;
    }

    printf("result = %d\n", product);
}
// Note: user provided worker routine
void engineRoutine(void *arg) // backgroundWorkRoutine
{
WorkThreadUseInfo *pEngine;
Request *request = (Request*)arg;
// (1)
pEngine = pthread_getspecific(engineKey);
if (pEngine == NULL)
{
// (2)
pEngine = (WorkThreadUseInfo*)malloc(sizeof(WorkThreadUseInfo));
// (3) Note: associate key with TSD's pTSD
// <=> set engineeThreadDtor()'s arg
pthread_setspecific(engineKey, (void*)pEngine);
// (4)
pEngine->threadId = pthread_self();
pEngine->callNum = 1;
}
else
pEngine->callNum++;
engineRoutineDo(request);
free(arg);
}
// workqAdd n 个 request
void *requestRoutine(void *arg)
{
Request *reqest;
int count;
unsigned int seed = (unsigned int) time(NULL);
for (count = 0; count < ReqestNum; count++)
{
// (1) allocate request
reqest = (Request*)malloc(sizeof(Request));
reqest->value = rand_r(&seed) % 5;
reqest->power = rand_r(&seed) % 3;
printf("====== Request: %d^%d\n", reqest->value, reqest->power);
// (2) workqAdd() the request
workqAdd(&workq, (void*)reqest);
sleep(rand_r(&seed) % 5);
}
return NULL;
}
// ======
int main()
{
pthread_t threadId;
WorkThreadUseInfo *pEngine;
int count = 0, callNum = 0;
int status;
// (1) TSD key create
pthread_key_create(&engineKey, engineeThreadDtor);
// (2) workqInit: 最多 4 个 worker thread
workqInit(&workq, 3, engineRoutine);
// (3) create
pthread_create(&threadId, NULL, requestRoutine, NULL);
pthread_join(threadId, NULL);
// (4)
workqDestroy(&workq);
// (5) report worker thread use info
pEngine = enginListHead;
while (pEngine != NULL)
{
count++;
callNum += pEngine->callNum;
printf("pEngineRoutine %d: %d callNum\n", count, pEngine->callNum);
pEngine = pEngine->next;
}
printf("%d pEngineRoutine threads processed %d callNum\n", count, callNum);
return 0;
}
// print
====== Request: 2^0
Creating new worker
=== A worker is starting
Worker waiting for work
Work queue: 0x0x7f57ec000f90, quit: 0
Worker calling pEngineRoutine
WorkThreadUseInfo: computing 2^0
result = 1
Worker waiting for work
====== Request: 4^0
Creating new worker
Worker wait timed out
Work queue: 0x0x7f57ec0010f0, quit: 0
Worker calling pEngineRoutine
WorkThreadUseInfo: computing 4^0
result = 1
pEngineRoutine terminating due to timeout.
worker exiting
=== thread -241699072 is terminating
=== A worker is starting
Worker waiting for work
Worker wait timed out
Work queue: 0x(nil), quit: 0
pEngineRoutine terminating due to timeout.
worker exiting
====== Request: 4^2
Creating new worker
=== A worker is starting
Worker waiting for work
Work queue: 0x0x7f57ec0010f0, quit: 0
Worker calling pEngineRoutine
WorkThreadUseInfo: computing 4^2
result = 16
Worker waiting for work
====== Request: 0^1
Creating new worker
=== A worker is starting
Worker waiting for work
Work queue: 0x0x7f57ec000f90, quit: 0
Worker calling pEngineRoutine
WorkThreadUseInfo: computing 0^1
result = 0
Worker waiting for work
Worker wait timed out
Work queue: 0x(nil), quit: 0
pEngineRoutine terminating due to timeout.
worker exiting
=== thread -250091776 is terminating
Worker wait timed out
Work queue: 0x(nil), quit: 1
Worker shutting down
pEngineRoutine 1: 1 callNum
pEngineRoutine 2: 2 callNum
2 pEngineRoutine threads processed 3 callNum
##6.3 如何对待 现有库 ?
(1) 修改 为 线程安全
(2) 与 遗留库 共存
#chapter7 POSIX 适应 线程
7.6 信号
1 信号 行为
2 信号 掩码
3 pthread_kill
4 sigwait 和 sigwaitinfo
5 SIGEV_THREAD
6 信号量: 用 信号捕获函数 进行 同步
chapter8 避免 debug 的 提示
8.1 避免 不正确的 code
1 避免依赖于 `线程惯性 (inertia)`
2 线程 race 风险很大, 不要下赌注
3 协作 以 避免 死锁
4 当心 优先级 反转
5 谓词间 不要 共享 cv
6 共享栈 和 相关内存 被损
8.2 避免 性能问题
1 小心 `并发 顺序化`
2 使用 合适数量 的 mutexes
(1) 太多 mutexes 没帮助
3 不要 与 缓存块 争
网友评论