You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

740 lines
111 KiB

<!DOCTYPE html><html lang="zh-Hans"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width"><meta name="theme-color" content="#222"><meta name="generator" content="Hexo 5.4.0"><link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png"><link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png"><link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png"><link rel="mask-icon" href="/images/logo.svg" color="#222"><meta name="google-site-verification" content="2X6S9P7CAjXjVvw8YyQR8pCu-B0oEu7O9quLgxXuWyA"><meta name="baidu-site-verification" content="dV8JGNzi0c"><script>setTimeout(function(){var d=document.createElement("script");d.setAttribute("data-ad-client","ca-pub-7480618470784058"),d.async=!0,d.src="https://pagead2.googlesyndication.com/pagead/js/adsbygoogle.js",document.body.appendChild(d)},5e3),window.addEventListener?window.addEventListener("load",downloadJsAtOnload,!1):window.attachEvent?window.attachEvent("onload",downloadJsAtOnload):window.onload=downloadJsAtOnload</script><link rel="stylesheet" href="/css/main.css"><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free@5.15.3/css/all.min.css" integrity="sha256-2H3fkXt6FEmrReK448mDVGKb3WW2ZZw35gI7vqHOE4Y=" crossorigin="anonymous"><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@fancyapps/fancybox@3.5.7/dist/jquery.fancybox.min.css" integrity="sha256-Vzbj7sDDS/woiFS3uNKo8eIuni59rjyNGtXfstRzStA=" crossorigin="anonymous"><script class="next-config" data-name="main" type="application/json">{"hostname":"nicksxs.me","root":"/","images":"/images","scheme":"Pisces","version":"8.6.1","exturl":false,"sidebar":{"position":"left","display":"post","padding":18,"offset":12},"copycode":false,"bookmark":{"enable":false,"color":"#222","save":"auto"},"fancybox":true,"mediumzoom":false,"lazyload":true,"pangu":false,"comments":{"style":"tabs","active":null,"storage":true,"lazyload":false,"nav":null},"motion":{"enable":false,"async":false,"transition":{"post_block":"fadeIn","post_header":"fadeInDown","post_body":"fadeInDown","coll_header":"fadeInLeft","sidebar":"fadeInUp"}},"prism":false,"i18n":{"placeholder":"Searching...","empty":"We didn't find any results for the search: ${query}","hits_time":"${hits} results found in ${time} ms","hits":"${hits} results found"}}</script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/config.min.js"></script><meta name="description" content="首先看下官方的小 demo public static void main(String[] args) throws InterruptedException, MQClientException &amp;#123; &#x2F;* * Instantiate with specified consumer group name. * 首先是new"><meta property="og:type" content="article"><meta property="og:title" content="聊一下 RocketMQ 的 DefaultMQPushConsumer 源码"><meta property="og:url" content="https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/index.html"><meta property="og:site_name" content="Nicksxs&#39;s Blog"><meta property="og:description" content="首先看下官方的小 demo public static void main(String[] args) throws InterruptedException, MQClientException &amp;#123; &#x2F;* * Instantiate with specified consumer group name. * 首先是new"><meta property="og:locale"><meta property="og:image" content="https://mystore-1255223546.cos.ap-chengdu.myqcloud.com/uPic/QdeiVv.png"><meta property="article:published_time" content="2020-06-26T14:21:20.000Z"><meta property="article:modified_time" content="2020-06-26T14:21:20.000Z"><meta property="article:author" content="Nicksxs"><meta property="article:tag" content="MQ"><meta property="article:tag" content="消息队列"><meta property="article:tag" content="RocketMQ"><meta property="article:tag" content="削峰填谷"><meta property="article:tag" content="中间件"><meta property="article:tag" content="源码解析"><meta property="article:tag" content="DefaultMQPushConsumer"><meta name="twitter:card" content="summary"><meta name="twitter:image" content="https://mystore-1255223546.cos.ap-chengdu.myqcloud.com/uPic/QdeiVv.png"><link rel="canonical" href="https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/"><script class="next-config" data-name="page" type="application/json">{"sidebar":"","isHome":false,"isPost":true,"lang":"zh-Hans","comments":true,"permalink":"https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/","path":"2020/06/26/聊一下-RocketMQ-的-Consumer/","title":"聊一下 RocketMQ 的 DefaultMQPushConsumer 源码"}</script><script class="next-config" data-name="calendar" type="application/json">""</script><title>聊一下 RocketMQ 的 DefaultMQPushConsumer 源码 | Nicksxs's Blog</title><script async src="https://www.googletagmanager.com/gtag/js?id=UA-61358619-1"></script><script class="next-config" data-name="google_analytics" type="application/json">{"tracking_id":"UA-61358619-1","only_pageview":false}</script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/third-party/analytics/google-analytics.min.js"></script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/third-party/analytics/baidu-analytics.min.js"></script><script async src="https://hm.baidu.com/hm.js?20f33b3c0c0eff9b1522999c0015646d"></script><noscript><link rel="stylesheet" href="/css/noscript.css"></noscript><link rel="alternate" href="/atom.xml" title="Nicksxs's Blog" type="application/atom+xml"></head><body itemscope itemtype="http://schema.org/WebPage"><div class="headband"></div><main class="main"><header class="header" itemscope itemtype="http://schema.org/WPHeader"><div class="header-inner"><div class="site-brand-container"><div class="site-nav-toggle"><div class="toggle" aria-label="Toggle navigation bar" role="button"><span class="toggle-line"></span> <span class="toggle-line"></span> <span class="toggle-line"></span></div></div><div class="site-meta"><a href="/" class="brand" rel="start"><i class="logo-line"></i><h1 class="site-title">Nicksxs's Blog</h1><i class="logo-line"></i></a><p class="site-subtitle" itemprop="description">What hurts more, the pain of hard work or the pain of regret?</p></div><div class="site-nav-right"><div class="toggle popup-trigger"></div></div></div><nav class="site-nav"><ul class="main-menu menu"><li class="menu-item menu-item-home"><a href="/" rel="section"><i class="fa fa-home fa-fw"></i>Home</a></li><li class="menu-item menu-item-about"><a href="/about/" rel="section"><i class="fa fa-user fa-fw"></i>About</a></li><li class="menu-item menu-item-tags"><a href="/tags/" rel="section"><i class="fa fa-tags fa-fw"></i>Tags</a></li><li class="menu-item menu-item-categories"><a href="/categories/" rel="section"><i class="fa fa-th fa-fw"></i>Categories</a></li><li class="menu-item menu-item-archives"><a href="/archives/" rel="section"><i class="fa fa-archive fa-fw"></i>Archives</a></li><li class="menu-item menu-item-sitemap"><a href="/sitemap.xml" rel="section"><i class="fa fa-sitemap fa-fw"></i>Sitemap</a></li><li class="menu-item menu-item-commonweal"><a href="/404/" rel="section"><i class="fa fa-heartbeat fa-fw"></i>Commonweal 404</a></li></ul></nav></div><div class="toggle sidebar-toggle" role="button"><span class="toggle-line"></span> <span class="toggle-line"></span> <span class="toggle-line"></span></div><aside class="sidebar"><div class="sidebar-inner sidebar-overview-active"><ul class="sidebar-nav"><li class="sidebar-nav-toc">Table of Contents</li><li class="sidebar-nav-overview">Overview</li></ul><div class="sidebar-panel-container"><div class="post-toc-wrap sidebar-panel"></div><div class="site-overview-wrap sidebar-panel"><div class="site-overview"><div class="site-author site-overview-item animated" itemprop="author" itemscope itemtype="http://schema.org/Person"><img class="site-author-image" itemprop="image" alt="Nicksxs" src="/uploads/avatar.jpg"><p class="site-author-name" itemprop="name">Nicksxs</p><div class="site-description" itemprop="description">learn from zero,技术博客,Nicksxs,史学森</div></div><div class="site-state-wrap site-overview-item animated"><nav class="site-state"><div class="site-state-item site-state-posts"><a href="/archives/"><span class="site-state-item-count">119</span> <span class="site-state-item-name">posts</span></a></div><div class="site-state-item site-state-categories"><a href="/categories/"><span class="site-state-item-count">136</span> <span class="site-state-item-name">categories</span></a></div><div class="site-state-item site-state-tags"><a href="/tags/"><span class="site-state-item-count">246</span> <span class="site-state-item-name">tags</span></a></div></nav></div><div class="links-of-author site-overview-item animated"><span class="links-of-author-item"><a href="https://github.com/nicksxs" title="GitHub → https:&#x2F;&#x2F;github.com&#x2F;nicksxs" rel="noopener" target="_blank"><i class="fab fa-github fa-fw"></i>GitHub</a> </span><span class="links-of-author-item"><a href="mailto:nicksxs1202@gmail.com" title="E-Mail → mailto:nicksxs1202@gmail.com" rel="noopener" target="_blank"><i class="fa fa-envelope fa-fw"></i>E-Mail</a></span></div><div class="cc-license site-overview-item animated" itemprop="license"><a href="https://creativecommons.org/licenses/by-nc-sa/4.0/" class="cc-opacity" rel="noopener" target="_blank"><img src="https://cdn.jsdelivr.net/npm/@creativecommons/vocabulary@2020.11.3/assets/license_badges/small/by_nc_sa.svg" alt="Creative Commons"></a></div><script charset="utf-8" src="/js/tagcloud.js"></script><script charset="utf-8" src="/js/tagcanvas.js"></script><div class="widget-wrap"><div id="myCanvasContainer" class="widget tagcloud"><canvas width="250" height="250" id="resCanvas"><ul class="tag-list" itemprop="keywords"><li class="tag-list-item"><a class="tag-list-link" href="/tags/2019/" rel="tag">2019</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/2020/" rel="tag">2020</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/2021/" rel="tag">2021</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/2PC/" rel="tag">2PC</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/360-%E5%85%A8%E5%AE%B6%E6%A1%B6/" rel="tag">360 全家桶</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/3PC/" rel="tag">3PC</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/AOP/" rel="tag">AOP</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Adaptive/" rel="tag">Adaptive</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Apollo/" rel="tag">Apollo</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/AutoConfiguration/" rel="tag">AutoConfiguration</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Binary-Tree/" rel="tag">Binary Tree</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Broker/" rel="tag">Broker</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/C/" rel="tag">C</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/C/" rel="tag">C++</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/CachedThreadPool/" rel="tag">CachedThreadPool</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Comparator/" rel="tag">Comparator</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/DFS/" rel="tag">DFS</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/DP/" rel="tag">DP</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/DefaultMQPushConsumer/" rel="tag">DefaultMQPushConsumer</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Design-Patterns/" rel="tag">Design Patterns</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Distributed-Lock/" rel="tag">Distributed Lock</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Docker/" rel="tag">Docker</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Dockerfile/" rel="tag">Dockerfile</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Dubbo/" rel="tag">Dubbo</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/EagerThreadPool/" rel="tag">EagerThreadPool</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Evict/" rel="tag">Evict</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Filter/" rel="tag">Filter</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/FixedThreadPool/" rel="tag">FixedThreadPool</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/G1/" rel="tag">G1</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/GC/" rel="tag">GC</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Garbage-First-Collector/" rel="tag">Garbage-First Collector</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Gogs/" rel="tag">Gogs</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Homebrew/" rel="tag">Homebrew</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Inorder-Traversal/" rel="tag">Inorder Traversal</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Interceptor/" rel="tag">Interceptor</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/JMap/" rel="tag">JMap</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/JPS/" rel="tag">JPS</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/JStack/" rel="tag">JStack</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/JVM/" rel="tag">JVM</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Java/" rel="tag">Java</a><span class="tag-list-count">19</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Leetcode-42/" rel="tag">Leetcode 42</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/LimitedThreadPool/" rel="tag">LimitedThreadPool</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Linked-List/" rel="tag">Linked List</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Lowest-Common-Ancestor-of-a-Binary-Tree/" rel="tag">Lowest Common Ancestor of a Binary Tree</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/MQ/" rel="tag">MQ</a><span class="tag-list-count">6</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Mac/" rel="tag">Mac</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Maven/" rel="tag">Maven</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Mybatis/" rel="tag">Mybatis</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Mysql/" rel="tag">Mysql</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/NameServer/" rel="tag">NameServer</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/PHP/" rel="tag">PHP</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Preorder-Traversal/" rel="tag">Preorder Traversal</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/RPC/" rel="tag">RPC</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Redis/" rel="tag">Redis</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/RocketMQ/" rel="tag">RocketMQ</a><span class="tag-list-count">6</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Rotate-Image/" rel="tag">Rotate Image</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Rust/" rel="tag">Rust</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/SPI/" rel="tag">SPI</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Servlet/" rel="tag">Servlet</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Singleton/" rel="tag">Singleton</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Spring/" rel="tag">Spring</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/SpringBoot/" rel="tag">SpringBoot</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Sql%E6%B3%A8%E5%85%A5/" rel="tag">Sql注入</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Stream/" rel="tag">Stream</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Synchronized/" rel="tag">Synchronized</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Thread-dump/" rel="tag">Thread dump</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/ThreadLocal/" rel="tag">ThreadLocal</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/ThreadPool/" rel="tag">ThreadPool</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Tomcat/" rel="tag">Tomcat</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Trapping-Rain-Water/" rel="tag">Trapping Rain Water</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/WeakReference/" rel="tag">WeakReference</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Web/" rel="tag">Web</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/Webhook/" rel="tag">Webhook</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/aqs/" rel="tag">aqs</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/await/" rel="tag">await</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/bloom-filter/" rel="tag">bloom filter</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/c/" rel="tag">c++</a><span class="tag-list-count">14</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/cgroup/" rel="tag">cgroup</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/cluster/" rel="tag">cluster</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/condition/" rel="tag">condition</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/docker/" rel="tag">docker</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/dp/" rel="tag">dp</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/echo/" rel="tag">echo</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/environment/" rel="tag">environment</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/gap-lock/" rel="tag">gap lock</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/gc/" rel="tag">gc</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/grep/" rel="tag">grep</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/hadoop/" rel="tag">hadoop</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/icu4c/" rel="tag">icu4c</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/im/" rel="tag">im</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/is-not-null/" rel="tag">is not null</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/is-null/" rel="tag">is null</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/j-u-c/" rel="tag">j.u.c</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/java/" rel="tag">java</a><span class="tag-list-count">16</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/jvm/" rel="tag">jvm</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/leetcode/" rel="tag">leetcode</a><span class="tag-list-count">25</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/leetcode-155/" rel="tag">leetcode 155</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/linked-list/" rel="tag">linked list</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/linux/" rel="tag">linux</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/lock/" rel="tag">lock</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/mfc/" rel="tag">mfc</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/min-stack/" rel="tag">min stack</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/mq/" rel="tag">mq</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/mvcc/" rel="tag">mvcc</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/mysql/" rel="tag">mysql</a><span class="tag-list-count">5</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/namespace/" rel="tag">namespace</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/next-key-lock/" rel="tag">next-key lock</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/nginx/" rel="tag">nginx</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/nullsfirst/" rel="tag">nullsfirst</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/openresty/" rel="tag">openresty</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/php/" rel="tag">php</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/procedure/" rel="tag">procedure</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/python/" rel="tag">python</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/read-view/" rel="tag">read view</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/redis/" rel="tag">redis</a><span class="tag-list-count">11</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/signal/" rel="tag">signal</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/sort/" rel="tag">sort</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/spark/" rel="tag">spark</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/stack/" rel="tag">stack</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/string/" rel="tag">string</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/swoole/" rel="tag">swoole</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/top/" rel="tag">top</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/uname/" rel="tag">uname</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/unlock/" rel="tag">unlock</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/value/" rel="tag">value</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/websocket/" rel="tag">websocket</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/zsh/" rel="tag">zsh</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%89%E9%98%B6%E6%AE%B5%E6%8F%90%E4%BA%A4/" rel="tag">三阶段提交</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%8D%E5%8F%AF%E5%8F%98%E5%BC%95%E7%94%A8/" rel="tag">不可变引用</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%9C%E4%BA%AC%E5%A5%A5%E8%BF%90%E4%BC%9A/" rel="tag">东京奥运会</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%A4%E9%98%B6%E6%AE%B5%E6%8F%90%E4%BA%A4/" rel="tag">两阶段提交</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%AD%E5%B1%B1%E8%B7%AF/" rel="tag">中山路</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%AD%E5%BA%8F/" rel="tag">中序</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%AD%E9%97%B4%E4%BB%B6/" rel="tag">中间件</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B8%BE%E9%87%8D/" rel="tag">举重</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%B9%92%E4%B9%93%E7%90%83/" rel="tag">乒乓球</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%BA%8C%E5%8F%89%E6%A0%91/" rel="tag">二叉树</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%BA%92%E6%96%A5%E9%94%81/" rel="tag">互斥锁</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%BB%A3%E7%A0%81%E9%A2%98%E8%A7%A3/" rel="tag">代码题解</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E4%BF%AE%E7%94%B5%E8%84%91%E7%9A%84/" rel="tag">修电脑的</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%81%8F%E5%90%91%E9%94%81/" rel="tag">偏向锁</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%81%A5%E5%BA%B7%E7%A0%81/" rel="tag">健康码</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%85%AC%E4%BA%A4/" rel="tag">公交</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%85%AC%E4%BA%A4%E8%BD%A6/" rel="tag">公交车</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%86%85%E5%AD%98%E5%88%86%E5%B8%83/" rel="tag">内存分布</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%86%85%E5%AD%98%E6%B3%84%E6%BC%8F/" rel="tag">内存泄漏</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%87%86%E5%A4%87/" rel="tag">准备</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%87%8F%E8%82%A5/" rel="tag">减肥</a><span class="tag-list-count">6</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1/" rel="tag">分布式事务</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81/" rel="tag">分布式锁</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%88%87%E7%89%87/" rel="tag">切片</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%88%9D%E5%A7%8B%E5%8C%96/" rel="tag">初始化</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%89%8A%E5%B3%B0%E5%A1%AB%E8%B0%B7/" rel="tag">削峰填谷</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%89%8D%E5%BA%8F/" rel="tag">前序</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8A%A0%E5%A1%9E/" rel="tag">加塞</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8A%A0%E8%BD%BD/" rel="tag">加载</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8D%95%E4%BE%8B/" rel="tag">单例</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8D%9A%E5%AE%A2%EF%BC%8C%E6%96%87%E7%AB%A0/" rel="tag">博客,文章</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8E%A6%E9%97%A8/" rel="tag">厦门</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8F%8C%E4%BA%B2%E5%A7%94%E6%B4%BE/" rel="tag">双亲委派</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8F%91%E8%A1%8C%E7%89%88/" rel="tag">发行版</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8F%A3%E7%BD%A9/" rel="tag">口罩</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%8F%AF%E5%8F%98%E5%BC%95%E7%94%A8/" rel="tag">可变引用</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%90%90%E6%A7%BD/" rel="tag">吐槽</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%9E%83%E5%9C%BE%E5%9B%9E%E6%94%B6/" rel="tag">垃圾回收</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%9F%BA%E7%A1%80%E8%AE%BE%E6%96%BD/" rel="tag">基础设施</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%AE%89%E5%85%A8/" rel="tag">安全</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%AE%B9%E9%94%99%E6%9C%BA%E5%88%B6/" rel="tag">容错机制</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%AF%84%E7%94%9F%E8%99%AB/" rel="tag">寄生虫</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B0%84%E5%87%BB/" rel="tag">射击</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B0%8F%E6%8A%80%E5%B7%A7/" rel="tag">小技巧</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B1%80%E5%8F%A3%E8%A1%97/" rel="tag">局口街</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B8%83%E9%9A%86%E8%BF%87%E6%BB%A4%E5%99%A8/" rel="tag">布隆过滤器</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%B2%E6%B4%BB/" rel="tag">干活</a><span class="tag-list-count">5</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%B4%E4%B8%AD%E6%80%BB%E7%BB%93/" rel="tag">年中总结</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%B4%E7%BB%88%E6%80%BB%E7%BB%93/" rel="tag">年终总结</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%B6%E5%8F%91/" rel="tag">并发</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%B8%E7%A6%8F%E4%BA%86%E5%90%97/" rel="tag">幸福了吗</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%B9%BB%E8%AF%BB/" rel="tag">幻读</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%BA%94%E7%94%A8/" rel="tag">应用</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%BC%80%E8%BD%A6/" rel="tag">开车</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%BC%B1%E5%BC%95%E7%94%A8/" rel="tag">弱引用</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E5%BD%B1%E8%AF%84/" rel="tag">影评</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%89%80%E6%9C%89%E6%9D%83/" rel="tag">所有权</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%89%93%E5%8D%A1/" rel="tag">打卡</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%89%B6%E6%A2%AF/" rel="tag">扶梯</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%8A%80%E6%9C%AF/" rel="tag">技术</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%8B%96%E6%9B%B4/" rel="tag">拖更</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%8E%92%E5%BA%8F/" rel="tag">排序</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%8E%A5%E9%9B%A8%E6%B0%B4/" rel="tag">接雨水</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84/" rel="tag">数据结构</a><span class="tag-list-count">11</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%96%B0%E8%AF%AD%E8%A8%80/" rel="tag">新语言</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%97%85%E6%B8%B8/" rel="tag">旅游</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%9B%BE%E5%8E%9D%E5%9E%B5/" rel="tag">曾厝垵</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%9C%80%E5%B0%8F%E6%A0%88/" rel="tag">最小栈</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%9D%80%E4%BA%BA%E8%AF%9B%E5%BF%83/" rel="tag">杀人诛心</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%9D%AD%E5%B7%9E/" rel="tag">杭州</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%A0%87%E8%AE%B0%E6%95%B4%E7%90%86/" rel="tag">标记整理</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%A4%8D%E7%89%A9%E5%9B%AD/" rel="tag">植物园</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%B2%99%E8%8C%B6%E9%9D%A2/" rel="tag">沙茶面</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%B3%A8%E8%A7%A3/" rel="tag">注解</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%B5%B7%E8%9B%8E%E7%85%8E/" rel="tag">海蛎煎</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/" rel="tag">消息队列</a><span class="tag-list-count">6</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%B7%98%E6%B1%B0%E7%AD%96%E7%95%A5/" rel="tag">淘汰策略</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%BA%90%E7%A0%81/" rel="tag">源码</a><span class="tag-list-count">11</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/" rel="tag">源码解析</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%94%9F%E6%B4%BB/" rel="tag">生活</a><span class="tag-list-count">21</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%94%B5%E7%93%B6%E8%BD%A6/" rel="tag">电瓶车</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%96%AB%E6%83%85/" rel="tag">疫情</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%9F%A9%E9%98%B5/" rel="tag">矩阵</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%B1%BB%E5%8A%A0%E8%BD%BD/" rel="tag">类加载</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%B3%9F%E5%BF%83%E4%BA%8B/" rel="tag">糟心事</a><span class="tag-list-count">4</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%B4%A2%E5%BC%95/" rel="tag">索引</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BA%BF%E7%A8%8B%E6%B1%A0/" rel="tag">线程池</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BC%93%E5%AD%98/" rel="tag">缓存</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BC%93%E5%AD%98%E5%87%BB%E7%A9%BF/" rel="tag">缓存击穿</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BC%93%E5%AD%98%E7%A9%BF%E9%80%8F/" rel="tag">缓存穿透</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BC%93%E5%AD%98%E9%9B%AA%E5%B4%A9/" rel="tag">缓存雪崩</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E7%BE%8E%E5%9B%BD/" rel="tag">美国</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%80%81%E7%94%B5%E8%84%91/" rel="tag">老电脑</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%87%AA%E5%8A%A8%E8%A3%85%E9%85%8D/" rel="tag">自动装配</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%87%AA%E6%97%8B/" rel="tag">自旋</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%87%AA%E9%80%82%E5%BA%94%E6%8B%93%E5%B1%95/" rel="tag">自适应拓展</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%A3%85%E7%94%B5%E8%84%91/" rel="tag">装电脑</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%A7%84%E5%88%99/" rel="tag">规则</a><span class="tag-list-count">3</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%A7%A3%E6%9E%90/" rel="tag">解析</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%AE%BE%E8%AE%A1%E6%A8%A1%E5%BC%8F/" rel="tag">设计模式</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%AF%BB%E4%B9%A6/" rel="tag">读书</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%AF%BB%E5%90%8E%E6%84%9F/" rel="tag">读后感</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%B6%B3%E7%90%83/" rel="tag">足球</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%B7%91%E6%AD%A5/" rel="tag">跑步</a><span class="tag-list-count">6</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%B7%AF%E6%94%BF%E8%A7%84%E5%88%92/" rel="tag">路政规划</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%B7%B3%E6%B0%B4/" rel="tag">跳水</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%B8%A9%E8%B8%8F/" rel="tag">踩踏</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%BD%AC%E4%B9%89/" rel="tag">转义</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%BD%BB%E9%87%8F%E7%BA%A7%E9%94%81/" rel="tag">轻量级锁</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%BF%87%E6%9C%9F%E7%AD%96%E7%95%A5/" rel="tag">过期策略</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E8%BF%90%E5%8A%A8/" rel="tag">运动</a><span class="tag-list-count">8</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%80%92%E5%BD%92/" rel="tag">递归</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%87%8D%E9%87%8F%E7%BA%A7%E9%94%81/" rel="tag">重量级锁</a><span class="tag-list-count">2</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%93%BE%E6%8E%A5/" rel="tag">链接</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%A2%98%E8%A7%A3/" rel="tag">题解</a><span class="tag-list-count">11</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%A9%AC%E6%88%8F%E5%9B%A2/" rel="tag">马戏团</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%AA%8C%E8%AF%81/" rel="tag">验证</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%AB%98%E9%80%9F/" rel="tag">高速</a><span class="tag-list-count">1</span></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/%E9%BC%93%E6%B5%AA%E5%B1%BF/" rel="tag">鼓浪屿</a><span class="tag-list-count">1</span></li></ul></canvas></div></div><div class="links-of-blogroll site-overview-item animated"><div class="links-of-blogroll-title"><i class="fa fa-globe fa-fw"></i> Links</div><ul class="links-of-blogroll-list"><li class="links-of-blogroll-item"><a href="https://covermusic.cn/" title="https:&#x2F;&#x2F;covermusic.cn" rel="noopener" target="_blank">69伙伴</a></li></ul></div></div></div></div></div></aside><div class="sidebar-dimmer"></div></header><div class="back-to-top" role="button" aria-label="Back to top"><i class="fa fa-arrow-up"></i> <span>0%</span></div><a href="https://github.com/nicksxs" class="github-corner" title="Follow me on GitHub" aria-label="Follow me on GitHub" rel="noopener" target="_blank"><svg width="80" height="80" viewBox="0 0 250 250" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin:130px 106px" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg></a><noscript><div class="noscript-warning">Theme NexT works best with JavaScript enabled</div></noscript><div class="main-inner post posts-expand"><div class="post-block"><article itemscope itemtype="http://schema.org/Article" class="post-content" lang="zh-Hans"><link itemprop="mainEntityOfPage" href="https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/"><span hidden itemprop="author" itemscope itemtype="http://schema.org/Person"><meta itemprop="image" content="/uploads/avatar.jpg"><meta itemprop="name" content="Nicksxs"><meta itemprop="description" content="learn from zero,技术博客,Nicksxs,史学森"></span><span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization"><meta itemprop="name" content="Nicksxs's Blog"></span><header class="post-header"><h1 class="post-title" itemprop="name headline">聊一下 RocketMQ 的 DefaultMQPushConsumer 源码</h1><div class="post-meta-container"><div class="post-meta"><span class="post-meta-item"><span class="post-meta-item-icon"><i class="far fa-calendar"></i> </span><span class="post-meta-item-text">Posted on</span> <time title="Created: 2020-06-26 22:21:20" itemprop="dateCreated datePublished" datetime="2020-06-26T22:21:20+08:00">2020-06-26</time> </span><span class="post-meta-item"><span class="post-meta-item-icon"><i class="far fa-folder"></i> </span><span class="post-meta-item-text">In</span> <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/MQ/" itemprop="url" rel="index"><span itemprop="name">MQ</span></a> </span>, <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/RocketMQ/" itemprop="url" rel="index"><span itemprop="name">RocketMQ</span></a> </span>, <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/" itemprop="url" rel="index"><span itemprop="name">消息队列</span></a> </span>, <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/RocketMQ/" itemprop="url" rel="index"><span itemprop="name">RocketMQ</span></a> </span>, <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/%E4%B8%AD%E9%97%B4%E4%BB%B6/" itemprop="url" rel="index"><span itemprop="name">中间件</span></a> </span>, <span itemprop="about" itemscope itemtype="http://schema.org/Thing"><a href="/categories/%E4%B8%AD%E9%97%B4%E4%BB%B6/RocketMQ/" itemprop="url" rel="index"><span itemprop="name">RocketMQ</span></a> </span></span><span id="/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/" class="post-meta-item leancloud_visitors" data-flag-title="聊一下 RocketMQ 的 DefaultMQPushConsumer 源码" title="Views"><span class="post-meta-item-icon"><i class="far fa-eye"></i> </span><span class="post-meta-item-text">Views: </span><span class="leancloud-visitors-count"></span> </span><span class="post-meta-item" title="Views" id="busuanzi_container_page_pv"><span class="post-meta-item-icon"><i class="far fa-eye"></i> </span><span class="post-meta-item-text">Views: </span><span id="busuanzi_value_page_pv"></span> </span><span class="post-meta-item"><span class="post-meta-item-icon"><i class="far fa-comment"></i> </span><span class="post-meta-item-text">Disqus: </span><a title="disqus" href="/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/#disqus_thread" itemprop="discussionUrl"><span class="post-comments-count disqus-comment-count" data-disqus-identifier="2020/06/26/聊一下-RocketMQ-的-Consumer/" itemprop="commentCount"></span></a></span></div></div></header><div class="post-body" itemprop="articleBody"><p>首先看下官方的小 demo</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public static void main(String[] args) throws InterruptedException, MQClientException &#123;
&#x2F;*
* Instantiate with specified consumer group name.
* 首先是new 一个对象出来,然后指定 Consumer 的 Group
* 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
*&#x2F;
DefaultMQPushConsumer consumer &#x3D; new DefaultMQPushConsumer(&quot;please_rename_unique_group_name_4&quot;);
&#x2F;*
* Specify name server addresses.
* &lt;p&#x2F;&gt;
* 这里可以通知指定环境变量或者设置对象参数的形式指定名字空间服务的地址
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* &lt;pre&gt;
* &#123;@code
* consumer.setNamesrvAddr(&quot;name-server1-ip:9876;name-server2-ip:9876&quot;);
* &#125;
* &lt;&#x2F;pre&gt;
*&#x2F;
&#x2F;*
* Specify where to start in case the specified consumer group is a brand new one.
* 指定消费起始点
*&#x2F;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
&#x2F;*
* Subscribe one more more topics to consume.
* 指定订阅的 topic 跟 tag,注意后面的是个表达式,可以以 tag1 || tag2 || tag3 传入
*&#x2F;
consumer.subscribe(&quot;TopicTest&quot;, &quot;*&quot;);
&#x2F;*
* Register callback to execute on arrival of messages fetched from brokers.
* 注册具体获得消息后的处理方法
*&#x2F;
consumer.registerMessageListener(new MessageListenerConcurrently() &#123;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs,
ConsumeConcurrentlyContext context) &#123;
System.out.printf(&quot;%s Receive New Messages: %s %n&quot;, Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
&#125;
&#125;);
&#x2F;*
* Launch the consumer instance.
* 启动消费者
*&#x2F;
consumer.start();
System.out.printf(&quot;Consumer Started.%n&quot;);
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>然后就是看看 start 的过程了</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">&#x2F;**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
*
* @throws MQClientException if there is any client error.
*&#x2F;
@Override
public void start() throws MQClientException &#123;
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null !&#x3D; traceDispatcher) &#123;
try &#123;
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
&#125; catch (MQClientException e) &#123;
log.warn(&quot;trace dispatcher start failed &quot;, e);
&#125;
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>具体的逻辑在<code>this.defaultMQPushConsumerImpl.start()</code>,这个 defaultMQPushConsumerImpl 就是</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">&#x2F;**
* Internal implementation. Most of the functions herein are delegated to it.
*&#x2F;
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span></span></code></pre><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public synchronized void start() throws MQClientException &#123;
switch (this.serviceState) &#123;
case CREATE_JUST:
log.info(&quot;the consumer [&#123;&#125;] start beginning. messageModel&#x3D;&#123;&#125;, isUnitMode&#x3D;&#123;&#125;&quot;, this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
&#x2F;&#x2F; 这里比较巧妙,相当于想设立了个屏障,防止并发启动,不过这里并不是悲观锁,也不算个严格的乐观锁
this.serviceState &#x3D; ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() &#x3D;&#x3D; MessageModel.CLUSTERING) &#123;
this.defaultMQPushConsumer.changeInstanceNameToPID();
&#125;
&#x2F;&#x2F; 这个mQClientFactory,负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成
this.mQClientFactory &#x3D; MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
&#x2F;&#x2F; 这个 rebalanceImpl 主要负责决定,当前的consumer应该从哪些Queue中消费消息;
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
&#x2F;&#x2F; 长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑
this.pullAPIWrapper &#x3D; new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() !&#x3D; null) &#123;
this.offsetStore &#x3D; this.defaultMQPushConsumer.getOffsetStore();
&#125; else &#123;
switch (this.defaultMQPushConsumer.getMessageModel()) &#123;
case BROADCASTING:
this.offsetStore &#x3D; new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore &#x3D; new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
&#125;
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
&#125;
&#x2F;&#x2F; offsetStore 维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) &#123;
this.consumeOrderly &#x3D; true;
this.consumeMessageService &#x3D;
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
&#125; else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) &#123;
this.consumeOrderly &#x3D; false;
this.consumeMessageService &#x3D;
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
&#125;
&#x2F;&#x2F; 实现所谓的&quot;Push-被动&quot;消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息;
this.consumeMessageService.start();
boolean registerOK &#x3D; mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) &#123;
this.serviceState &#x3D; ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException(&quot;The consumer group[&quot; + this.defaultMQPushConsumer.getConsumerGroup()
+ &quot;] has been created before, specify another name please.&quot; + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
&#125;
mQClientFactory.start();
log.info(&quot;the consumer [&#123;&#125;] start OK.&quot;, this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState &#x3D; ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(&quot;The PushConsumer service state not OK, maybe started once, &quot;
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
&#125;
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>然后我们往下看主要的目光聚焦<code>mQClientFactory.start()</code></p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public void start() throws MQClientException &#123;
synchronized (this) &#123;
switch (this.serviceState) &#123;
case CREATE_JUST:
this.serviceState &#x3D; ServiceState.START_FAILED;
&#x2F;&#x2F; If not specified,looking address from name server
if (null &#x3D;&#x3D; this.clientConfig.getNamesrvAddr()) &#123;
this.mQClientAPIImpl.fetchNameServerAddr();
&#125;
&#x2F;&#x2F; Start request-response channel
&#x2F;&#x2F; 这里主要是初始化了个网络客户端
this.mQClientAPIImpl.start();
&#x2F;&#x2F; Start various schedule tasks
&#x2F;&#x2F; 定时任务
this.startScheduledTask();
&#x2F;&#x2F; Start pull service
&#x2F;&#x2F; 这里重点说下
this.pullMessageService.start();
&#x2F;&#x2F; Start rebalance service
this.rebalanceService.start();
&#x2F;&#x2F; Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info(&quot;the client factory [&#123;&#125;] start OK&quot;, this.clientId);
this.serviceState &#x3D; ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException(&quot;The Factory object[&quot; + this.getClientId() + &quot;] has been created before, and failed.&quot;, null);
default:
break;
&#125;
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>我们来看下这个 pullMessageService,org.apache.rocketmq.client.impl.consumer.PullMessageService,<br><img data-src="https://mystore-1255223546.cos.ap-chengdu.myqcloud.com/uPic/QdeiVv.png"><br>实现了 runnable 接口,<br>然后可以看到 run 方法</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public void run() &#123;
log.info(this.getServiceName() + &quot; service started&quot;);
while (!this.isStopped()) &#123;
try &#123;
PullRequest pullRequest &#x3D; this.pullRequestQueue.take();
this.pullMessage(pullRequest);
&#125; catch (InterruptedException ignored) &#123;
&#125; catch (Exception e) &#123;
log.error(&quot;Pull Message Service Run Method exception&quot;, e);
&#125;
&#125;
log.info(this.getServiceName() + &quot; service end&quot;);
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>接着在看 pullMessage 方法</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">private void pullMessage(final PullRequest pullRequest) &#123;
final MQConsumerInner consumer &#x3D; this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer !&#x3D; null) &#123;
DefaultMQPushConsumerImpl impl &#x3D; (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
&#125; else &#123;
log.warn(&quot;No matched consumer for the PullRequest &#123;&#125;, drop it&quot;, pullRequest);
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>实际上调用了这个方法,这个方法很长,我在代码里注释下下每一段的功能</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public void pullMessage(final PullRequest pullRequest) &#123;
final ProcessQueue processQueue &#x3D; pullRequest.getProcessQueue();
&#x2F;&#x2F; 这里开始就是检查状态,确定是否往下执行
if (processQueue.isDropped()) &#123;
log.info(&quot;the pull request[&#123;&#125;] is dropped.&quot;, pullRequest.toString());
return;
&#125;
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try &#123;
this.makeSureStateOK();
&#125; catch (MQClientException e) &#123;
log.warn(&quot;pullMessage exception, consumer state not ok&quot;, e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
&#125;
if (this.isPause()) &#123;
log.warn(&quot;consumer was paused, execute pull request later. instanceName&#x3D;&#123;&#125;, group&#x3D;&#123;&#125;&quot;, this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
&#125;
&#x2F;&#x2F; 这块其实是个类似于限流的功能块,对消息数量和消息大小做限制
long cachedMessageCount &#x3D; processQueue.getMsgCount().get();
long cachedMessageSizeInMiB &#x3D; processQueue.getMsgSize().get() &#x2F; (1024 * 1024);
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) &#123;
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) &#x3D;&#x3D; 0) &#123;
log.warn(
&quot;the cached message count exceeds the threshold &#123;&#125;, so do flow control, minOffset&#x3D;&#123;&#125;, maxOffset&#x3D;&#123;&#125;, count&#x3D;&#123;&#125;, size&#x3D;&#123;&#125; MiB, pullRequest&#x3D;&#123;&#125;, flowControlTimes&#x3D;&#123;&#125;&quot;,
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
&#125;
return;
&#125;
if (cachedMessageSizeInMiB &gt; this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) &#123;
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) &#x3D;&#x3D; 0) &#123;
log.warn(
&quot;the cached message size exceeds the threshold &#123;&#125; MiB, so do flow control, minOffset&#x3D;&#123;&#125;, maxOffset&#x3D;&#123;&#125;, count&#x3D;&#123;&#125;, size&#x3D;&#123;&#125; MiB, pullRequest&#x3D;&#123;&#125;, flowControlTimes&#x3D;&#123;&#125;&quot;,
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
&#125;
return;
&#125;
&#x2F;&#x2F; 若不是顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false),则检查ProcessQueue对象的msgTreeMap:TreeMap&lt;Long,MessageExt&gt;变量的第一个key值与最后一个key值之间的差额,该key值表示查询的队列偏移量queueoffset;若差额大于阈值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后重新将该PullRequest请求放入PullMessageService.pullRequestQueue队列中;并跳出该方法;这里的意思主要就是消息有堆积了,等会再来拉取
if (!this.consumeOrderly) &#123;
if (processQueue.getMaxSpan() &gt; this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) &#123;
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) &#x3D;&#x3D; 0) &#123;
log.warn(
&quot;the queue&#39;s messages, span too long, so do flow control, minOffset&#x3D;&#123;&#125;, maxOffset&#x3D;&#123;&#125;, maxSpan&#x3D;&#123;&#125;, pullRequest&#x3D;&#123;&#125;, flowControlTimes&#x3D;&#123;&#125;&quot;,
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
&#125;
return;
&#125;
&#125; else &#123;
if (processQueue.isLocked()) &#123;
if (!pullRequest.isLockedFirst()) &#123;
final long offset &#x3D; this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy &#x3D; offset &lt; pullRequest.getNextOffset();
log.info(&quot;the first time to pull message, so fix offset from broker. pullRequest: &#123;&#125; NewOffset: &#123;&#125; brokerBusy: &#123;&#125;&quot;,
pullRequest, offset, brokerBusy);
if (brokerBusy) &#123;
log.info(&quot;[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: &#123;&#125; NewOffset: &#123;&#125;&quot;,
pullRequest, offset);
&#125;
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
&#125;
&#125; else &#123;
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info(&quot;pull message later because not locked in broker, &#123;&#125;&quot;, pullRequest);
return;
&#125;
&#125;
&#x2F;&#x2F; 以PullRequest.messageQueue对象的topic值为参数从RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData&gt;中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法,稍后重试;并跳出该方法;
final SubscriptionData subscriptionData &#x3D; this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null &#x3D;&#x3D; subscriptionData) &#123;
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn(&quot;find the consumer&#39;s subscription failed, &#123;&#125;&quot;, pullRequest);
return;
&#125;
final long beginTimestamp &#x3D; System.currentTimeMillis();
&#x2F;&#x2F; 异步拉取回调,先不讨论细节
PullCallback pullCallback &#x3D; new PullCallback() &#123;
@Override
public void onSuccess(PullResult pullResult) &#123;
if (pullResult !&#x3D; null) &#123;
pullResult &#x3D; DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) &#123;
case FOUND:
long prevRequestOffset &#x3D; pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT &#x3D; System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset &#x3D; Long.MAX_VALUE;
if (pullResult.getMsgFoundList() &#x3D;&#x3D; null || pullResult.getMsgFoundList().isEmpty()) &#123;
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
&#125; else &#123;
firstMsgOffset &#x3D; pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume &#x3D; processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() &gt; 0) &#123;
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
&#125; else &#123;
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
&#125;
&#125;
if (pullResult.getNextBeginOffset() &lt; prevRequestOffset
|| firstMsgOffset &lt; prevRequestOffset) &#123;
log.warn(
&quot;[BUG] pull message result maybe data wrong, nextBeginOffset: &#123;&#125; firstMsgOffset: &#123;&#125; prevRequestOffset: &#123;&#125;&quot;,
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
&#125;
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn(&quot;the pull request offset illegal, &#123;&#125; &#123;&#125;&quot;,
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() &#123;
@Override
public void run() &#123;
try &#123;
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn(&quot;fix the pull request offset, &#123;&#125;&quot;, pullRequest);
&#125; catch (Throwable e) &#123;
log.error(&quot;executeTaskLater Exception&quot;, e);
&#125;
&#125;
&#125;, 10000);
break;
default:
break;
&#125;
&#125;
&#125;
@Override
public void onException(Throwable e) &#123;
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) &#123;
log.warn(&quot;execute the pull request exception&quot;, e);
&#125;
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
&#125;
&#125;;
&#x2F;&#x2F; 如果为集群模式,即可置commitOffsetEnable为 true
boolean commitOffsetEnable &#x3D; false;
long commitOffsetValue &#x3D; 0L;
if (MessageModel.CLUSTERING &#x3D;&#x3D; this.defaultMQPushConsumer.getMessageModel()) &#123;
commitOffsetValue &#x3D; this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue &gt; 0) &#123;
commitOffsetEnable &#x3D; true;
&#125;
&#125;
&#x2F;&#x2F; 将上面获得的commitOffsetEnable更新到订阅关系里
String subExpression &#x3D; null;
boolean classFilter &#x3D; false;
SubscriptionData sd &#x3D; this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd !&#x3D; null) &#123;
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() &amp;&amp; !sd.isClassFilterMode()) &#123;
subExpression &#x3D; sd.getSubString();
&#125;
classFilter &#x3D; sd.isClassFilterMode();
&#125;
&#x2F;&#x2F; 组成 sysFlag
int sysFlag &#x3D; PullSysFlag.buildSysFlag(
commitOffsetEnable, &#x2F;&#x2F; commitOffset
true, &#x2F;&#x2F; suspend
subExpression !&#x3D; null, &#x2F;&#x2F; subscription
classFilter &#x2F;&#x2F; class filter
);
&#x2F;&#x2F; 调用真正的拉取消息接口
try &#123;
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
&#125; catch (Exception e) &#123;
log.error(&quot;pullKernelImpl exception&quot;, e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>以下就是拉取消息的底层 api,不够不是特别复杂,主要是在找 broker,和设置请求参数</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException &#123;
FindBrokerResult findBrokerResult &#x3D;
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null &#x3D;&#x3D; findBrokerResult) &#123;
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult &#x3D;
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
&#125;
if (findBrokerResult !&#x3D; null) &#123;
&#123;
&#x2F;&#x2F; check version
if (!ExpressionType.isTagType(expressionType)
&amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) &#123;
throw new MQClientException(&quot;The broker[&quot; + mq.getBrokerName() + &quot;, &quot;
+ findBrokerResult.getBrokerVersion() + &quot;] does not upgrade to support for filter message by &quot; + expressionType, null);
&#125;
&#125;
int sysFlagInner &#x3D; sysFlag;
if (findBrokerResult.isSlave()) &#123;
sysFlagInner &#x3D; PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
&#125;
PullMessageRequestHeader requestHeader &#x3D; new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr &#x3D; findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) &#123;
brokerAddr &#x3D; computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
&#125;
PullResult pullResult &#x3D; this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
&#125;
throw new MQClientException(&quot;The broker[&quot; + mq.getBrokerName() + &quot;] not exist&quot;, null);
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>再看下一步的</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException &#123;
RemotingCommand request &#x3D; RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) &#123;
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
&#125;
return null;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>通过 communicationMode 判断是同步拉取还是异步拉取,异步就调用</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException &#123;
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() &#123;
@Override
public void operationComplete(ResponseFuture responseFuture) &#123;
异步
RemotingCommand response &#x3D; responseFuture.getResponseCommand();
if (response !&#x3D; null) &#123;
try &#123;
PullResult pullResult &#x3D; MQClientAPIImpl.this.processPullResponse(response);
assert pullResult !&#x3D; null;
pullCallback.onSuccess(pullResult);
&#125; catch (Exception e) &#123;
pullCallback.onException(e);
&#125;
&#125; else &#123;
if (!responseFuture.isSendRequestOK()) &#123;
pullCallback.onException(new MQClientException(&quot;send request failed to &quot; + addr + &quot;. Request: &quot; + request, responseFuture.getCause()));
&#125; else if (responseFuture.isTimeout()) &#123;
pullCallback.onException(new MQClientException(&quot;wait response from &quot; + addr + &quot; timeout :&quot; + responseFuture.getTimeoutMillis() + &quot;ms&quot; + &quot;. Request: &quot; + request,
responseFuture.getCause()));
&#125; else &#123;
pullCallback.onException(new MQClientException(&quot;unknown reason. addr: &quot; + addr + &quot;, timeoutMillis: &quot; + timeoutMillis + &quot;. Request: &quot; + request, responseFuture.getCause()));
&#125;
&#125;
&#125;
&#125;);
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>并且会调用前面 pullCallback 的onSuccess和onException方法,同步的就是调用</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">private PullResult pullMessageSync(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException &#123;
RemotingCommand response &#x3D; this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response !&#x3D; null;
return this.processPullResponse(response);
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>然后就是这个 remotingClient 的 invokeAsync 跟 invokeSync 方法</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException &#123;
long beginStartTime &#x3D; System.currentTimeMillis();
final Channel channel &#x3D; this.getAndCreateChannel(addr);
if (channel !&#x3D; null &amp;&amp; channel.isActive()) &#123;
try &#123;
doBeforeRpcHooks(addr, request);
long costTime &#x3D; System.currentTimeMillis() - beginStartTime;
if (timeoutMillis &lt; costTime) &#123;
throw new RemotingTooMuchRequestException(&quot;invokeAsync call timeout&quot;);
&#125;
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
&#125; catch (RemotingSendRequestException e) &#123;
log.warn(&quot;invokeAsync: send request exception, so close the channel[&#123;&#125;]&quot;, addr);
this.closeChannel(addr, channel);
throw e;
&#125;
&#125; else &#123;
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
&#125;
&#125;
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException &#123;
long beginStartTime &#x3D; System.currentTimeMillis();
final Channel channel &#x3D; this.getAndCreateChannel(addr);
if (channel !&#x3D; null &amp;&amp; channel.isActive()) &#123;
try &#123;
doBeforeRpcHooks(addr, request);
long costTime &#x3D; System.currentTimeMillis() - beginStartTime;
if (timeoutMillis &lt; costTime) &#123;
throw new RemotingTimeoutException(&quot;invokeSync call timeout&quot;);
&#125;
RemotingCommand response &#x3D; this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
&#125; catch (RemotingSendRequestException e) &#123;
log.warn(&quot;invokeSync: send request exception, so close the channel[&#123;&#125;]&quot;, addr);
this.closeChannel(addr, channel);
throw e;
&#125; catch (RemotingTimeoutException e) &#123;
if (nettyClientConfig.isClientCloseSocketIfTimeout()) &#123;
this.closeChannel(addr, channel);
log.warn(&quot;invokeSync: close socket because of timeout, &#123;&#125;ms, &#123;&#125;&quot;, timeoutMillis, addr);
&#125;
log.warn(&quot;invokeSync: wait response timeout exception, the channel[&#123;&#125;]&quot;, addr);
throw e;
&#125;
&#125; else &#123;
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre><p>再往下看</p><pre class="line-numbers language-Java" data-language="Java"><code class="language-Java">public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException &#123;
final int opaque &#x3D; request.getOpaque();
try &#123;
同步跟异步都是会把结果用ResponseFuture抱起来
final ResponseFuture responseFuture &#x3D; new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr &#x3D; channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() &#123;
@Override
public void operationComplete(ChannelFuture f) throws Exception &#123;
if (f.isSuccess()) &#123;
responseFuture.setSendRequestOK(true);
return;
&#125; else &#123;
responseFuture.setSendRequestOK(false);
&#125;
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn(&quot;send a request command to channel &lt;&quot; + addr + &quot;&gt; failed.&quot;);
&#125;
&#125;);
&#x2F;&#x2F; 区别是同步的是在这等待
RemotingCommand responseCommand &#x3D; responseFuture.waitResponse(timeoutMillis);
if (null &#x3D;&#x3D; responseCommand) &#123;
if (responseFuture.isSendRequestOK()) &#123;
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
&#125; else &#123;
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
&#125;
&#125;
return responseCommand;
&#125; finally &#123;
this.responseTable.remove(opaque);
&#125;
&#125;
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException &#123;
long beginStartTime &#x3D; System.currentTimeMillis();
final int opaque &#x3D; request.getOpaque();
boolean acquired &#x3D; this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) &#123;
final SemaphoreReleaseOnlyOnce once &#x3D; new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime &#x3D; System.currentTimeMillis() - beginStartTime;
if (timeoutMillis &lt; costTime) &#123;
once.release();
throw new RemotingTimeoutException(&quot;invokeAsyncImpl call timeout&quot;);
&#125;
final ResponseFuture responseFuture &#x3D; new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try &#123;
channel.writeAndFlush(request).addListener(new ChannelFutureListener() &#123;
@Override
public void operationComplete(ChannelFuture f) throws Exception &#123;
if (f.isSuccess()) &#123;
responseFuture.setSendRequestOK(true);
return;
&#125;
requestFail(opaque);
log.warn(&quot;send a request command to channel &lt;&#123;&#125;&gt; failed.&quot;, RemotingHelper.parseChannelRemoteAddr(channel));
&#125;
&#125;);
&#125; catch (Exception e) &#123;
responseFuture.release();
log.warn(&quot;send a request command to channel &lt;&quot; + RemotingHelper.parseChannelRemoteAddr(channel) + &quot;&gt; Exception&quot;, e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
&#125;
&#125; else &#123;
if (timeoutMillis &lt;&#x3D; 0) &#123;
throw new RemotingTooMuchRequestException(&quot;invokeAsyncImpl invoke too fast&quot;);
&#125; else &#123;
String info &#x3D;
String.format(&quot;invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d&quot;,
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
&#125;
&#125;
&#125;<span aria-hidden="true" class="line-numbers-rows"><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span><span></span></span></code></pre></div><div class="popular-posts-header">Related Posts</div><ul class="popular-posts"><li class="popular-posts-item"><div class="popular-posts-title"><a href="/2020/07/05/聊一下-RocketMQ-的-NameServer-源码/" rel="bookmark">聊一下 RocketMQ 的 NameServer 源码</a></div></li><li class="popular-posts-item"><div class="popular-posts-title"><a href="/2020/07/19/聊聊-RocketMQ-的-Broker-源码/" rel="bookmark">聊聊 RocketMQ 的 Broker 源码</a></div></li><li class="popular-posts-item"><div class="popular-posts-title"><a href="/2020/06/21/介绍一下-RocketMQ/" rel="bookmark">介绍一下 RocketMQ</a></div></li><li class="popular-posts-item"><div class="popular-posts-title"><a href="/2021/09/04/聊一下-RocketMQ-的消息存储/" rel="bookmark">聊一下 RocketMQ 的消息存储之 MMAP</a></div></li><li class="popular-posts-item"><div class="popular-posts-title"><a href="/2021/08/29/聊一下-RocketMQ-的顺序消息/" rel="bookmark">聊一下 RocketMQ 的顺序消息</a></div></li></ul><footer class="post-footer"><div class="reward-container"><div>Buy me a coffee</div><button>Donate</button><div class="post-reward"><div><img src="https://i.loli.net/2020/01/12/NUlWanT31E8LQGx.png" alt="Nicksxs WeChat Pay"> <span>WeChat Pay</span></div><div><img src="https://ooo.0o0.ooo/2016/08/17/57b421e773057.jpg" alt="Nicksxs Alipay"> <span>Alipay</span></div></div></div><div class="post-copyright"><ul><li class="post-copyright-author"><strong>Post author: </strong>Nicksxs</li><li class="post-copyright-link"><strong>Post link: </strong><a href="https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/" title="聊一下 RocketMQ 的 DefaultMQPushConsumer 源码">https://nicksxs.me/2020/06/26/聊一下-RocketMQ-的-Consumer/</a></li><li class="post-copyright-license"><strong>Copyright Notice: </strong>All articles in this blog are licensed under <a href="https://creativecommons.org/licenses/by-nc-sa/4.0/" rel="noopener" target="_blank"><i class="fab fa-fw fa-creative-commons"></i>BY-NC-SA</a> unless stating additionally.</li></ul></div><div class="post-tags"><a href="/tags/MQ/" rel="tag"># MQ</a> <a href="/tags/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/" rel="tag"># 消息队列</a> <a href="/tags/RocketMQ/" rel="tag"># RocketMQ</a> <a href="/tags/%E5%89%8A%E5%B3%B0%E5%A1%AB%E8%B0%B7/" rel="tag"># 削峰填谷</a> <a href="/tags/%E4%B8%AD%E9%97%B4%E4%BB%B6/" rel="tag"># 中间件</a> <a href="/tags/%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/" rel="tag"># 源码解析</a> <a href="/tags/DefaultMQPushConsumer/" rel="tag"># DefaultMQPushConsumer</a></div><div class="post-nav"><div class="post-nav-item"><a href="/2020/06/21/%E4%BB%8B%E7%BB%8D%E4%B8%80%E4%B8%8B-RocketMQ/" rel="prev" title="介绍一下 RocketMQ"><i class="fa fa-chevron-left"></i> 介绍一下 RocketMQ</a></div><div class="post-nav-item"><a href="/2020/07/05/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-NameServer-%E6%BA%90%E7%A0%81/" rel="next" title="聊一下 RocketMQ 的 NameServer 源码">聊一下 RocketMQ 的 NameServer 源码 <i class="fa fa-chevron-right"></i></a></div></div></footer></article></div><div class="comments" id="disqus_thread"><noscript>Please enable JavaScript to view the comments powered by Disqus.</noscript></div></div></main><footer class="footer"><div class="footer-inner"><div class="copyright">&copy; <span itemprop="copyrightYear">2021</span> <span class="with-love"><i class="fa fa-heart"></i> </span><span class="author" itemprop="copyrightHolder">Nicksxs</span></div><div class="busuanzi-count"><span class="post-meta-item" id="busuanzi_container_site_uv"><span class="post-meta-item-icon"><i class="fa fa-user"></i> </span><span class="site-uv" title="Total Visitors"><span id="busuanzi_value_site_uv"></span> </span></span><span class="post-meta-item" id="busuanzi_container_site_pv"><span class="post-meta-item-icon"><i class="fa fa-eye"></i> </span><span class="site-pv" title="Total Views"><span id="busuanzi_value_site_pv"></span></span></span></div></div></footer><script src="https://cdn.jsdelivr.net/npm/animejs@3.2.1/lib/anime.min.js" integrity="sha256-XL2inqUJaslATFnHdJOi9GfQ60on8Wx1C2H8DYiN1xY=" crossorigin="anonymous"></script><script src="https://cdn.jsdelivr.net/npm/jquery@3.6.0/dist/jquery.min.js" integrity="sha256-/xUj+3OJU5yExlq6GSYGSHk7tPXikynS7ogEvDej/m4=" crossorigin="anonymous"></script><script src="https://cdn.jsdelivr.net/npm/@fancyapps/fancybox@3.5.7/dist/jquery.fancybox.min.js" integrity="sha256-yt2kYMy0w8AbtF89WXb2P1rfjcP/HTHLT7097U8Y5b8=" crossorigin="anonymous"></script><script src="https://cdn.jsdelivr.net/npm/lozad@1.16.0/dist/lozad.min.js" integrity="sha256-mOFREFhqmHeQbXpK2lp4nA3qooVgACfh88fpJftLBbc=" crossorigin="anonymous"></script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/comments.min.js"></script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/utils.min.js"></script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/next-boot.min.js"></script><script async src="https://busuanzi.ibruce.info/busuanzi/2.3/busuanzi.pure.mini.js"></script><script class="next-config" data-name="leancloud_visitors" type="application/json">{"enable":true,"app_id":"ysza182Vghlqjdt7QiwGLLJy-gzGzoHsz","app_key":"s9GDqbn7gnGGkusf66YRVccw","server_url":"https://leancloud.cn","security":true}</script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/third-party/statistics/lean-analytics.min.js"></script><script src="https://cdn.jsdelivr.net/npm/quicklink@2.2.0/dist/quicklink.umd.js" integrity="sha256-4kQf9z5ntdQrzsBC3YSHnEz02Z9C1UeW/E9OgnvlzSY=" crossorigin="anonymous"></script><script class="next-config" data-name="quicklink" type="application/json">{"enable":true,"home":false,"archive":false,"delay":true,"timeout":3000,"priority":true,"url":"https://nicksxs.me/2020/06/26/%E8%81%8A%E4%B8%80%E4%B8%8B-RocketMQ-%E7%9A%84-Consumer/"}</script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/third-party/quicklink.min.js"></script><script class="next-config" data-name="disqus" type="application/json">{"enable":true,"shortname":"nicksxs","count":true,"i18n":{"disqus":"disqus"}}</script><script src="https://cdn.jsdelivr.net/npm/hexo-theme-next@8.6.1/source/js/third-party/comments/disqus.min.js"></script></body></html>