1.3.1.1.1. 并发理论基础
Cpu、内存、I/O 有一个核心矛盾一直存在,就是这三者的速度差异.
特意提到缓存导致的可见性问题,线程切换带来的原子性问题,编译优化带来的有序性问题,
1.3.1.1.1.1. 源头之一:缓存导致的可见性问题
1.3.1.1.1.2. 源头之二:线程切换带来的原子性问题
指令 1:首先,需要把变量 count 从内存加载到 CPU 的寄存器;
指令 2:之后,在寄存器中执行 +1 操作;
指令 3:最后,将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。
1.3.1.1.1.3. 源头之三:编译优化带来的有序性问题
1.3.1.1.2. 可见性和有序性解决方法:
这些方法包括 volatile、synchronized 和 final 三个关键字,以及六项 Happens-Before 规则
1.3.1.1.2.1. Happens-Before 规则:前面一个操作的结果对后续操作是可见的
1.3.1.1.2.2. 1. 程序的顺序性规则
这条规则是指在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作
1.3.1.1.2.3. 2. volatile 变量规则
这条规则是指对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
1.3.1.1.2.4. 3、传递性
这条规则是指如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。
1.3.1.1.2.5. 4、管程中锁的规则
这条规则是指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。
1.3.1.1.2.6. 5. 线程 start() 规则
这条是关于线程启动的。它是指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
1.3.1.1.2.7. 6. 线程 join() 规则
它是指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作
1.3.1.1.3. 原子性解决方法:不能用可变对象做锁
“同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥。如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核 CPU 还是多核 CPU,就都能保证原子性了。
synchronized : 当修饰静态方法的时候,锁定的是当前类的 Class 对象.当修饰非静态方法的时候,锁定的是当前实例对象 this。
锁能覆盖所有受保护资源,。如果资源之间有关联关系,就要选择一个粒度更大的锁,这个锁应该能够覆盖所有相关的资源
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
1.3.1.1.3.1. 死锁
四个条件都发生时才会出现死锁:
1、互斥,共享资源 X 和 Y 只能被一个线程占用;
2、占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
3、不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
4、循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。
- 对于“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。
class Allocator {
private List<Object> als =
new ArrayList<>();
// 一次性申请所有资源
synchronized boolean apply(
Object from, Object to){
if(als.contains(from) ||
als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
}
}
class Account {
// actr应该为单例
private Allocator actr;
private int balance;
// 转账
void transfer(Account target, int amt){
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))
;
try{
// 锁定转出账户
synchronized(this){
// 锁定转入账户
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target)
}
}
}
- 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的
- 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。
class Account {
private int id;
private int balance;
// 转账
void transfer(Account target, int amt){
Account left = this ①
Account right = target; ②
if (this.id > target.id) { ③
left = target; ④
right = this; ⑤
} ⑥
// 锁定序号小的账户
synchronized(left){
// 锁定序号大的账户
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
转账
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class MyLock {
// 测试转账的main方法
public static void main(String[] args) throws InterruptedException {
Account src = new Account(10000);
Account target = new Account(10000);
CountDownLatch countDownLatch = new CountDownLatch(9999);
for (int i = 0; i < 9999; i++) {
new Thread(() -> {
src.transactionToTarget(1, target);
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("src=" + src.getBanalce());
System.out.println("target=" + target.getBanalce());
}
static class Account { //账户类
public Account(Integer banalce) {
this.banalce = banalce;
}
private Integer banalce;
public void transactionToTarget(Integer money, Account target) {//转账方法
Allocator.getInstance().apply(this, target);
this.banalce -= money;
target.setBanalce(target.getBanalce() + money);
Allocator.getInstance().release(this, target);
}
public Integer getBanalce() {
return banalce;
}
public void setBanalce(Integer banalce) {
this.banalce = banalce;
}
}
static class Allocator { //单例锁类
private Allocator() {
}
private List<Account> locks = new ArrayList<>();
public synchronized void apply(Account src, Account tag) {
while (locks.contains(src) || locks.contains(tag)) {
try {
this.wait();
} catch (InterruptedException e) {
}
}
locks.add(src);
locks.add(tag);
}
public synchronized void release(Account src, Account tag) {
locks.remove(src);
locks.remove(tag);
this.notifyAll();
}
public static Allocator getInstance() {
return AllocatorSingle.install;
}
static class AllocatorSingle {
public static Allocator install = new Allocator();
}
}
}
1.3.1.1.3.2. 活锁
有时线程虽然没有发生阻塞,但仍然会存在执行不下去的情况,这就是所谓的“活锁”
解决“活锁”的方案很简单,谦让时,尝试等待一个随机的时间就可以了。例如上面的那个例子,路人甲走左手边发现前面有人,并不是立刻换到右手边,而是等待一个随机的时间后,再换到右手边;同样,路人乙也不是立刻切换路线,也是等待一个随机的时间再切换。由于路人甲和路人乙等待的时间是随机的,所以同时相撞后再次相撞的概率就很低了。“等待一个随机时间”的方案虽然很简单,却非常有效,Raft 这样知名的分布式一致性算法中也用到了它。
1.3.1.1.3.3. 饥饿
所谓“饥饿”指的是线程因无法访问所需资源而无法执行下去的情况。“不患寡,而患不均”,如果线程优先级“不均”,在 CPU 繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程“饥饿”;持有锁的线程,如果执行的时间过长,也可能导致“饥饿”问题
解决“饥饿”问题的方案很简单,有三种方案:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。
1.3.1.1.4. 线程
Java 语言中线程共有六种状态,分别是:
NEW(初始化状态)
RUNNABLE(可运行 / 运行状态)BLOCKED(阻塞状态)WAITING(无时限等待)TIMED_WAITING(有时限等待)
TERMINATED(终止状态)
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
1.3.1.1.4.1. Semaphore(信号量)
Semaphore 可以允许多个线程访问一个临界区。
init():设置计数器的初始值。
down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
static int count;
//初始化信号量
static final Semaphore s
= new Semaphore(1);
//用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
1.3.1.1.4.2. ReadWriteLock
允许多个线程同时读共享变量;
只允许一个线程写共享变量;
如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
1.3.1.1.4.3. StampedLock
写锁、悲观读锁和乐观读.StampedLock 不支持重入
使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()。这个规则一定要记清楚。
其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。
final StampedLock sl =
new StampedLock();
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
//省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
//省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
final StampedLock sl =
new StampedLock();
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
//释放悲观读锁
sl.unlockRead(stamp);
}
}
//使用方法局部变量执行业务操作
......