在分布式系统中经常会使用到共享内存,然后多个进程并行读写同一块共享内存,这样就会造成并发冲突的问题, 一般的常规做法是加锁,但是锁对性能的影响非常大,所以就搞出来了一个无锁队列。
无锁队列的关键原理是CPU提供了一个指令CAS,这条指令是一个原子指令,执行过程中不允许被打断。
它接受三个参数:需要修改值的地址、旧值、新值;CAS指令执行时先读取该地址的当前值,将当前值于旧值进行比较, 如果相等则说明此值没有被修改过将当前值更新为新值,操作成功;
如果不相等则说明此值被其他进程修改过,直接返回失败,此时需要程序重新读取最新的值,再次调用CAS指令进行更新,重复此步骤直到成功为止, 或者设置一个重试次数,到了重试次数之后返回失败。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>
typedef unsigned long long uint64_t;
typedef unsigned int uint32_t;
#ifdef __GNUC__
/* 这里使用的是GCC编译器对CAS指令封装之后的一个内建函数 */
#define atomic_cas(p, old, new) __sync_bool_compare_and_swap (p, old, new)
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#error "atomic operations are supported only by GCC"
#endif
#define SHM_KEY 123456
struct attr_node
{
unsigned int id;
unsigned int value;
};
void *shm_create(uint64_t shm_key, int ele_size, int ele_count)
{
uint32_t size = ele_count*ele_count;
/* 创建共享内存 */
int shmid = shmget(shm_key, size, IPC_CREAT);
if (shmid < 0) {
perror("shmget error :");
return NULL;
}
/* 映射地址 */
void *addr = shmat(shmid, NULL, 0);
if(addr == (void *)-1) {
perror("shmat error :");
/* 删除shmid */
if (shmctl(shmid, IPC_RMID, NULL) == -1) {
perror("shmctl IPC_RMID error :");
}
return NULL;
}
return addr;
}
int shm_write(void *shm_addr, int id, uint32_t value, uint32_t old_value)
{
struct attr_node *node = (struct attr_node *)shm_addr + id;
int success = atomic_cas(&(node->value), old_value, value);
if (success) {
return 0;
} else {
return -1;
}
}
int shm_read(void *shm_addr, void *buf, int id)
{
void *start_addr = (struct attr_node *)shm_addr + id;
memcpy(buf, start_addr, sizeof(struct attr_node));
return 0;
}
int main(int argc, char *argv[])
{
if (argc < 2){
printf("usage: ./write id\n");
return -1;
}
void *shm_addr = shm_create(SHM_KEY, sizeof(struct attr_node), 1);
if (shm_addr == NULL) {
printf("sq_create error\n");
return -1;
}
struct attr_node *node = NULL;
node = malloc(sizeof(struct attr_node));
if (node == NULL){
printf("malloc error\n");
return -1;
}
int id = 0;
if (strcmp(argv[1], "clear") == 0) {
id = atoi(argv[2]);
shm_read(shm_addr, node, id);
shm_write(shm_addr, id, 0, node->value);
shm_read(shm_addr, node, id);
printf("id = %d\nnode->value = %u\n", id, node->value);
return 0;
} else {
id = atoi(argv[1]);
}
uint32_t count = 0;
uint32_t i = 0;
for (i = 0; i < 100000; i++)
{
/* 如果更新失败,重新读取最新的值再次重试,也可以限制一个重试次数 */
for (;;) {
shm_read(shm_addr, node, id);
uint32_t old_value = node->value;
++node->value;
int ret = shm_write(shm_addr, id, node->value, old_value);
if (ret == 0)
break;
count++;
}
}
printf("conflict count = %d\n", count);
shm_read(shm_addr, node, id);
printf("node->value = %u\n", node->value);
return 0;
}
原作者代码上面代码中调用函数shm_creat时第三个参数传的是10,其实是没有必要的,因为我们接下来可以使用脚本演示对序号0处的内存进行同时读写。
#! /bin/bash
./write 0 &
./write 0 &
./write 0 &
./write 0 &
程序执行的结果,每次的输出都不一样,但是最终值都是400000:
[root@centos-7 workspace]# ./write.sh
conflict count = 11992
node->value = 120946
[root@centos-7 workspace]# conflict count = 22405
node->value = 249251
conflict count = 18057
node->value = 318179
conflict count = 58404
node->value = 400000
reference: