Skip to content

Instantly share code, notes, and snippets.

@tqinli
Last active February 24, 2023 22:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tqinli/db1b892a97cfa0bc41fb8b0b0b156b7e to your computer and use it in GitHub Desktop.
Save tqinli/db1b892a97cfa0bc41fb8b0b0b156b7e to your computer and use it in GitHub Desktop.
A repro for pthread_cond_wait/signal's issue of losing signal
// To compile
// gcc -g -o pthread_cond_repro ./pthread_cond_repro.c -lpthread
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <assert.h>
// !!!The following code mimics a .net core critical section implementation!!!
// See also: https://github.com/dotnet/coreclr/blob/release/3.0/src/pal/src/sync/cs.cpp
// Helper to mimic https://docs.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-interlockedcompareexchange
// NOTE InterlockedCompareExchange as Exchange before Comperand
typedef int32_t LONG;
LONG InterlockedCompareExchange(
LONG volatile *Destination,
LONG Exchange,
LONG Comperand)
{
LONG result =
__sync_val_compare_and_swap(
Destination, /* The pointer to a variable whose value is to be compared with. */
Comperand, /* The value to be compared */
Exchange /* The value to be stored */);
return result;
}
LONG InterlockedIncrement(LONG volatile *Destination)
{
LONG result = __sync_fetch_and_add(Destination, 1);
return result;
}
#define TRUE 1
#define FALSE 0
#define LOCK_BIT 0x1L
#define LOCK_AWAKENED_WAITER 0x2L
#define LOCK_WAITER_INC 0x4L
typedef struct CriticalSection
{
// lock_data:
// 1) last bit is lock bit (0x1 or LOCK_BIT),
// set by a succes EnterCS, and unset by LeaveCS.
// 2) 2nd bit is awaken waiter bit (0x2 or LOCK_AWAKENED_WAITER)
// set by LeaveCS when it detects there are waiter, and sent
// the signal to wake up one waiter. unset by the waiter
// actually woke by signaler.
// 3) waiter count is maintained by the rest, e.g.
// lock_data >> 2 == waiter count.
volatile LONG lock_data;
pthread_mutex_t mutex;
pthread_cond_t condition;
int iPredicate;
} CriticalSection;
typedef enum WaiterReturnState {
ReturnWaiterAwakened,
WaiterDidntWait
} WaiterReturnState;
void DoActualWait(CriticalSection *cs) {
int iRet;
iRet = pthread_mutex_lock(&cs->mutex);
assert(iRet == 0);
while (0 == cs->iPredicate) {
iRet = pthread_cond_wait(&cs->condition, &cs->mutex);
assert(iRet == 0);
}
cs->iPredicate = 0;
iRet = pthread_mutex_unlock(&cs->mutex);
assert(iRet == 0);
}
WaiterReturnState WaitOnCS(CriticalSection *cs, LONG lWaitInc) {
LONG lVal, lNewVal, lOldVal;
LONG localOrder;
// spin until waiter count is added to g_cslock_data
do {
lVal = cs->lock_data;
// since we have to load lock_data anyways, we can do another check
// if not locked, let's bail out indicating wait didn't happen
if (0 == (lVal & LOCK_BIT)) return WaiterDidntWait;
// we use +, instead of |, since this is incrementing waiter count
lNewVal = lVal + lWaitInc;
lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
} while (lOldVal != lVal);
DoActualWait(cs);
return ReturnWaiterAwakened;
}
void WakeupOneWaiter(CriticalSection *cs) {
int iRet;
iRet = pthread_mutex_lock(&cs->mutex);
assert(iRet == 0);
cs->iPredicate = 1;
pthread_cond_signal(&cs->condition);
assert(iRet == 0);
pthread_mutex_unlock(&cs->mutex);
assert(iRet == 0);
}
void InitCS(CriticalSection *cs) {
cs->lock_data = 0;
cs->iPredicate = 0;
int iRet;
iRet = pthread_mutex_init(&cs->mutex, NULL);
assert(iRet == 0);
iRet = pthread_cond_init(&cs->condition, NULL);
assert(iRet == 0);
}
void EnterCS(CriticalSection *cs) {
LONG lVal, lNewVal, lOldVal;
LONG lBitsToChange = LOCK_BIT; // 0x1
LONG lWaitInc = LOCK_WAITER_INC; // 0x4
WaiterReturnState rs;
LONG localOrder;
while (TRUE)
{
lVal = cs->lock_data;
while (0 == (lVal & LOCK_BIT)) {
// CS is not locked: try to lock it
lNewVal = lVal ^ lBitsToChange;
lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
if (lOldVal == lVal) goto CS_entered;
// Some thread raced with us, update value for next loop
lVal = lOldVal;
}
rs = WaitOnCS(cs, lWaitInc);
if (rs == ReturnWaiterAwakened) {
// Set the lock bit, and at the same time reset the awakened waiter bit
lBitsToChange = LOCK_BIT | LOCK_AWAKENED_WAITER; // 0x3
// For thread returned from waiting, 0x2 is decremented by
// the pthread_cond_signal thread, so this thread only need to do 0x2
lWaitInc = LOCK_AWAKENED_WAITER; // 0x2
}
}
CS_entered:
return;
}
void LeaveCS(CriticalSection *cs) {
LONG lVal, lNewVal, lOldVal;
lVal = cs->lock_data;
LONG localOrder;
while (TRUE) {
if (lVal == LOCK_BIT || lVal & LOCK_AWAKENED_WAITER) {
// if waiter count == 0, or a waiter has already been awakened
// simply reset the lock bit and return
lNewVal = lVal & ~LOCK_BIT;
// try unlock
lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
if (lOldVal == lVal) goto CS_left;
}
else {
// decrement waiter count, set awakened waiter bit, unset lock bit
lNewVal = lVal - LOCK_WAITER_INC + LOCK_AWAKENED_WAITER - LOCK_BIT;
// try unlock
lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
if (lOldVal == lVal) {
WakeupOneWaiter(cs);
goto CS_left;
}
}
lVal = lOldVal;
}
CS_left:
return;
}
// !!!The following code try to repro the pthread_cond_wait/signal bug!!!
int total_threads = 4;
int g_counter = 0;
LONG loop_round = 0;
LONG threads_finished = 0;
CriticalSection g_cs;
int __thread ThreadId;
void *LoopCriticalSectionThread(void *val)
{
ThreadId = (int)(long)val;
printf("LoopCriticalSectionThread - %d started\n", ThreadId);
LONG private_round = 0;
while (TRUE)
{
// Enter CS
EnterCS(&g_cs);
for (int i = 0; i < 1000; ++i) g_counter += ThreadId;
LeaveCS(&g_cs);
// Mark this thread finished
InterlockedIncrement(&threads_finished);
// Before start the next iteration, wait until all other
// threads to finish this round
while (private_round == loop_round)
{
sleep(0);
}
assert(private_round + 1 == loop_round);
++private_round;
}
}
void *RefereeThread(void *val)
{
printf("RefereeThread - %s started\n", (char *)val);
while (TRUE)
{
if (total_threads == InterlockedCompareExchange(&threads_finished, 0, total_threads))
{
sleep(0);
// if all threads finished, reset the counter
// and increment the loop_round, to kick of another round
InterlockedIncrement(&loop_round);
}
sleep(0);
}
}
void MonitorCounter()
{
while (TRUE)
{
sleep(2);
printf("Monitor - g_counter %d, loop_round %d, threads_finished %d\n", g_counter, loop_round, threads_finished);
}
}
int main(int argc, char **argv)
{
if (argc >= 2)
total_threads = atoi(argv[1]);
else
total_threads = 1.5 * sysconf(_SC_NPROCESSORS_ONLN);
printf("Total Threads Count; %d\n", total_threads);
// Initialize CS and try once
InitCS(&g_cs);
EnterCS(&g_cs);
LeaveCS(&g_cs);
int tid = 0;
int iRet;
pthread_t threads[total_threads + 1];
// Create threads
iRet = pthread_create(&threads[tid++], NULL, RefereeThread, NULL);
for (int i = 0; i < total_threads; ++i)
{
iRet = pthread_create(&threads[tid++], NULL, LoopCriticalSectionThread, (void *)(long)(i + 1));
assert(iRet == 0);
}
// Monitor the counter
MonitorCounter();
}
@AnnoyingUserName
Copy link

It can be reproduced on my machine, but is it really a bug of pthread? Consider the following scenario:
For simplicity, we assume that there are only two threads. Thread A arrived L92, and switched to thread B, which blocked at L158. Then thread B got the cs->lock_data and went to L172(because cs->lock_data has been added 4 just a moment ago by thread A). After that thread B continued to run for a while, and it called pthread_cond_signal before thread A has called pthread_cond_wait. Finally, thread A would always wait a signal.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment