C++并发编程 7:原子操作
原子操作提供了在多线程环境中无需显式同步就能安全访问共享数据的机制。原子操作是不可分割的操作,要么完全执行,要么完全不执行,不会被其他线程中断。
原子性保证了操作的完整性:
- 不可中断性:操作在执行过程中不会被其他线程中断
- 可见性:一个线程对原子变量的修改对其他线程立即可见
- 有序性:通过内存序约束确保操作的执行顺序
std::atomic 基础
C++11引入了std::atomic模板类,为基本数据类型提供原子操作支持。
#include <atomic>
#include <thread>
#include <iostream>
std::atomic<int> counter{0};
void increment() {
for (int i = 0; i < 1000; ++i) {
counter.fetch_add(1); // 原子递增
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter.load() << std::endl; // 输出: Counter: 2000
return 0;
}
基本原子操作
std::atomic<int> atomic_var{10};
// 基本操作
int old_val = atomic_var.load(); // 原子读取
atomic_var.store(20); // 原子写入
int prev = atomic_var.exchange(30); // 原子交换,返回旧值
// 算术操作(仅限整数类型)
int old1 = atomic_var.fetch_add(5); // 原子加法,返回旧值
int old2 = atomic_var.fetch_sub(3); // 原子减法,返回旧值
atomic_var += 2; // 等价于 fetch_add(2)
atomic_var++; // 原子递增
// 位运算操作
int old3 = atomic_var.fetch_and(0xFF); // 原子按位与
int old4 = atomic_var.fetch_or(0x0F); // 原子按位或
int old5 = atomic_var.fetch_xor(0xF0); // 原子按位异或
Compare-And-Swap (CAS) 操作
CAS是最重要的原子操作之一,它比较内存位置的值与期望值,如果相等则更新为新值。
#include <atomic>
#include <iostream>
std::atomic<int> value{100};
void cas_example() {
int expected = 100;
int new_value = 200;
// 强比较:严格比较,不进行类型转换
bool success = value.compare_exchange_strong(expected, new_value);
if (success) {
std::cout << "CAS成功,值已更新为: " << value.load() << std::endl;
} else {
std::cout << "CAS失败,当前值: " << value.load()
<< ",期望值: " << expected << std::endl;
}
}
void weak_cas_example() {
int expected = 200;
int new_value = 300;
// 弱比较:可能在某些架构上虚假失败,通常用于循环中
while (!value.compare_exchange_weak(expected, new_value)) {
// 如果失败,expected会被更新为当前值
expected = 200; // 重置期望值
}
std::cout << "弱CAS成功,最终值: " << value.load() << std::endl;
}
无锁数据结构示例:无锁栈
#include <atomic>
#include <memory>
template<typename T>
class lock_free_stack {
private:
struct node {
T data;
std::shared_ptr<node> next;
node(T const& data_) : data(data_) {}
};
std::atomic<std::shared_ptr<node>> head;
public:
void push(T const& data) {
std::shared_ptr<node> const new_node = std::make_shared<node>(data);
new_node->next = head.load();
// 使用CAS确保原子性更新
while (!head.compare_exchange_weak(new_node->next, new_node));
}
std::shared_ptr<T> pop() {
std::shared_ptr<node> old_head = head.load();
while (old_head &&
!head.compare_exchange_weak(old_head, old_head->next));
return old_head ? std::make_shared<T>(old_head->data) : nullptr;
}
};
内存序(Memory Ordering)
内存序定义了原子操作的同步和排序约束,是理解原子操作行为的关键。
内存序类型
enum class memory_order {
relaxed, // 松散序:只保证操作的原子性,不提供同步或排序约束
consume, // 消费序:已被弃用,通常被实现为acquire
acquire, // 获得序:读操作,防止后续读写操作重排到此操作之前
release, // 释放序:写操作,防止之前读写操作重排到此操作之后
acq_rel, // 获得-释放序:读-修改-写操作,同时具有acquire和release语义
seq_cst // 顺序一致性:最强的排序约束,默认内存序
};
内存序详解
1. memory_order_relaxed(松散序)
#include <atomic>
#include <thread>
#include <iostream>
std::atomic<int> x{0};
std::atomic<int> y{0};
void thread1() {
x.store(1, std::memory_order_relaxed);
y.store(1, std::memory_order_relaxed);
}
void thread2() {
int y_val = y.load(std::memory_order_relaxed);
int x_val = x.load(std::memory_order_relaxed);
// 可能出现 y_val == 1 && x_val == 0 的情况
// 因为relaxed不提供排序保证
std::cout << "x: " << x_val << ", y: " << y_val << std::endl;
}
2. memory_order_acquire/release(获得-释放语义)
#include <atomic>
#include <thread>
std::atomic<bool> ready{false};
int data = 0;
void producer() {
data = 42; // 1. 写入数据
ready.store(true, std::memory_order_release); // 2. 发布数据可用信号
// release保证:步骤1不会重排到步骤2之后
}
void consumer() {
while (!ready.load(std::memory_order_acquire)); // 3. 等待数据可用
// acquire保证:步骤4不会重排到步骤3之前
std::cout << "Data: " << data << std::endl; // 4. 读取数据
}
3. memory_order_seq_cst(顺序一致性)
std::atomic<int> x{0};
std::atomic<int> y{0};
std::atomic<int> z{0};
// 所有操作都使用seq_cst(默认)
void thread1() {
x.store(1);
}
void thread2() {
y.store(1);
}
void thread3() {
while (!x.load());
if (y.load() == 0) z++;
}
void thread4() {
while (!y.load());
if (x.load() == 0) z++;
}
// 在seq_cst下,z的最终值只能是0、1或2,不可能是其他值
同步原语与原子操作
std::atomic_flag - 最简单的原子类型
#include <atomic>
#include <thread>
#include <iostream>
class spinlock {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire));
}
void unlock() {
flag.clear(std::memory_order_release);
}
};
spinlock spin;
int shared_data = 0;
void critical_section() {
for (int i = 0; i < 1000; ++i) {
spin.lock();
++shared_data;
spin.unlock();
}
}
原子操作实现的条件变量
#include <atomic>
#include <thread>
#include <chrono>
class atomic_condition {
std::atomic<bool> condition{false};
public:
void wait() {
while (!condition.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
void notify() {
condition.store(true, std::memory_order_release);
}
void reset() {
condition.store(false, std::memory_order_relaxed);
}
};
实际应用场景
1. 高性能计数器
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
class HighPerformanceCounter {
std::atomic<uint64_t> count{0};
public:
void increment() {
count.fetch_add(1, std::memory_order_relaxed);
}
uint64_t get() const {
return count.load(std::memory_order_relaxed);
}
// 批量递增,减少原子操作次数
void add(uint64_t n) {
count.fetch_add(n, std::memory_order_relaxed);
}
};
// 使用示例
HighPerformanceCounter counter;
void worker() {
for (int i = 0; i < 1000000; ++i) {
counter.increment();
}
}
2. 无锁队列(Single Producer, Single Consumer)
#include <atomic>
#include <array>
template<typename T, size_t Size>
class SPSCQueue {
private:
std::array<T, Size> buffer;
std::atomic<size_t> head{0};
std::atomic<size_t> tail{0};
public:
bool push(const T& item) {
size_t current_tail = tail.load(std::memory_order_relaxed);
size_t next_tail = (current_tail + 1) % Size;
if (next_tail == head.load(std::memory_order_acquire)) {
return false; // 队列满
}
buffer[current_tail] = item;
tail.store(next_tail, std::memory_order_release);
return true;
}
bool pop(T& item) {
size_t current_head = head.load(std::memory_order_relaxed);
if (current_head == tail.load(std::memory_order_acquire)) {
return false; // 队列空
}
item = buffer[current_head];
head.store((current_head + 1) % Size, std::memory_order_release);
return true;
}
size_t size() const {
size_t current_tail = tail.load(std::memory_order_acquire);
size_t current_head = head.load(std::memory_order_acquire);
return (current_tail >= current_head) ?
(current_tail - current_head) :
(Size - current_head + current_tail);
}
};
3. 原子操作实现的读写锁
#include <atomic>
#include <thread>
class AtomicRWLock {
std::atomic<int32_t> state{0};
static constexpr int32_t WRITE_LOCK = -1;
static constexpr int32_t UNLOCKED = 0;
public:
void read_lock() {
int32_t expected = state.load(std::memory_order_relaxed);
do {
while (expected == WRITE_LOCK) {
std::this_thread::yield();
expected = state.load(std::memory_order_relaxed);
}
} while (!state.compare_exchange_weak(
expected, expected + 1,
std::memory_order_acquire,
std::memory_order_relaxed));
}
void read_unlock() {
state.fetch_sub(1, std::memory_order_release);
}
void write_lock() {
int32_t expected = UNLOCKED;
while (!state.compare_exchange_weak(
expected, WRITE_LOCK,
std::memory_order_acquire,
std::memory_order_relaxed)) {
expected = UNLOCKED;
std::this_thread::yield();
}
}
void write_unlock() {
state.store(UNLOCKED, std::memory_order_release);
}
};
4. 线程安全的单例模式
#include <atomic>
#include <memory>
template<typename T>
class AtomicSingleton {
private:
static std::atomic<T*> instance;
static std::atomic<bool> initialized;
public:
static T* get_instance() {
T* ptr = instance.load(std::memory_order_acquire);
if (ptr == nullptr) {
// 双重检查锁定
static std::atomic_flag creation_flag = ATOMIC_FLAG_INIT;
while (creation_flag.test_and_set(std::memory_order_acquire));
ptr = instance.load(std::memory_order_relaxed);
if (ptr == nullptr) {
ptr = new T();
instance.store(ptr, std::memory_order_release);
}
creation_flag.clear(std::memory_order_release);
}
return ptr;
}
};
template<typename T>
std::atomic<T*> AtomicSingleton<T>::instance{nullptr};
template<typename T>
std::atomic<bool> AtomicSingleton<T>::initialized{false};
5. 原子操作实现的生产者-消费者模型
#include <atomic>
#include <thread>
#include <vector>
template<typename T>
class AtomicBuffer {
private:
std::vector<T> buffer;
std::atomic<size_t> read_pos{0};
std::atomic<size_t> write_pos{0};
std::atomic<size_t> available{0};
size_t capacity;
public:
AtomicBuffer(size_t size) : buffer(size), capacity(size) {}
bool produce(const T& item) {
if (available.load(std::memory_order_acquire) >= capacity) {
return false; // 缓冲区满
}
size_t pos = write_pos.fetch_add(1, std::memory_order_acq_rel) % capacity;
buffer[pos] = item;
available.fetch_add(1, std::memory_order_release);
return true;
}
bool consume(T& item) {
if (available.load(std::memory_order_acquire) == 0) {
return false; // 缓冲区空
}
size_t pos = read_pos.fetch_add(1, std::memory_order_acq_rel) % capacity;
item = buffer[pos];
available.fetch_sub(1, std::memory_order_release);
return true;
}
size_t size() const {
return available.load(std::memory_order_acquire);
}
};