GPT摘要 
这篇文章是Java多线程编程的学习大纲,涵盖了多线程编程的核心概念和高级特性。主要内容包括线程基础知识(创建、状态、中断、守护线程等)、线程同步机制(synchronized关键字、Monitor对象、锁优化、wait/notify机制)、并发问题(死锁、活锁、饥饿)、可重入锁(ReentrantLock及其特性)、Java内存模型(可见性、有序性、volatile关键字)、原子操作(CAS、AtomicInteger等原子类)、线程池(创建、任务提交、拒绝策略、各种预定义线程池)、并发工具类(AQS、读写锁、StampedLock、Semaphore、CountDownLatch、CyclicBarrier)等。文本还提到了一些同步模式设计模式(保护性暂停、生产者/消费者模式、顺序控制模式)以及不可变设计和享元模式在并发编程中的应用。整体来看,这是一份较为全面的Java并发编程知识体系,涵盖了从基础概念到高级特性的多个层面,适合系统学习Java多线程编程。
 
效率提升:
Java 线程 线程创建 
重写thread的run方法 
创建runnable 抽象出来任务 
FutureTask 带返回值  线程间通信 
 
核心Thread是创建一个线程,其中run方法或者Runnable只是代表具体的任务。如果main线程中调用Runnable.run该任务就是main执行的
extends Thread 并重写run方法,
1 2 3 4 5 6 7 8 9 Thread  t  =  new  Thread () {public  void  run ()  {
Runnable :当成参数传给thread,run方法默认会检查Runnable如果有就执行Runnable.run
1 2 3 4 5 6 7 8 9 10 11 12 Thread  thread  =  new  Thread (new  Runnable () {public  void  run ()  {Thread  thread  =  new  Thread (() -> {
FutureTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  class  ThreadDemo  {public  static  void  main (String[] args)  {Callable  call  =  new  MyCallable ();new  FutureTask <>(call);Thread  t  =  new  Thread (task);try  {String  s  =  task.get(); catch  (Exception e) {public  class  MyCallable  implements  Callable <String> {@Override public  String call ()  throws  Exception {return  Thread.currentThread().getName() + "->"  + "Hello World" ;
tasklist    jps   taskkill
top -H -p pid           jstack  查看进程中线程信息
每一个线程都有一个独立的栈,栈内每个函数都会有栈帧。main线程中的三个函数:
上下文切换 切换当前执行的线程
CPU时间片 
垃圾回收 
更高优先权 
线程主动调用sleep、yield、wait、 join、 park、 synchronized、 lock 
 
如何保存上下文信息?
线程内程序计数器 记录运行到哪里 
栈帧记录变量信息 
 
API 
方法 
说明 
 
 
public void start () 
启动一个新线程,Java虚拟机调用此线程的 run 方法 
 
public void run () 
线程启动后调用该方法 
 
public void setName(String name) 
给当前线程取名字 
 
public void getName() 
获取当前线程的名字 线程存在默认名称:子线程是 Thread-索引,主线程是 main 
 
public static Thread currentThread() 
获取当前线程对象,代码在哪个线程中执行 
 
public static void sleep (long time) 
让当前线程休眠多少毫秒再继续执行 Thread.sleep(0)  : 让操作系统立刻重新进行一次 CPU 竞争 
 
public static native void yield () 
提示线程调度器让出当前线程对 CPU 的使用 
 
public final int getPriority() 
返回此线程的优先级 
 
public final void setPriority (int priority) 
更改此线程的优先级,常用 1 5 10 
 
public void interrupt() 
中断这个线程,异常处理机制 
 
public static boolean interrupted() 
判断当前线程是否被打断,清除打断标记 
 
public boolean isInterrupted() 
判断当前线程是否被打断,不清除打断标记 
 
public final void join () 
等待这个线程结束 
 
public final void join(long millis) 
等待这个线程死亡 millis 毫秒,0 意味着永远等待 
 
public final native boolean isAlive() 
线程是否存活(还没有运行完毕) 
 
public final void setDaemon(boolean on) 
将此线程标记为守护线程或用户线程 
 
state 
操作系统层面 :新建 就绪 运行 阻塞(io) 终止java层面 :其中Runnable包含 就绪 运行 阻塞(io) 
程状态 
导致状态发生条件 
 
 
NEW(新建) 
线程刚被创建,但是并未启动,还没调用 start 方法,只有线程对象,没有线程特征 
 
Runnable(可运行) 
线程可以在 Java 虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操作系统处理器,调用了 t.start() 方法:就绪(经典叫法) 
 
Blocked(阻塞) 
当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入 Blocked 状态;当该线程持有锁时,该线程将变成 Runnable 状态 synchronized      EntryList  
 
Waiting(无限等待) 
一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入 Waiting 状态. wait(WaitSet) join(原理wait) park 
 
Timed Waiting (限期等待) 
有几个方法有超时参数,调用将进入 Timed Waiting 状态,这一状态将一直保持到超时期满或者接收到唤醒通知。常用方法有 Thread.sleep(time) 、wait(time)   join(time) parkUntil(time) 
 
Teminated(结束) 
run 方法正常退出而死亡,或者因为没有捕获的异常终止了 run 方法而死亡 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public  enum  State  {
interrupt API 
public void interrupt():打断这个线程,异常处理机制public static boolean interrupted():判断当前线程是否被打断,打断返回 true,清除打断标记 public boolean isInterrupted():判断当前线程是否被打断,不清除打断标记 
线程处于阻塞 状态:如果线程当前处于阻塞状态,如调用了 Thread.sleep()、Object.wait()、join BlockingQueue.take() 等阻塞方法,调用 interrupt() 方法会中断线程的阻塞状态,抛出 InterruptedException 异常,打断标记。因为线程都不在运行,所以需要抛异常来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 public  static  void  main (String[] args)  throws  InterruptedException {Thread  t1  =  new  Thread (()->{try  {1000 );catch  (InterruptedException e) {"t1" );500 );" 打断状态: {}"  + t1.isInterrupted());
线程处于非阻塞 状态:如果线程当前处于非阻塞状态,调用 interrupt() 方法会将线程的中断状态设置为 true,但不会中断线程的执行。可以通过检查中断状态来决定是否终止线程的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 2000 );class  MyThread  extends  Thread  {@Override public  void  run ()  {while  (!Thread.currentThread().isInterrupted()) {"Thread is running." );"Thread interrupted. Exiting..." );
 
两阶段终止 功能:记录系统的利用率,但需要能停止下来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class  TPTInterrupt  {private  Thread thread;public  void  start () {new  Thread (() -> {while (true ) {Thread  current  =  Thread.currentThread();if (current.isInterrupted()) {"料理后事" );break ;try  {1000 );"将结果保存" );catch  (InterruptedException e) {"监控线程" );public  void  stop ()  {
打断park park阻塞线程类似于一直sleep,但被打断不会清空标记。需要标记为假时才生效
1 2 3 4 5 6 7 8 9 10 11 public  static  void  main (String[] args)  throws  Exception {Thread  t1  =  new  Thread (() -> {"park..." );"unpark..." );"打断状态:"  + Thread.currentThread().isInterrupted());"t1" );2000 );
join 
public final void join()等待该线程执行完成,原理上可以使用信号量PVjoin(long millis) 最大等待时间 
守护线程 当其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
t1.setDaemon(true);
垃圾回收器线程就是一种守护线程  
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等待它们处理完当前请求 
 
共享模型之并发 访问共享变量时,代码的原子性(互斥)以及并发协调(同步)
共享问题 
synchronized 
线程安全分析  
Monitor  
wait/notify  
线程状态转换  
活跃性  
Lock 加以改进 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 static  int  counter  =  0 ;  Thread  t1  =  new  Thread (() -> {for  (int  i  =  0 ; i < 5000 ; i++) {"t1" );Thread  t2  =  new  Thread (() -> {for  (int  i  =  0 ; i < 5000 ; i++) {"t2" );
阻塞:synchronized、lock 
非阻塞:原子变量 
 
synchronized 对象锁,同一时刻只有一个线程获取到针对该对象的锁,获取失败进入等待队列
1 2 3 4 5 6 7 8 9 synchronized (对象) void  lock ()  { while  (xchg(&locked, 1 )) ; }void  unlock ()  { xchg(&locked, 0 ); }
加在成员方法上,锁对象   synchronized(this)
1 2 3 4 5 public  synchronized  void  methodName ()  {synchronized 了 也不能并行,因为对象被锁了而不是函数被锁了
加在静态方法上,锁类对象   synchronized(MyClass.class)
1 2 3 4 5 public  class  MyClass  {public  static  synchronized  void  staticMethod ()  {
局部变量理论是线程安全的(每个栈都有栈帧),成员变量和静态变量不是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  ThreadSafe  {public  final  void  method1 (int  loopNumber)  {new  ArrayList <>();for  (int  i  =  0 ; i < loopNumber; i++) {private  void  method2 (ArrayList<String> list)  {"1" );private  void  method3 (ArrayList<String> list)  {0 );
但如果方法是public并且在子类中被修改了,就可能出错,所以private 或者 final的修饰符是有必要的,满足开闭 原则
1 2 3 4 5 6 7 8 class  ThreadSafeSubClass  extends  ThreadSafe {@Override public  void  method3 (ArrayList<String> list)  {new  Thread (() -> {0 );
常见类 线程安全的类: 单一方法是安全的,组合不一定(想要安全还要额外上锁)
java.lang.String、 java.lang.Integer  不可变对象 
java.lang.StringBuffer 
java.lang.Float 
java.lang.Boolean 
java.util.Vector 
java.util.Hashtable 
java.util.concurrent.ConcurrentHashMap 
 
非线程安全的类:
java.lang.StringBuilder 
java.util.ArrayList 
java.util.LinkedList 
java.util.HashMap 
java.util.HashSet 
 
例 MyServlet只有一个,共享的。所以userService也是一个共享的。count不是安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  class  MyServlet  extends  HttpServlet  {private  UserService  userService  =  new  UserServiceImpl ();public  void  doGet (HttpServletRequest request, HttpServletResponse response)  {public  class  UserServiceImpl  implements  UserService  {private  int  count  =  0 ;public  void  update ()  {
单例中的成员变量都是共享的。  改成环绕通知中的局部变量就解决了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Aspect @Component public  class  MyAspect  {private  long  start  =  0L ;@Before("execution(* *(..))") public  void  before ()  {@After("execution(* *(..))") public  void  after ()  {long  end  =  System.nanoTime();"cost time:"  + (end-start));
dao中的数据库连接,不能共享否则被别人close了,所以每个查询都要在局部变量中获取一个
1 2 3 4 5 6 7 8 9 10 public  class  UserDaoImpl  implements  UserDao  {private  Connection  conn  =  null ;public  void  update ()  throws  SQLException {String  sql  =  "update user set password = ? where username = ?" ;"" ,"" ,"" );
转账 锁住类对象,简单的解决办法但效率不高
1 2 3 4 5 6 7 8 9 10 class  Account  {public  void  transfer (Account target, int  amount)  {synchronized (Account.class){  if  (this .money > amount) {this .setMoney(this .getMoney() - amount);
monitor 对象头 Mark Word
Monitor Monitor是JVM中提供的一个对象,负责管理某个对象的锁。我们的对象通过MarkWord中的指针指向monitor对象(一对一)  c++实现的
获取成功为Owner 
失败加入EntryList(还可以先进行自旋 几次,如果还失败才加入,减少上下文切换   自适应); 
在thread-2释放时唤醒一个(线程的阻塞和唤醒操作是在Java虚拟机内部 进行的,而不涉及到底层操作系统的系统调用) 
 
字节码角度 1 2 3 4 synchronized (lock) 
除了正常处理外,19~23为异常处理,也会释放锁
优化 优化:轻量级、偏向锁
轻量级锁 :在竞争比较少的情况下,每次上锁太麻烦了;房门上挂书包  对使用者透明 偏向锁 :直接在房门上课上名字,专属于谁 
轻量级锁 锁记录 :线程中负责记录 该线程锁住了哪些对象
加锁:如果对象没被锁(01),通过CAS 让对象头保留锁记录地址,锁记录保存原对象头信息
加锁失败:如果对象已经被锁了(00),锁膨胀 :申请一个monitor,对象头指向monitor,加入entrylist
解锁:CAS再交换回来,如果发现对象被重量级锁锁住了,就进入重量级锁解锁流程
 
偏向锁 轻量级锁问题:自己调用时,还需要指向CAS 操作(这次一定会失败),偏向锁优化掉这个操作
初始时默认状态就是该状态,但程序加载会有延时 
可以手动禁用,或者hashCode()时会禁用(因为放不下,而在轻量级锁记录 重量级monitor会记录hash) 
 
锁消除 锁压根不会发生冲突,则直接被优化掉
1 2 3 4 5 6 public  void  b ()  throws  Exception {Object  o  =  new  Object ();synchronized  (o) {
wait / notify 介绍 等价于万能条件变量法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread  threadA  =  new  Thread (() -> {synchronized  (lock) {"Thread A is waiting" );"Thread A is resumed" );Thread  threadB  =  new  Thread (() -> {synchronized  (lock) {"Thread B is performing some task" );
在获取锁后,发现不满足情况,lock.wait()释放锁并进入WaitSet
1 2 mutex_lock(&lk);if  (!cond) cond_wait(&cv, &lk); 
在被Owner lock.notify后,重新进入EntryList。notifyAll()唤醒全部
1 2 cond_signal(&cv);
和操作系统不同点 就是这里锁lk和唤醒信号cv都是lock对象
 
同样可能会存在错误叫醒的情况,while + 广播
1 2 3 4 5 6 7 8 9 synchronized (lock) {while (!cond) {synchronized (lock) {
wait sleep区别 
wait 是Object方法;sleep是Thread方法 
wait必须要先获取锁并且再释放锁,sleep不用且不会释放锁 
 
同步模式之保护性暂停 
一个线程等待另外一个线程结果 
JDK 中,join 的实现、Future 的实现,采用的就是此模式 
但如果是一直产生:消息队列(见生产者/消费者) 
 
如果用join实现,必须要下载线程结束,并且变量要设置为全局的
基本实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class  GuardedObject  {private  Object response;private  final  Object  lock  =  new  Object ();public  Object get ()  {synchronized  (lock) {while  (response == null ) {try  {catch  (InterruptedException e) {return  response;public  void  complete (Object response)  {synchronized  (lock) {this .response = response;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  static  void  main (String[] args)  {GuardedObject  guardedObject  =  new  GuardedObject ();new  Thread (() -> {"download complete..." );Object  response  =  guardedObject.get();
带超时 直接wait(time) break不行,因为存在虚假唤醒。记录等待时间防止多等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  Object get (long  millis)  {synchronized  (lock) {long  begin  =  System.currentTimeMillis();long  timePassed  =  0 ;while  (response == null ) {long  waitTime  =  millis - timePassed;"waitTime: {}" , waitTime);if  (waitTime <= 0 ) {"break..." );break ;try  {catch  (InterruptedException e) {"timePassed: {}, object is null {}" , timePassed, response == null );return  response;
扩展多个 多加一个中间者,实现多对多,但其中每一对还是一一对应的。解耦产生和消费;  PRC框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class  Mailboxes  {private  static  Map<Integer, GuardedObject> boxes = new  Hashtable <>();private  static  int  id  =  1 ;private  static  synchronized  int  generateId ()  {return  id++;public  static  GuardedObject getGuardedObject (int  id)  {return  boxes.remove(id);public  static  GuardedObject createGuardedObject ()  {GuardedObject  go  =  new  GuardedObject (generateId());return  go;public  static  Set<Integer> getIds ()  {return  boxes.keySet();
Join原理 1 2 3 4 5 6 7 线程对象也是对象synchronized  然后执行B.wait(delay);public  final  synchronized  void  join (long  millis) {0 );
异步模式之生产者/消费者 
生产者消费者不需要一一对应 
JDK中的阻塞队列原理 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public  class  MessageQueue  {private  LinkedList<Message> queue;private  int  capacity;public  MessageQueue (int  capacity) {new  LinkedList <>();this .capacity = capacity;public  Message get () {synchronized (queue){while  (queue.size() == 0 ){try  {catch  (InterruptedException e) {return  queue.getFirst();public  void  add (Message m) {synchronized (queue){while  (queue.size() == capacity){try  {catch  (InterruptedException e) {return  ;
Park & Unpark wait状态,有点像值最大为1的信号量   但是是以线程 为单位   不需要获取锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import  java.util.concurrent.locks.LockSupport;Thread  t1  =  new  Thread (() -> {"start..." );1 );"park..." );"resume..." );"t1" );2 );"unpark..." );
不需要monitor,唤醒比较精确 
可以先恢复再暂停 
 
原理 每个线程一个parker对象
其中有一个_counter(干粮数量)=0或者1
unpark:_counter++  if(线程在等待) {唤醒, _counter=0} 
park:_counter--   if ( _counter<0)  {wait , _counter=0} 
 
细粒度锁 睡觉和学习应该能并发,所以需要将锁细粒度化,而不是直接锁住this
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  BigRoom  {private  final  Object  studyRoom  =  new  Object ();private  final  Object  bedRoom  =  new  Object ();public  void  sleep ()  {synchronized  (bedRoom) {"sleeping 2 小时" );2 );public  void  study ()  {synchronized  (studyRoom) {"study 1 小时" );1 );
死锁 在一个线程需要获取多把锁时就可能导致
死锁检测 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 jps "Thread-1" :0x000000000361d378  (object 0x000000076b5bf1c0 , a java.lang.Object),"Thread-0" "Thread-0" :0x000000000361e768  (object 0x000000076b5bf1d0 , a java.lang.Object),"Thread-1" for  the threads listed above:"Thread-1" :1 (TestDeadLock.java:28 )0x000000076b5bf1c0 > (a java.lang.Object)0x000000076b5bf1d0 > (a java.lang.Object)2 /883049899. run(Unknown Source)745 )"Thread-0" :0 (TestDeadLock.java:15 )0x000000076b5bf1d0 > (a java.lang.Object)0x000000076b5bf1c0 > (a java.lang.Object)1 /495053715. run(Unknown Source)745 )1  deadlock.
活锁 活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。开发中可以增加随机时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 new  Thread (() -> {while  (count > 0 ) {0.2 );"count: {}" , count);"t1" ).start();new  Thread (() -> {while  (count < 20 ) {0.2 );"count: {}" , count);"t2" ).start();
饥饿 某个线程始终不能运行,如设置了线程优先级,优先级低的可能难以运行
ReentrantLock 
可中断 
可以设置超时时间      
可以设置为公平锁     先到先得而不是随机 
支持多个条件变量     相当于不同条件变量进入不同WaitSet    现在就完全相当于万能条件变量法 等价于synchronized+wait notifyall 升级 
 
都可以重入
java实现的
1 2 3 4 5 6 7 8 try  {finally  {
可中断 等待锁的过程中可以被叫醒
1 2 3 4 5 6 7 8 9 10 11 12 13 try  {catch  (InterruptedException e) {"等锁的过程中被打断" );return ;1 );
可超时 避免无限制的等待
1 2 3 4 5 6 7 8 9 10 11 if  (!lock.tryLock()) { "获取立刻失败,返回" );return ;try  {"获得了锁" );finally  {1 , TimeUnit.SECONDS) 
解决哲学家 获取锁时,如果右手获取不到,需要立马不等并且左手要解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if  (left.tryLock()) {try  {if  (right.tryLock()) {try  {finally  {finally  {
公平锁 会降低并发度 默认为false   lock = new ReentrantLock(true);
条件变量 之前等待队列只有一个,直接是lock对象
1 2 3 4 synchronized  (lock) {"Thread A is waiting" );
在lock基础上,创建一个condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private  ReentrantLock  lock  =  new  ReentrantLock ();private  Condition  condition  =  lock.newCondition();private  boolean  conditionMet  =  false ;public  void  await ()  throws  InterruptedException {try  {while  (!conditionMet) {"Condition is met. Resuming execution." );finally  {public  void  signal ()  {try  {true ;"Condition is signaled." );finally  {
同步模式之顺序控制 线程执行顺序 
wait notify:  还需要一个额外变量标记代表cond 
park unpark  非常简洁 
 
打印指定形状 例如打印abcabc   和打印🐟一个原理
直接万能条件变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  void  print (int  waitFlag, int  nextFlag, String str)  {synchronized  (this ) {while  (this .flag != waitFlag) {try  {this .wait();catch  (InterruptedException e) {this .notifyAll();
ReentrantLock + condition  每个线程都有等待的cond以及唤醒的cond 感觉没有必要 不如直接上面whlie+notifyAll
1 2 3 4 5 6 7 8 9 10 11 12 13 public  void  print (String str, Condition current, Condition next)  {this .lock();try  {catch  (InterruptedException e) {finally  {this .unlock();
park unpark:每次unpark下一个想打印的线程,需要一个数组以获得下一个线程
1 2 3 LockSupport.park();
 
小结 分析多线程访问共享资源时,哪些代码片段属于临界区 
使用 synchronized 互斥解决临界区的线程安全问题 
掌握 synchronized 锁对象语法  
掌握 synchronzied 加载成员方法this 和静态方法语法 this.getClass() 
掌握 wait/notify 同步方法 
 
使用 lock 互斥解决临界区的线程安全问题 
掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量    
相当于synchronzied 的升级  
 
学会分析变量的线程安全性、掌握常见线程安全类 的使用
 了解线程活跃性问题:死锁、活锁、饥饿 
应用方面  
互斥:使用 synchronized 或 Lock 达到共享资源互斥效果 
同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果 
 
原理方面  
monitor、synchronized 、wait/notify 原理  
synchronized 进阶原理   轻量级  偏向锁  锁消除 
park & unpark 原理 
 
模式方面  
同步模式之保护性暂停  
异步模式之生产者消费者  
同步模式之顺序控制 
 
内存模型 引入 通过volatile解决由于缓存引发的可见性问题,以及重排序引发的有序性问题
JMM java memory model
原子性 
可见性  不受缓存影响 
有序性  不受cpu指令并行优化影响 
 
JMM的主要内容包括:
主内存(Main Memory):主内存是所有线程共享的内存区域,用于存储共享变量。主内存中的数据对所有线程可见。 
工作内存(Working Memory):每个线程都有自己的工作内存,用于存储线程执行时需要使用的数据。工作内存中包含了主内存中的部分数据副本。 
内存间交互操作:JMM定义了一组规则,用于线程在主内存和工作内存之间进行数据交互。这些操作包括读取、写入和同步操作。 
顺序一致性(Sequential Consistency):JMM保证线程的执行结果与顺序一致的执行结果相同。即,对于一个线程来说,它的操作将按照程序中的顺序执行,并且对其他线程可见。 
可见性(Visibility):JMM保证一个线程对共享变量的修改对其他线程是可见的。这意味着一个线程对变量的修改,将会在之后的操作中对其他线程可见。 
原子性(Atomicity):JMM提供了一些原子性的保证。例如,对volatile变量的读写具有原子性,单个读写操作不会被线程中断。 
重排序(Reordering):JMM允许编译器和处理器对指令进行优化和重排序,但要求保持程序的顺序一致性和线程的可见性。 
 
可见性 
实际上停不下来,为什么? 
while中如果有sout,就可以停下来了 为什么? 
 
1 2 3 4 5 6 7 8 9 10 11 12 static  boolean  run  =  true ;public  static  void  main (String[] args)  throws  InterruptedException {Thread  t  =  new  Thread (() -> {while  (run) {1 );false ; 
线程中存储了run = true; 的副本
volatile :强制到主存中读取,修饰成员变量和静态成员变量。不能保证原子性,一个i++ 一个i– 还是会错,适合一个写其他读的情况 (轻量级的同步机制)synchronized :也可以实现必须去主存读取,但复杂度高。可以实现原子性(代码块内也可能重排序)
 
改进两阶段 使用volatile标记
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class  TPTVolatile  {private  Thread thread;private  volatile  boolean  stop  =  false ;public  void  start () {new  Thread (() -> {while (true ) {Thread  current  =  Thread.currentThread();if (stop) {"料理后事" );break ;try  {1000 );"将结果保存" );catch  (InterruptedException e) {"监控线程" );public  void  stop ()  {true ;
balking 监控线程只有一个,但如果有人多次start()其实会调用多个。balking避免重复执行某个操作   任务调度
加一个volatile变量if判断 ?   不行 不能保证原子 
加synchronized  可以实现,但每次都要synchronized同步比较慢(如用这个实现单例) 
缩小synchronized 范围。  为什么不直接用单例 单例是保证只有一个实例 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  class  MonitorService  {private  volatile  boolean  starting; public  void  start ()  {"尝试启动监控线程..." );synchronized  (this ) {if  (starting) {return ;true ;
有序性 流水线 在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序 和组合来实现指令级并行,单线程下正确,但多线程下有问题
1 2 3 4 5 6 7 int  a  =  10 ; int  b  =  20 ; int  a  =  10 ; int  b  =  a - 5 ; 
指令重排序问题 1 4为正常输出 但可能出现0(概率比较低)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int  num  =  0 ;boolean  ready  =  false ;public  void  actor1 (I_Result r)  {if (ready) {else  {1 ;public  void  actor2 (I_Result r)  { 2 ; true ; 
对变量ready添加volatile会禁用重排序
volatile原理 volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后会加入写屏障  
对 volatile 变量的读指令前会加入读屏障 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  void  actor2 (I_Result r)  {2 ;true ; public  void  actor1 (I_Result r)  {if (ready) {else  {1 ;
可见性:
遇到写屏障(sfence),对所有共享变量的改动,都同步到主存当中 
遇到读屏障(lfence),去主存加载最新数据 
 
 
有序性:
 
 
double-check-locking 普通写法
1 2 3 4 5 6 7 8 9 10 11 12 public  final  class  Singleton  {private  Singleton ()  { }private  static  Singleton  INSTANCE  =  null ;public  static  synchronized  Singleton getInstance ()  {if ( INSTANCE != null  ){return  INSTANCE;new  Singleton ();return  INSTANCE;
double
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public  final  class  Singleton  {private  Singleton ()  { }private  static  volatile  Singleton  INSTANCE  =  null ;public  static  Singleton getInstance ()  {if  (INSTANCE != null ) { return  INSTANCE;synchronized  (Singleton.class) { if  (INSTANCE != null ) { return  INSTANCE;new  Singleton (); return  INSTANCE;
可见性进阶 
synchronized中的变量 
volatile修饰的变量 
在线程开始前修改变量(可以理解为创建副本) 
t1.join() 后,可以看到t1中的修改 
t1打断t2, t2.interrupt();  t2可以看到t1的写 
对变量默认值的写,其他线程可见 
具有传递性 ,在写屏障前的全部修改都可见 y = 10 x = 1  (x是volatile) 
 
单例习题 饿汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public  final  class  Singleton  implements  Serializable  {private  Singleton ()  {}private  static  final  Singleton  INSTANCE  =  new  Singleton ();public  static  Singleton getInstance ()  {return  INSTANCE;public  Object readResolve ()  {return  INSTANCE;
枚举类
1 2 3 4 5 6 7 8 9 enum  Singleton  { 
类加载实现懒汉式单例:
1 2 3 4 5 6 7 8 9 10 11 public  final  class  Singleton  {private  Singleton ()  { }private  static  class  LazyHolder  {static  final  Singleton  INSTANCE  =  new  Singleton ();public  static  Singleton getInstance ()  {return  LazyHolder.INSTANCE;
无锁并发 
本章内容 
CAS 与 volatile  
原子整数  
原子引用  
原子累加器  
Unsafe 
 
CAS可以实现锁,0代表空闲1代表占用,下一章连接池中使用atomic数组实现多个连接池的锁 
reentrantlock底层其实还是CAS,外带一个等待队列(park实现等待) 见原理部分 
 
Atomic 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 AtomicInteger  atomicInteger  =  new  AtomicInteger (0 );int  value  =  atomicInteger.get();10 );int  oldValue  =  atomicInteger.getAndSet(20 );boolean  result  =  atomicInteger.compareAndSet(20 , 30 );int  newValue  =  atomicInteger.getAndIncrement();5 );5 );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 while  (true ) {int  prev  =  balance.get();int  next  =  prev - amount;if  (balance.compareAndSet(prev, next)) {  break ;public  final  int  getAndAddInt (Object this , long  offset, int  var4=1 )  {int  old;do  {this .getIntVolatile(this , offset);while (!this .compareAndSwapInt(this , offset, old, old + 1 ));return  old;
CAS Compare And Swap:先比较是否是旧值,旧值没被修改才swap(乐观锁) 
核心一个函数:this和offset用于确定地址
1 public  final  native  boolean  compareAndSwapInt (Object this , long  offset, int  old, int  new) ;
底层:lock cmpxchg 指令(X86 架构)
变量存储在一个volatile 值中,因为每次都要保证可见性
1 private  volatile  int  value;
效率高 :不需要 锁的获取 以及 线程的上下文下切换,但需要更高的cpu资源,受限cpu内核数
原子整数 
AtomicBoolean  
AtomicInteger  
AtomicLong 
 
操作 原理都是while中尝试compareAndSwapInt
getAndIncrement();  incrementAndGet 
getAndAdd(10); 
getAndUpdate(p -> p * 2); 
 
原子引用 
AtomicReference  
AtomicMarkableReference  
AtomicStampedReference 
 
对象不是基本类型,提供CAS对对象进行操作,compare比较的是地址
AtomicReference 保证多线程环境下取钱操作正常,并且不需要加锁
1 2 3 4 5 6 7 8 9 10 private  AtomicReference<Double> balance = new  AtomicReference <>(0.0 );public  void  withdraw (double  amount)  {do  {while  (!balance.compareAndSet(oldValue, newValue));
ABA 有人修改了值,但又改回来了,如何察觉到被修改了呢?
AtomicStampedReference 添加版本号,每次操作版本号+1,除了值要匹配,版本号也要匹配
1 2 3 4 5 6 7 8 9 static  AtomicStampedReference<String> ref = new  AtomicStampedReference <>("A" , 0 );String  prev  =  ref.getReference();int  stamp  =  ref.getStamp();"C" , stamp, stamp + 1 )
AtomicMarkableReference 不关心换了几次,只关心有没有换。用一个bool 来判断
1 2 3 4 5 6 7 8 9 10 AtomicMarkableReference<T> ref = new  AtomicMarkableReference <>(initialRef, initialMark=true );T  currentRef  =  ref.getReference();boolean  currentMark  =  ref.isMarked();boolean  success  =  ref.compareAndSet(expectedRef, newRef, expectedMark, newMark);boolean [] mark = new  boolean [1 ];int  value  =  counter.get(mark);1 , mark[0 ], !mark[0 ])
原子数组 
AtomicIntegerArray  
AtomicLongArray  
AtomicReferenceArray 
 
前面的是保证引用的对象不变,现在需要保护引用对象的内部不被改变,例如数组对象的内容没有修改(多个引用对象),底层就是偏移量不同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private  static  <T> void  demo (     Supplier<T> arraySupplier,     Function<T, Integer> lengthFun,     BiConsumer<T, Integer> putConsumer,     Consumer<T> printConsumer )  {new  ArrayList <>();T  array  =  arraySupplier.get();int  length  =  lengthFun.apply(array);for  (int  i  =  0 ; i < length; i++) {new  Thread (() -> {for  (int  j  =  0 ; j < 10000 ; j++) {try  {catch  (InterruptedException e) {
1 2 3 4 5 6 7 8 9 10 11 12 13 demo(new  int [10 ],new  AtomicIntegerArray (10 ),
字段更新器 
AtomicReferenceFieldUpdater // 域 字段  
AtomicIntegerFieldUpdater  
AtomicLongFieldUpdater 
 
某个对象内部的字段,保证原子操作。必须volatile
1 2 3 4 5 6 7 private  volatile  int  field;public  static  void  main (String[] args)  {AtomicIntegerFieldUpdater  fieldUpdater  = AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field" );Test5  test5  =  new  Test5 ();0 , 10 );
LongAdder 更高级的自增
1 2 3 4 5 6 for  (int  i  =  0 ; i < 5 ; i++) {new  LongAdder (), adder -> adder.increment());for  (int  i  =  0 ; i < 5 ; i++) {new  AtomicLong (), adder -> adder.getAndIncrement());
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private  static  <T> void  demo (Supplier<T> adderSupplier, Consumer<T> action)  {T  adder  =  adderSupplier.get();long  start  =  System.nanoTime();new  ArrayList <>();for  (int  i  =  0 ; i < 40 ; i++) {new  Thread (() -> {for  (int  j  =  0 ; j < 500000 ; j++) {try  {catch  (InterruptedException e) {long  end  =  System.nanoTime();" cost:"  + (end - start)/1000_000 );
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
源码 1 2 3 4 5 6 transient  volatile  Cell[] cells;transient  volatile  long  base;transient  volatile  int  cellsBusy;
@sun.misc.Contended 防止cell伪共享
缓存行64 byte,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value)  一个缓存行可能有多个Cell,注解添加128B的pad防止在同一个缓存行
从 cpu 到 
大约需要的时钟周期 
 
 
寄存器 
1 cycle (4GHz 的 CPU 约为0.25ns) 
 
L1 
3~4 cycle 
 
L2 
10~20 cycle 
 
L3 
40~45 cycle 
 
内存 
120~240 cycle 
 
Unsafe 会操作内存,比较危险
获取unsafe
1 2 3 4 5 6 Unsafe.getUnsafe();  被CallerSensitive修饰,能在引导类加载器加载的类中访问 抛出 SecurityException Field  unsafeField  =  Unsafe.class.getDeclaredField("theUnsafe" );true );Unsafe  unsafe  =  (Unsafe) unsafeField.get(null );
操作field
1 2 3 4 5 6 7 Teacher  teacher  =  new  Teacher ();long  idOffset  =  unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id" ));long  nameOffset  =  unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name" ));0  , 1 );
模拟原子整数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class  AtomicData  {private  volatile  int  data;static  final  Unsafe unsafe;static  final  long  DATA_OFFSET;static  {try  {"data" ));catch  (NoSuchFieldException e) {throw  new  Error (e);public  AtomicData (int  data)  {this .data = data;public  void  decrease (int  amount)  {int  oldValue;while (true ) {if  (unsafe.compareAndSwapInt(this , DATA_OFFSET, oldValue, oldValue - amount)) {return ;public  int  getData ()  {return  data;
不可变 日期转换的问题 1 2 3 4 5 6 SimpleDateFormat  sdf  =  new  SimpleDateFormat ("yyyy-MM-dd" );"1951-04-21" ); synchronized  (sdf){  "1951-04-21" );
内部属性不可变实现安全
1 DateTimeFormatter  dtf  =  DateTimeFormatter.ofPattern("yyyy-MM-dd" );
不可变设计 1 2 3 4 5 6 7 8 9 10 11 12 public  final  class  String implements  java .io.Serializable, Comparable<String>, CharSequence {private  final  char  value[];private  int  hash; this .value = Arrays.copyOf(value, value.length); this .value = Arrays.copyOfRange(value, offset, offset+count);
享元模式 minimizes memory usage by sharing
包装类 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的
1 2 3 4 5 6 7 public  static  Long valueOf (long  l)  {final  int  offset  =  128 ;if  (l >= -128  && l <= 127 ) { return  LongCache.cache[(int )l + offset];return  new  Long (l);
String 串池 BigDecimal BigInteger 模拟连接池! 有多个连接,所以需要array标记使用状态:AtomicArray
AtomicIntegerArray来标记连接状态,并且是线程安全的,注意需要使用cas修改标记
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class  Pool  {private  final  int  poolSize;private  Connection[] connections;private  AtomicIntegerArray states;public  Pool (int  poolSize)  {this .poolSize = poolSize;this .connections = new  Connection [poolSize];this .states = new  AtomicIntegerArray (new  int [poolSize]);for  (int  i  =  0 ; i < poolSize; i++) {new  MockConnection ("连接"  + (i+1 ));public  Connection borrow ()  {while (true ) {for  (int  i  =  0 ; i < poolSize; i++) {if (states.get(i) == 0 ) {if  (states.compareAndSet(i, 0 , 1 )) {"borrow {}" , connections[i]);return  connections[i];synchronized  (this ) {try  {"wait..." );this .wait();catch  (InterruptedException e) {public  void  free (Connection conn)  {for  (int  i  =  0 ; i < poolSize; i++) {if  (connections[i] == conn) {0 );synchronized  (this ) {"free {}" , conn);this .notifyAll();break ;class  MockConnection  implements  Connection  {
以上实现没有考虑:
连接的动态增长与收缩 
连接保活(可用性检测) 
等待超时处理 
分布式 hash 
 
对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache
tomcat jdbc连接池比较简单易读
线程池 
线程池 ThreadPollExecutor Fork/Join 
JUC   Lock Semaphore CountdownLatch CyclicBarrier… 
第三方 
 
自己线程池 无救急线程
高并发下,并不是越大越好,而是需要充分发挥已有线程的潜力
需要一个线程set 
当一个线程结束后,查看有没有BlockingQueue有没有任务,有就run(实现复用) 
 
BlockingQueue 实现保存暂时没有执行的任务列表,任务队列相当于生产者消费者模型 ,但使用lock实现
获取一个任务take
为了实现如果等一段时间内还没有任务结束线程(不是一直死等),需要添加超时的等待take -> poll(timeout, unit) 
 
添加一个任务put
等待任务队列也不是无穷大,有一个capcity
当满了以后,有不同的策略 
put 死等,阻塞主线程 fullWaitSet.await()
offer(task, timeout, unit) 带超时等待
添加一个拒绝策略,策略模式
具体实现的策略可以是死等、超时…
1 2 3 4 5 6 7 8 9 10 11 12 (queue, task)->{1500 , TimeUnit.MILLISECONDS);"放弃{}" , task);throw  new  RuntimeException ("任务执行失败 "  + task);
 
 
 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 class  BlockingQueue <T> {private  Deque<T> queue = new  ArrayDeque <>();private  ReentrantLock  lock  =  new  ReentrantLock ();private  Condition  fullWaitSet  =  lock.newCondition();private  Condition  emptyWaitSet  =  lock.newCondition();private  int  capcity;public  BlockingQueue (int  capcity)  {this .capcity = capcity;public  T poll (long  timeout, TimeUnit unit)  {try  {long  nanos  =  unit.toNanos(timeout);while  (queue.isEmpty()) {try  {if  (nanos <= 0 ) {return  null ;catch  (InterruptedException e) {T  t  =  queue.removeFirst();return  t;finally  {public  T take ()  {try  {while  (queue.isEmpty()) {try  {catch  (InterruptedException e) {T  t  =  queue.removeFirst();return  t;finally  {public  void  put (T task)  {try  {while  (queue.size() == capcity) {try  {"等待加入任务队列 {} ..." , task);catch  (InterruptedException e) {"加入任务队列 {}" , task);finally  {public  boolean  offer (T task, long  timeout, TimeUnit timeUnit)  {try  {long  nanos  =  timeUnit.toNanos(timeout);while  (queue.size() == capcity) {try  {if (nanos <= 0 ) {return  false ;"等待加入任务队列 {} ..." , task);catch  (InterruptedException e) {"加入任务队列 {}" , task);return  true ;finally  {public  int  size ()  {try  {return  queue.size();finally  {public  void  tryPut (RejectPolicy<T> rejectPolicy, T task)  {try  {if (queue.size() == capcity) {this , task);else  { "加入任务队列 {}" , task);finally  {
ThreadPool 1 2 3 4 5 6 7 8 9 private  BlockingQueue<Runnable> taskQueue;private  HashSet<Worker> workers = new  HashSet <>();private  int  coreSize;private  RejectPolicy<Runnable> rejectPolicy;
创建任务: 
执行与任务完成: 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package  threadpoolself;class  ThreadPool  {private  BlockingQueue<Runnable> taskQueue;private  HashSet<Worker> workers = new  HashSet <>();private  int  coreSize;private  long  timeout;private  TimeUnit timeUnit;private  RejectPolicy<Runnable> rejectPolicy;public  void  execute (Runnable task)  {synchronized  (workers) {if (workers.size() < coreSize) {Worker  worker  =  new  Worker (task);"新增 worker{}, {}" , worker, task);else  {public  ThreadPool (int  coreSize, long  timeout, TimeUnit timeUnit, int  queueCapcity,                       RejectPolicy<Runnable> rejectPolicy)  {this .coreSize = coreSize;this .timeout = timeout;this .timeUnit = timeUnit;this .taskQueue = new  BlockingQueue <>(queueCapcity);this .rejectPolicy = rejectPolicy;class  Worker  extends  Thread {private  Runnable task;public  Worker (Runnable task)  {this .task = task;@Override public  void  run ()  {while (task != null  || (task = taskQueue.poll(timeout, timeUnit)) != null ) {try  {"正在执行...{}" , task);catch  (Exception e) {finally  {null ;synchronized  (workers) {"worker 被移除{}" , this );this );
线程池 引入 
 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
状态名 
高3位 
新任务 
处理阻塞队列任务 
说明 
 
 
RUNNING 
111 
Y 
Y 
 
SHUTDOWN 
000 
N 
Y 
不会接收新任务,但会处理阻塞队列剩余任务shutdown 
 
STOP 
001 
N 
N 
会中断正在执行的任务,并抛弃阻塞队列任务shutdownNow 
 
TIDYING 
010 
任务全执行完毕,活动线程为 0 即将进入终结 
 
TERMINATED 
011 
终结状态 
 
1 2 3 4 5 6 7 public  ThreadPoolExecutor (int  corePoolSize,                           int  maximumPoolSize,                           long  keepAliveTime,                           TimeUnit unit,                           BlockingQueue<Runnable> workQueue,                           ThreadFactory threadFactory,                           RejectedExecutionHandler handler) 
corePoolSize 核心线程数目 (最多保留的线程数) 
maximumPoolSize 最大线程数目   减去核心线程就是救急线程数量,多出来的会被销毁 
keepAliveTime 生存时间 - 针对救急线程 
unit 时间单位 - 针对救急线程 
workQueue 阻塞队列 
threadFactory 线程工厂 - 可以为线程创建时起个好名字 
handler 拒绝策略 
 
当阻塞队列满时,会先创建救济线程,再考虑拒绝策略 
拒绝策略 
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
Netty 的实现,是创建一个新线程来执行任务
ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
 
newFixedThreadPool 1 2 3 4 5 public  static  ExecutorService newFixedThreadPool (int  nThreads)  {return  new  ThreadPoolExecutor (nThreads, nThreads,0L , TimeUnit.MILLISECONDS,new  LinkedBlockingQueue <Runnable>());
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间 
阻塞队列是无界的,可以放任意数量的任务 
任务量已知,任务耗时 
 
newCachedThreadPool 1 2 3 4 5 public  static  ExecutorService newCachedThreadPool ()  {return  new  ThreadPoolExecutor (0 , Integer.MAX_VALUE,60L , TimeUnit.SECONDS,new  SynchronousQueue <Runnable>());
全部都是救急线程(60s 后可以回收) 
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货) 
适合任务数比较密集,但每个任务执行时间较短的情况 
 
newSingleThreadExecutor 1 2 3 4 5 6 public  static  ExecutorService newSingleThreadExecutor ()  {return  new  FinalizableDelegatedExecutorService new  ThreadPoolExecutor (1 , 1 ,0L , TimeUnit.MILLISECONDS,new  LinkedBlockingQueue <Runnable>()));
使用场景:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
FinalizableDelegatedExecutorService限制了一些方法的暴露
提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void  execute (Runnable command) ;submit (Callable<T> task) ;   futer.get()获取结果(原理保护性暂停模式) 有异常返回异常invokeAll (Collection<? extends Callable<T>> tasks) throws  InterruptedException;invokeAll (Collection<? extends Callable<T>> tasks,                               long  timeout, TimeUnit unit) throws  InterruptedException;invokeAny (Collection<? extends Callable<T>> tasks) throws  InterruptedException, ExecutionException;invokeAny (Collection<? extends Callable<T>> tasks,                 long  timeout, TimeUnit unit) throws  InterruptedException, ExecutionException, TimeoutException;
其他 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void  shutdown () ;public  List<Runnable> shutdownNow ()  boolean  isShutdown () ;boolean  isTerminated () ;boolean  awaitTermination (long  timeout, TimeUnit unit)  throws  InterruptedException;
异步模式之工作线程 分工模式,不同的任务采用不同的线程池。如点餐线程池和做饭线程池
如果同一个线程池中,可能会没有做饭的导致点餐的一直在等待,导致饥饿。但jconsole死锁检测不到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ExecutorService  waiterPool  =  Executors.newFixedThreadPool(1 );ExecutorService  cookPool  =  Executors.newFixedThreadPool(1 );"处理点餐..." );"做菜" );return  cooking();try  {"上菜: {}" , f.get());catch  (InterruptedException | ExecutionException e) {
线程池数量 
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
CPU 密集 型运算
	通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因
I/O 密集型 运算
	CPU 容易闲下来(IO RPC),你可以利用多线程提高它的利用率。    时间占比可以用工具估算
	线程数 = 核数 * 期望 CPU 利用率(1.0) * 总时间(CPU计算时间+等待时间) / CPU 计算时间
 
任务调度 如何实现一些延时的任务,或者反复执行
Timmer 同一时间只能有一个任务在执行,出现异常后续不可以执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  static  void  main (String[] args)  {Timer  timer  =  new  Timer ();TimerTask  task1  =  new  TimerTask () {@Override public  void  run ()  {"task 1" );2 );TimerTask  task2  =  new  TimerTask () {@Override public  void  run ()  {"task 2" );1000 );1000 );
ScheduledExecutorService 延时任务或者重复执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ScheduledExecutorService  executor  =  Executors.newScheduledThreadPool(2 );"任务1,执行时间:"  + new  Date ());try  { Thread.sleep(2000 ); } catch  (InterruptedException e) { }1000 , TimeUnit.MILLISECONDS);"任务2,执行时间:"  + new  Date ());1000 , TimeUnit.MILLISECONDS);"running..." );2 , 1 , TimeUnit.SECONDS);"running..." );1 , 1 , TimeUnit.SECONDS);
异常处理 
Tomcat线程池 
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲 
Acceptor 只负责【接收新的 socket 连接】 
Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】 
一旦可读,封装一个任务对象(socketProcessor implement Runnable),提交给 Executor 线程池处理 
Executor 线程池中的工作线程最终负责【处理请求】 
 
线程池源码 Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  void  execute (Runnable command, long  timeout, TimeUnit unit)  {try  {super .execute(command);catch  (RejectedExecutionException rx) {if  (super .getQueue() instanceof  TaskQueue) {final  TaskQueue  queue  =  (TaskQueue)super .getQueue();try  {if  (!queue.force(command, timeout, unit)) {throw  new  RejectedExecutionException ("Queue capacity is full." );catch  (InterruptedException x) {throw  new  RejectedExecutionException (x);else  {throw  rx;
配置 Connector 配置
Executor 线程配置,优先级高于上面的配置
此外,对救急线程的激活逻辑做了修改,先创建救急线程而不是加入队列
Fork/Join 大任务拆分为算法上相同的小任务,小任务分配到不同线程从而并行
1 2 3 4 public  static  void  main (String[] args)  {ForkJoinPool  pool  =  new  ForkJoinPool (4 );new  AddTask1 (5 )));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class  AddTask1  extends  RecursiveTask <Integer> {int  n;public  AddTask1 (int  n)  {this .n = n;@Override public  String toString ()  {return  "{"  + n + '}' ;@Override protected  Integer compute ()  {if  (n == 1 ) {"join() {}" , n);return  n;AddTask1  t1  =  new  AddTask1 (n - 1 );"fork() {} + {}" , n, t1);int  result  =  n + t1.join();"join() {} + {} = {}" , n, t1, result);return  result;
拆分优化:拆分成 begin-mid,mid+1-end 能提高并行度
1 2 3 4 public  AddTask3 (int  begin, int  end)  {this .begin = begin;this .end = end;
并发工具 AQS 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
用 state   volatile属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取
getState - 获取 state 状态 
setState - 设置 state 状态 
compareAndSetState - cas 机制设置 state 状态 
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 
 
 
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList 
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet 
 
1 2 3 4 5 6 7 8 9 if  (!tryAcquire(arg)) {if  (tryRelease(arg)) {
需要实现以下方法: 不同的实现代表不同锁类型    AQS其他方法会调用下面的方法,详情见ReentrantLock
tryAcquire 
tryRelease 
tryAcquireShared 
tryReleaseShared 
isHeldExclusively 
 
队列 入队需要把当前Node变成tail,CAS操作防止并发影响
1 2 3 4 5 do  {Node  prev  =  tail;while (tail.compareAndSet(prev, node))
某个线程释放锁后,会唤醒Head的下一个,并尝试tryAcquire;成功后设置当前节点为head,并且出队列,失败继续等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if  (tryRelease(arg)) {  Node  h  =  head;if  (h != null  && h.waitStatus != 0 )return  true ;final  Node  p  =  node.predecessor();if  (p == head && tryAcquire(arg)) { null ; false ;return  interrupted;
自定义非重入锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 final  class  MySync  extends  AbstractQueuedSynchronizer  {@Override protected  boolean  tryAcquire (int  acquires)  {if  (acquires == 1 ){if  (compareAndSetState(0 , 1 )) {return  true ;return  false ;@Override protected  boolean  tryRelease (int  acquires)  {if (acquires == 1 ) {if (getState() == 0 ) {throw  new  IllegalMonitorStateException ();null );0 );return  true ;return  false ;protected  Condition newCondition ()  {return  new  ConditionObject ();@Override protected  boolean  isHeldExclusively ()  {return  getState() == 1 ;class  MyLock  implements  Lock  {static  MySync  sync  =  new  MySync ();@Override public  void  lock ()  {1 );@Override public  void  lockInterruptibly ()  throws  InterruptedException {1 );@Override public  boolean  tryLock ()  {return  sync.tryAcquire(1 );@Override public  boolean  tryLock (long  time, TimeUnit unit)  throws  InterruptedException {return  sync.tryAcquireNanos(1 , unit.toNanos(time));@Override public  void  unlock ()  {1 );@Override public  Condition newCondition ()  {return  sync.newCondition();
ReentrantLock 
获取锁 当前线程在获取失败后会park
有一个Node链表连接所有线程(有一个虚假head),前一个负责unpark后一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public  final  void  acquire (int  arg)  {if  (!tryAcquire(arg) &&final  boolean  acquireQueued (final  Node node, int  arg)  {boolean  failed  =  true ;try  {boolean  interrupted  =  false ;for  (;;) {final  Node  p  =  node.predecessor();if  (p == head && tryAcquire(arg)) { null ; false ;return  interrupted;if  (shouldParkAfterFailedAcquire(p, node) && true ;finally  {if  (failed)private  Node addWaiter (Node mode)  {Node  node  =  new  Node (Thread.currentThread(), mode);Node  pred  =  tail;if  (pred != null ) {if  (compareAndSetTail(pred, node)) {return  node;return  node;
释放锁 
设置state 
unpark队列中离head最近的Thread 
 
1 2 3 4 5 6 7 8 9 public  final  boolean  release (int  arg)  {if  (tryRelease(arg)) {  Node  h  =  head;if  (h != null  && h.waitStatus != 0 )return  true ;return  false ;
可重入原理 state:代表计数,获取时++,释放时–
可打断原理 获取锁时会进入park,如果被打断就立马抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private  void  doAcquireInterruptibly (int  arg) throws  InterruptedException {final  Node  node  =  addWaiter(Node.EXCLUSIVE);boolean  failed  =  true ;try  {for  (;;) {final  Node  p  =  node.predecessor();if  (p == head && tryAcquire(arg)) {null ; false ;return ;if  (shouldParkAfterFailedAcquire(p, node) &&throw  new  InterruptedException (); finally  {if  (failed)private  final  boolean  parkAndCheckInterrupt ()  {this ); return  Thread.interrupted();
公平性原理 1 2 3 4 5 6 tryAcquireif  (!hasQueuedPredecessors()  &&  0 , acquires)) {return  true ;
条件变量 每一个ConditionObject有一个Node等待队列,nextWaiter串起来,firstWaiter为第一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public  class  ConditionObject  implements  Condition , java.io.Serializable {private  static  final  long  serialVersionUID  =  1173984872572414699L ;private  transient  Node firstWaiter;private  transient  Node lastWaiter;Node  node  =  new  Node (Thread.currentThread(), Node.CONDITION);int  savedState  =  fullyRelease(node);this );
await
signal
读写锁 让读-读并发
1 2 3 private  ReentrantReadWriteLock  rw  =  new  ReentrantReadWriteLock ();private  ReentrantReadWriteLock.ReadLock  r  =  rw.readLock();private  ReentrantReadWriteLock.WriteLock  w  =  rw.writeLock();
1 2 3 4 5 6 7 8 DataContainer  dataContainer  =  new  DataContainer ();new  Thread (() -> {"t1" ).start();new  Thread (() -> {"t2" ).start();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class  CachedData  {volatile  boolean  cacheValid;final  ReentrantReadWriteLock  rwl  =  new  ReentrantReadWriteLock ();void  processCachedData ()  {if  (!cacheValid) {try  {if  (!cacheValid) {true ;finally  {try  {finally  {
读写锁实现缓存一致性 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
原理: 写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位,Node链表还是只有一个
没有仔细读源码
StampedLock 进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
1 2 3 4 5 long  stamp  =  lock.readLock();long  stamp  =  lock.writeLock();
1 2 3 4 5 6 long  stamp  =  lock.tryOptimisticRead();  if (!lock.validate(stamp)){
Semaphore 1 2 3 4 new  Semaphore (3 );
优化连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 this .semaphore = new  Semaphore (poolSize);public  Connection borrow ()  {try  {catch  (InterruptedException e) {for  (int  i  =  0 ; i < poolSize; i++) {if (states.get(i) == 0 ) {if  (states.compareAndSet(i, 0 , 1 )) {"borrow {}" , connections[i]);return  connections[i];return  null ;public  void  free (Connection conn)  {for  (int  i  =  0 ; i < poolSize; i++) {if  (connections[i] == conn) {0 );"free {}" , conn);break ;
原理 原理:state记录数量,cas操作。减完小于零时进入队列 doAcquireSharedInterruptibly
1 2 3 4 5 6 7 8 9 final  int  nonfairTryAcquireShared (int  acquires)  {for  (;;) {int  available  =  getState();int  remaining  =  available - acquires;if  (remaining < 0  ||return  remaining;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private  void  doAcquireSharedInterruptibly (int  arg)  throws  InterruptedException {final  Node  node  =  addWaiter(Node.SHARED);boolean  failed  =  true ;try  {for  (;;) {final  Node  p  =  node.predecessor();if  (p == head) {int  r  =  tryAcquireShared(arg);if  (r >= 0 ) {null ; false ;return ;if  (shouldParkAfterFailedAcquire(p, node) &&throw  new  InterruptedException ();finally  {if  (failed)
释放,检查唤醒后面的
1 2 3 4 5 if  (tryReleaseShared(arg)) {return  true ;return  false ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 protected  final  boolean  tryReleaseShared (int  releases)  {for  (;;) {int  current  =  getState();int  next  =  current + releases;if  (next < current) throw  new  Error ("Maximum permit count exceeded" );if  (compareAndSetState(current, next))return  true ;private  void  doReleaseShared ()  {for  (;;) {Node  h  =  head;if  (h != null  && h != tail) {int  ws  =  h.waitStatus;if  (ws == Node.SIGNAL) {if  (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 ))continue ;            else  if  (ws == 0  &&0 , Node.PROPAGATE))continue ;                if  (h == head)                   break ;
CountDownLatch 用来进行线程同步协作,等待所有线程完成倒计时。new CountDownLatch(3),await() 用来等待计数归零,countDown() 用来让计数减一
原理可以看源码,非常短countDown:state–   await:state不等于零就加入队列
可以用来实现等待线程完成,为什么不用join?线程池!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public  static  void  main (String[] args)  throws  InterruptedException {CountDownLatch  latch  =  new  CountDownLatch (3 );ExecutorService  service  =  Executors.newFixedThreadPool(4 );"begin..." );1 );"end...{}" , latch.getCount());"begin..." );1.5 );"end...{}" , latch.getCount());"begin..." );2 );"end...{}" , latch.getCount());try  {"waiting..." );"wait end..." );catch  (InterruptedException e) {
 应用:
CyclicBarrier 人满发车,可重复使用,state变成0后,会修改为init
await: state–  不为零就加入队列;为零就唤醒所有人,并重置state
1 2 3 4 5 6 7 8 9 CyclicBarrier  cb  =  new  CyclicBarrier (2 ); new  CyclicBarrier (2 , ()->{"发车了" 
注意线程池数量要和CyclicBarrier一样,否则可能出现同时两次都是task1的await触发