1.3.1.1.1. 并发理论基础

Cpu、内存、I/O 有一个核心矛盾一直存在,就是这三者的速度差异.

特意提到缓存导致的可见性问题,线程切换带来的原子性问题,编译优化带来的有序性问题,

1.3.1.1.1.1. 源头之一:缓存导致的可见性问题

image-20221123100821845

1.3.1.1.1.2. 源头之二:线程切换带来的原子性问题

image-20221123100846052

指令 1:首先,需要把变量 count 从内存加载到 CPU 的寄存器;

指令 2:之后,在寄存器中执行 +1 操作;

指令 3:最后,将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

image-20221123105717202

1.3.1.1.1.3. 源头之三:编译优化带来的有序性问题

1.3.1.1.2. 可见性和有序性解决方法:

这些方法包括 volatile、synchronized 和 final 三个关键字,以及六项 Happens-Before 规则

1.3.1.1.2.1. Happens-Before 规则:前面一个操作的结果对后续操作是可见的

1.3.1.1.2.2. 1. 程序的顺序性规则

这条规则是指在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作

1.3.1.1.2.3. 2. volatile 变量规则

这条规则是指对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。

1.3.1.1.2.4. 3、传递性

这条规则是指如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。

1.3.1.1.2.5. 4、管程中锁的规则

这条规则是指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。

管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。

1.3.1.1.2.6. 5. 线程 start() 规则

这条是关于线程启动的。它是指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。

1.3.1.1.2.7. 6. 线程 join() 规则

它是指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作

1.3.1.1.3. 原子性解决方法:不能用可变对象做锁

“同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥。如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核 CPU 还是多核 CPU,就都能保证原子性了。

image-20221123111119505

synchronized : 当修饰静态方法的时候,锁定的是当前类的 Class 对象.当修饰非静态方法的时候,锁定的是当前实例对象 this。

锁能覆盖所有受保护资源,。如果资源之间有关联关系,就要选择一个粒度更大的锁,这个锁应该能够覆盖所有相关的资源


class Account {
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    synchronized(Account.class) {
      if (this.balance > amt) {
        this.balance -= amt;
        target.balance += amt;
      }
    }
  } 
}

1.3.1.1.3.1. 死锁

四个条件都发生时才会出现死锁:

1、互斥,共享资源 X 和 Y 只能被一个线程占用;

2、占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;

3、不可抢占,其他线程不能强行抢占线程 T1 占有的资源;

4、循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。

也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。

  • 对于“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。

class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申请所有资源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}

class Account {
  // actr应该为单例
  private Allocator actr;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 一次性申请转出账户和转入账户,直到成功
    while(!actr.apply(this, target))
      ;
    try{
      // 锁定转出账户
      synchronized(this){              
        // 锁定转入账户
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}
  • 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的

  • 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。

class Account {
  private int id;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    Account left = this        ①
    Account right = target;    ②
    if (this.id > target.id) { ③
      left = target;           ④
      right = this;            ⑤
    }                          ⑥
    // 锁定序号小的账户
    synchronized(left){
      // 锁定序号大的账户
      synchronized(right){ 
        if (this.balance > amt){
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

转账

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class MyLock {

    // 测试转账的main方法
    public static void main(String[] args) throws InterruptedException {

        Account src = new Account(10000);
        Account target = new Account(10000);
        CountDownLatch countDownLatch = new CountDownLatch(9999);
        for (int i = 0; i < 9999; i++) {
            new Thread(() -> {
                src.transactionToTarget(1, target);
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("src=" + src.getBanalce());
        System.out.println("target=" + target.getBanalce());
    }

    static class Account { //账户类
        public Account(Integer banalce) {
            this.banalce = banalce;
        }

        private Integer banalce;

        public void transactionToTarget(Integer money, Account target) {//转账方法
            Allocator.getInstance().apply(this, target);
            this.banalce -= money;
            target.setBanalce(target.getBanalce() + money);
            Allocator.getInstance().release(this, target);
        }

        public Integer getBanalce() {
            return banalce;
        }

        public void setBanalce(Integer banalce) {
            this.banalce = banalce;
        }
    }

    static class Allocator { //单例锁类
        private Allocator() {
        }

        private List<Account> locks = new ArrayList<>();

        public synchronized void apply(Account src, Account tag) {
            while (locks.contains(src) || locks.contains(tag)) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                }
            }
            locks.add(src);
            locks.add(tag);
        }

        public synchronized void release(Account src, Account tag) {
            locks.remove(src);
            locks.remove(tag);
            this.notifyAll();
        }

        public static Allocator getInstance() {
            return AllocatorSingle.install;
        }

        static class AllocatorSingle {
            public static Allocator install = new Allocator();
        }
    }
}

1.3.1.1.3.2. 活锁

有时线程虽然没有发生阻塞,但仍然会存在执行不下去的情况,这就是所谓的“活锁”

解决“活锁”的方案很简单,谦让时,尝试等待一个随机的时间就可以了。例如上面的那个例子,路人甲走左手边发现前面有人,并不是立刻换到右手边,而是等待一个随机的时间后,再换到右手边;同样,路人乙也不是立刻切换路线,也是等待一个随机的时间再切换。由于路人甲和路人乙等待的时间是随机的,所以同时相撞后再次相撞的概率就很低了。“等待一个随机时间”的方案虽然很简单,却非常有效,Raft 这样知名的分布式一致性算法中也用到了它。

1.3.1.1.3.3. 饥饿

所谓“饥饿”指的是线程因无法访问所需资源而无法执行下去的情况。“不患寡,而患不均”,如果线程优先级“不均”,在 CPU 繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程“饥饿”;持有锁的线程,如果执行的时间过长,也可能导致“饥饿”问题

解决“饥饿”问题的方案很简单,有三种方案:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。

1.3.1.1.4. 线程

Java 语言中线程共有六种状态,分别是:

NEW(初始化状态)

RUNNABLE(可运行 / 运行状态)BLOCKED(阻塞状态)WAITING(无时限等待)TIMED_WAITING(有时限等待)

TERMINATED(终止状态)

image-20221123155038577

最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

1.3.1.1.4.1. Semaphore(信号量)

Semaphore 可以允许多个线程访问一个临界区。

init():设置计数器的初始值。

down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。

up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。


static int count;
//初始化信号量
static final Semaphore s 
    = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}


class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}



class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

1.3.1.1.4.2. ReadWriteLock

允许多个线程同时读共享变量;

只允许一个线程写共享变量;

如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

1.3.1.1.4.3. StampedLock

写锁、悲观读锁和乐观读.StampedLock 不支持重入

使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()。这个规则一定要记清楚。

其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。


final StampedLock sl = 
  new StampedLock();

// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockRead(stamp);
}

// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockWrite(stamp);
}




final StampedLock sl = 
  new StampedLock();

// 乐观读
long stamp = 
  sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验stamp
if (!sl.validate(stamp)){
  // 升级为悲观读锁
  stamp = sl.readLock();
  try {
    // 读入方法局部变量
    .....
  } finally {
    //释放悲观读锁
    sl.unlockRead(stamp);
  }
}
//使用方法局部变量执行业务操作
......

results matching ""

    No results matching ""