Java多线程篇(4)——wait/notify和park/unPark

2023-09-22 11:32:23

Object - wait/notify

object.wait()

在这里插入图片描述
ObjectSynchronizer::wait
在这里插入图片描述
从这段代码可以得到两个信息
1:wait() 底层是对象锁(就是synchronized底层实现的那个对象锁)。这也正是 wait/notify 要在同步代码块内的原因。
2:wait() 的调用会使得对象锁立马膨胀成重量级锁(因为需要使用mutex阻塞线程)。

ObjectMonitor::wait

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   //...
   
   // 封装成ObjectWaiter对象
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();
   
   //加入 WaitSet 
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;

   //...
   
   //释放当前线程占用的对象锁
   exit (true, Self) ;
   
   //...
   
   	   //阻塞当前线程
       if (node._notified == 0) {
         if (millis <= 0) {
            Self->_ParkEvent->park () ;
         } else {
            ret = Self->_ParkEvent->park (millis) ;
         }
       }

//...
}

wait主要干了三件事:
1:封装objectWaiter对象并加入 WaitSet
2:释放对象锁
3:调用 ParkEvent.park() 阻塞当前线程(底层调用 pthread_mutex_lock )


object.notify()

同理
ObjectSynchronizer::notify
在这里插入图片描述
ObjectMonitor::notify

void ObjectMonitor::notify(TRAPS) {
  //...

  //Policy 移动策略,默认为 2 
  int Policy = Knob_MoveNotifyee ;

  //取出waitSet的第一个
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;

  //根据 Policy 策略移动ObjectWaiter到cxq或者entryList或直接唤醒
  //  Policy == 0 :头插entrylist
  //  Policy == 1 :尾插entrylist
  //  Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列
  //  Policy == 3 :直接插入cxq 
  //  其他:直接唤醒线程,让线程直接调用enterI
  if (iterator != NULL) {
     //...

	 // Policy == 0 :头插entrylist
     if (Policy == 0) {
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     }
     // Policy == 1 :尾插entrylist
     else if (Policy == 1) {
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            ObjectWaiter * Tail ;
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator ;
            iterator->_prev = Tail ;
            iterator->_next = NULL ;
        }
     } 
     // Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列
     else if (Policy == 2) {
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     }
     // Policy == 3 :直接插入cxq 
     else if (Policy == 3) {
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     }
     // 否则直接唤醒线程,让线程直接调用enterI
     else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     //...
}

notify主要就是根据 Policy 策略来决定以何种方式唤醒目标线程:
Policy = 0 :头插entrylist
Policy = 1 :尾插entrylist
Policy = 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列(默认策略)
Policy = 3 :直接插入cxq
其他:直接唤醒线程,让线程直接调用enterI


LockSupport - park/unpark

核心设计思想是 ”许可“,park消费许可,unpark生产许可(同一时间最多最有一个许可)。
_counter:许可
_con:条件变量
_mutex:互斥锁

LockSupport.park()

在这里插入图片描述
每个线程都内置了一个 parker,通过 Parker.park() 方法进行阻塞

Parker与ParkEvent的功能类型,在源码的注释中也提了,计划将Parker合并到ParkEvent
注释原文: In the future we’ll want to think about eliminating Parker and using ParkEvent instead. There’s considerable duplication between the two services.

Parker::park

void Parker::park(bool isAbsolute, jlong time) {
  //原子替换_counter为0,如果之前_counter为1则直接返回,不阻塞当前线程
  if (Atomic::xchg(0, &_counter) > 0) return;

  //...
  
  //如果线程被终止,也直接返回
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  //解析时间参数
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }

  //...

  //如果线程被终止或者获取mutex锁失败直接返回
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  //走到这里说明获mutex锁成功

  //获锁后再次检查_counter是否大于0,如果是直接消费许可,无需等待。
  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }

  //...
  
  //调用 pthread_cond_wait 通过 _con 和 _mutex 配合使用阻塞当前线程直至满足_con条件
  if (time == 0) {
    _cur_index = REL_INDEX;
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  }
  //有超时时间的话,就调用 safe_cond_timedwait , 不管有没有满足条件,一旦超时都会唤醒
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  
  //设置_counter为0并释放 mutex 锁
  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  
  //...
}

pthread_cond_wait :阻塞当前线程,直至条件满足。并且阻塞后会自动释放锁,唤醒后又会自动获取锁。

为什么 pthread_cond_wait 需要搭配互斥锁使用?
条件不成立会进入阻塞,假如在进入阻塞的这个期间,条件又成立了,此时最终的结果就是条件成立,但一直在阻塞。同理,唤醒的时候也需搭配互斥锁才能保证不漏掉临界条件。

总结:
1:将许可置为0,同时检查之前许可是否为1,如果为1直接返回
2:获取mutex互斥锁
3:在条件变量上阻塞当前线程(阻塞会自动释放锁,唤醒会自动获取锁)
4:线程被唤醒后重置许可为0,并释放互斥锁


LockSupport.unPark()

同理
Parker::unpark

void Parker::unpark() {
  int s, status ;

  //获取mutex锁
  status = pthread_mutex_lock(_mutex);
  
  assert (status == 0, "invariant") ;
  s = _counter;
  
  //设置许可为1
  _counter = 1;
  
  //如果许可原本小于1表示线程可能被阻塞(parked),需要唤醒线程。
  if (s < 1) {
    
	//如果线程确实被阻塞(即 _cur_index 不等于-1)
	//调用 pthread_cond_signal 唤醒线程
    if (_cur_index != -1) {	
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[index]);
        assert (status == 0, "invariant");
      }
    }
    
    //反之什么都不做,直接释放锁返回 
    else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  }
  
  //如果许可原本为1,什么都不做,直接释放锁返回 
  else {
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

总结:
1:获取互斥锁
2:置许可为1
3:唤醒在条件变量上等待的线程
4:释放互斥锁


wait/notify是一种等待/通知机制。等待啥?线程对互斥资源的等待。通知啥?通知线程互斥资源已经没人在用可以去抢占了。因为描述的是对互斥资源的竞争,所以wait/notify在object上(任何对象都可以是互斥资源)。
而park/unPark是一种对线程的精准控制,他更多的是描述线程之间的先后顺序(比如生产者线程和消费者线程)。

park/unPark相对于wait/notify
更直观,以thread为操作对象。
更精准,可以指定唤醒那个线程。
更简单,无需在synchronized代码块内。
更灵活,unpark方法可以在park方法前调用。

更多推荐

算法leetcode|76. 最小覆盖子串(rust重拳出击)

文章目录76.最小覆盖子串:样例1:样例2:样例3:提示:进阶:分析:在这里插入图片描述题解:rust:go:c++:python:java:76.最小覆盖子串:给你一个字符串s、一个字符串t。返回s中涵盖t所有字符的最小子串。如果s中不存在涵盖t所有字符的子串,则返回空字符串""。注意:对于t中重复字符,我们寻找的子

外包干了2个月,技术退步明显...

先说一下自己的情况,大专生,19年通过校招进入湖南某软件公司,干了接近4年的功能测试,今年8月份,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试,已经让我变得不思进取,谈了2年的女朋友也因为我的心态和工资和我分手了。于是,我决定要改变现状,冲击下大厂。刚开始准备

[C/C++]天天酷跑超详细教程-中篇

个人主页:北·海🎐CSDN新晋作者🎉欢迎👍点赞✍评论⭐收藏✨收录专栏:C/C++🤝希望作者的文章能对你有所帮助,有不足的地方请在评论区留言指正,大家一起学习交流!🤗天天酷跑,一款童年游戏,主要是进行跳跃操作,和躲避障碍物,中篇主要实现人物的下蹲,随机障碍物的生成以及优化main函数里面的sleep(30)一.

Python操作Excel教程(图文教程,超详细)Python xlwings模块详解,

「作者主页」:士别三日wyx「作者简介」:CSDNtop100、阿里云博客专家、华为云享专家、网络安全领域优质创作者「推荐专栏」:小白零基础《Python入门到精通》xlwings模块详解1、快速入门1、打开Excel2、创建工作簿2.1、使用工作簿2.2、操作工作簿3、创建工作表3.1、使用工作表3.2、操作工作表3

C语言练习题解析:挑战与突破,开启编程新篇章!(3)

💓博客主页:江池俊的博客⏩收录专栏:C语言刷题专栏👉专栏推荐:✅C语言初阶之路✅C语言进阶之路💻代码仓库:江池俊的代码仓库🎉欢迎大家点赞👍评论📝收藏⭐文章目录🌴选择题🌴编程题📌记负均正📌旋转数组的最小数字🎈前言:本专栏每篇练习将包括5个选择题+2个编程题,将涵盖C语言的不同方面,包括基础语法、数据

Linux:工具(vim,gcc/g++,make/Makefile,yum,git,gdb)

目录---工具功能1.vim1.1vim的模式1.2vim常见指令2.gcc/g++2.1预备知识2.2gcc的使用3.make,Makefilemake.Makefile的使用4.yum--yum三板斧5.git--git三板斧--Linux下提交代码到远程仓库6.gdb6.1gdb的常用指令学习目标:1.知道这些工

C语言:选择+编程(每日一练)

目录选择题:题一:题二:题三:题四:题五:编程题:题一:珠玑妙算思路一:题二:两数之和思路一:本人实力有限可能对一些地方解释和理解的不够清晰,可以自己尝试读代码,或者评论区指出错误,望海涵!感谢大佬们的一键三连!感谢大佬们的一键三连!感谢大佬们的一键三连!选择题:题一:1、有以下函数,该函数的功能是()intfun(c

python-wordcloud词云

导入模块fromwordcloudimportWordCloudimportjiebaimportimageioimportmatplotlib.pyplotaspltfromPILimportImageGrabimportnumpyasnpwordcloud以空格为分隔符号,来将文本分隔成单词PILpillow模块i

网络安全(黑客)自学

前言一、什么是网络安全网络安全可以基于攻击和防御视角来分类,我们经常听到的“红队”、“渗透测试”等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。无论网络、Web、移动、桌面、云等哪个领域,都有攻与防两面性,例如Web安全技术,既有Web渗透,也有Web防御技术(WAF)。作为一个合格的网络安全

Spring WebSocket 认证与授权:掌控安全通道,迈向巅峰之旅!

一、需要了解的事项http和WebSocket的安全链和安全配置是完全独立的。SpringAuthenticationProvider根本不参与Websocket身份验证。将要给出的示例中,身份验证不会发生在HTTP协商端点上,因为JavaScriptSTOMP(websocket)库不会随HTTP请求一起发送必要的身

ARM DAY3

硬件模块与总线连接:各种硬件模块(如GPIO控制器)与CPU(或内核)通过总线进行连接。这个总线负责数据和指令的传输。特殊功能寄存器(SFRs)的角色:每个硬件模块内部都有一组特殊功能寄存器(SFRs)。这些寄存器是硬件模块的一部分,用于存储该模块的当前状态和配置信息。它们在特定的内存地址中有其对应的位置。使用LDR读

热文推荐