本文共 5435 字,大约阅读时间需要 18 分钟。
osq是mcs算法在kernel中的一个实现,目前主要用在读写信号量和mutex自旋锁的等待队列中。osq的全称是optimistic_spin_queue。struct optimistic_spin_queue { /* * Stores an encoded value of the CPU # of the tail node in the queue. * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL. */ atomic_t tail;};struct optimistic_spin_node { struct optimistic_spin_node *next, *prev; int locked; /* 1 if lock acquired */ int cpu; /* encoded CPU # + 1 value */};从optimistic_spin_node 来看这个肯定是每个cpu 都有一个optimistic_spin_node,并且是组成双链表的方式。osq的函数主要有4个,分别如下:static inline void osq_lock_init(struct optimistic_spin_queue *lock){ atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);}extern bool osq_lock(struct optimistic_spin_queue *lock);extern void osq_unlock(struct optimistic_spin_queue *lock);static inline bool osq_is_locked(struct optimistic_spin_queue *lock){ return atomic_read(&lock->tail) != OSQ_UNLOCKED_VAL;}可以看出主要是初始化函数osq_lock_init,获得锁的函数osq_lock,释放锁的函数osq_unlock,以及判断是否加锁的函数osq_is_locked。其中初始化函数仅仅设置lock->tail 为OSQ_UNLOCKED_VAL,也就是初始化为未加锁的状态。我们重点看看bool osq_lock(struct optimistic_spin_queue *lock){ #由于osq_node 是一个percpu变量,因此这里获得当前cpu的node struct optimistic_spin_node *node = this_cpu_ptr(&osq_node); struct optimistic_spin_node *prev, *next; #encode_cpu对当前cpu的num加1 int curr = encode_cpu(smp_processor_id()); int old; node->locked = 0; node->next = NULL; node->cpu = curr; /* * We need both ACQUIRE (pairs with corresponding RELEASE in * unlock() uncontended, or fastpath) and RELEASE (to publish * the node fields we just initialised) semantics when updating * the lock tail. 
*/ #交换tail和curr的值,并返回原来tail的值 old = atomic_xchg(&lock->tail, curr); #如果old等于OSQ_UNLOCKED_VAL,说明没有人持有锁,直接返回true if (old == OSQ_UNLOCKED_VAL) return true; #走到这里说明已经有人在使用这个锁,这里得到使用这个锁的cpu prev = decode_cpu(old); node->prev = prev; /* * osq_lock() unqueue * * node->prev = prev osq_wait_next() * WMB MB * prev->next = node next->prev = prev // unqueue-C * * Here 'node->prev' and 'next->prev' are the same variable and we need * to ensure these stores happen in-order to avoid corrupting the list. */ smp_wmb(); #把当前节点插入到prev->next 中,表示这是下一个要获取锁的节点 WRITE_ONCE(prev->next, node); /* * Normally @prev is untouchable after the above store; because at that * moment unlock can proceed and wipe the node element from stack. * * However, since our nodes are static per-cpu storage, we're * guaranteed their existence -- this allows us to apply * cmpxchg in an attempt to undo our queueing. */ #等待锁被释放 while (!READ_ONCE(node->locked)) { /* * If we need to reschedule bail... so we can block. * Use vcpu_is_preempted() to avoid waiting for a preempted * lock holder: */ #如果有更高优先级抢占则退出while 循环 if (need_resched() || vcpu_is_preempted(node_cpu(node->prev))) goto unqueue; cpu_relax(); } return true;unqueue: /* * Step - A -- stabilize @prev * * Undo our @prev->next assignment; this will make @prev's * unlock()/unqueue() wait for a next pointer since @lock points to us * (or later). */ for (;;) { #这个if条件成立说明没有人来修改链表,当前节点只是被调度出去了,则把自己从队列中摘除后退出 if (prev->next == node && cmpxchg(&prev->next, node, NULL) == node) break; /* * We can only fail the cmpxchg() racing against an unlock(), * in which case we should observe @node->locked becomming * true. */ #判断当前节点是否持有锁,如果持有锁则正确退出 if (smp_load_acquire(&node->locked)) return true; cpu_relax(); /* * Or we race against a concurrent unqueue()'s step-B, in which * case its step-C will write us a new @node->prev pointer. 
*/ #说明当前节点没有持有锁,则往前遍历直到找到持有锁的node prev = READ_ONCE(node->prev); } /* * Step - B -- stabilize @next * * Similar to unlock(), wait for @node->next or move @lock from @node * back to @prev. */ #这里有个for循环等待锁被释放,类似于osq_unlock next = osq_wait_next(lock, node, prev); if (!next) return false; #从链表中删除当前节点 /* * Step - C -- unlink * * @prev is stable because its still waiting for a new @prev->next * pointer, @next is stable because our @node->next pointer is NULL and * it will wait in Step-A. */ WRITE_ONCE(next->prev, prev); WRITE_ONCE(prev->next, next); return false;}osq_wait_next的分析如下:static inline struct optimistic_spin_node *osq_wait_next(struct optimistic_spin_queue *lock, struct optimistic_spin_node *node, struct optimistic_spin_node *prev){ struct optimistic_spin_node *next = NULL; int curr = encode_cpu(smp_processor_id()); int old; /* * If there is a prev node in queue, then the 'old' value will be * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if * we're currently last in queue, then the queue will then become empty. */ old = prev ? prev->cpu : OSQ_UNLOCKED_VAL; #死循环直到锁被释放 for (;;) { #如果我们是最后一个node则退出表示释放锁 if (atomic_read(&lock->tail) == curr && atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) { /* * We were the last queued, we moved @lock back. @prev * will now observe @lock and will complete its * unlock()/unqueue(). */ break; } /* * We must xchg() the @node->next value, because if we were to * leave it in, a concurrent unlock()/unqueue() from * @node->next might complete Step-A and think its @prev is * still valid. * * If the concurrent unlock()/unqueue() wins the race, we'll * wait for either @lock to point to us, through its Step-B, or * wait for a new @node->next from its Step-C. */ #如果node->next不为null,且xchg返回的原值也不为null,则退出,表示释放锁 if (node->next) { next = xchg(&node->next, NULL); if (next) break; } cpu_relax(); } return next;}
转载地址:http://dnnmi.baihongyu.com/