Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

之前看了 dubbo 的一些代码,在学习过程中,主要关注那些比较“高级”的内容,SPI,自适应扩展等,却忘了一些作为一个 rpc 框架最核心需要的部分,比如如何通信,序列化,网络,容错机制等等,因为其实这个最核心的就是远程调用,自适应扩展其实就是让代码可扩展性,可读性,更优雅等,写的搓一点其实也问题不大,但是一个合适的通信协议,序列化方法,如何容错等却是真正保证是一个 rpc 框架最重要的要素。
首先来看这张图
cluster
在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
各节点关系:

  • 这里的 InvokerProvider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory 代表多个 Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • ClusterDirectory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

集群容错模式

Failover Cluster

失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。

重试次数配置如下:

<dubbo:service retries=”2” />
这里重点看下 Failover Cluster集群模式的实现

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

这个代码就非常简单,重点需要看FailoverClusterInvoker里的代码,FailoverClusterInvoker继承了AbstractClusterInvoker类,其中invoke 方法是在抽象类里实现的

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    // binding attachments into invocation.
    // 绑定 attachments 到 invocation 中.
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    // 列举 Invoker
    List<Invoker<T>> invokers = list(invocation);
    // 加载 LoadBalance 负载均衡器
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 调用 实际的 doInvoke 进行后续操作
    return doInvoke(invocation, invokers, loadbalance);
}
// 这是个抽象方法,实际是由子类实现的
 protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

然后重点就是FailoverClusterInvoker中的doInvoke方法了,其实它里面也就这么一个方法

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 获取重试次数,这里默认是 2 次,还有可以注意下后面的+1
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        // 循环调用,失败重试
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
                // 通过调用 list 可得到最新可用的 Invoker 列表
                copyInvokers = list(invocation);
                // check again
                // 对 copyinvokers 进行判空检查
                checkInvokers(copyInvokers, invocation);
            }
            // 通过负载均衡来选择 invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            // 将其添加到 invoker 到 invoked 列表中
            invoked.add(invoker);
            // 设置上下文
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 正式调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

Failfast Cluster

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。

Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错 2。通常用于通知所有提供者更新缓存或日志等本地资源信息。

题目介绍

Given a singly linked list, determine if it is a palindrome.
给定一个单向链表,判断是否是回文链表

例一 Example 1:

Input: 1->2
Output: false

例二 Example 2:

Input: 1->2->2->1
Output: true

挑战下自己

Follow up:
Could you do it in O(n) time and O(1) space?

简要分析

首先这是个单向链表,如果是双向的就可以一个从头到尾,一个从尾到头,显然那样就没啥意思了,然后想过要不找到中点,然后用一个栈,把前一半塞进栈里,但是这种其实也比较麻烦,比如长度是奇偶数,然后如何找到中点,这倒是可以借助于双指针,还是比较麻烦,再想一想,回文链表,就跟最开始的一样,链表只有单向的,我用个栈不就可以逆向了么,先把链表整个塞进栈里,然后在一个个 pop 出来跟链表从头开始比较,全对上了就是回文了

/**
 * Definition for singly-linked list.
 * public class ListNode {
 *     int val;
 *     ListNode next;
 *     ListNode() {}
 *     ListNode(int val) { this.val = val; }
 *     ListNode(int val, ListNode next) { this.val = val; this.next = next; }
 * }
 */
class Solution {
    public boolean isPalindrome(ListNode head) {
        if (head == null) {
            return true;
        }
        ListNode tail = head;
        LinkedList<Integer> stack = new LinkedList<>();
        // 这里就是一个循环,将所有元素依次压入栈
        while (tail != null) {
            stack.push(tail.val);
            tail = tail.next;
        }
        // 在逐个 pop 出来,其实这个出来的顺序就等于链表从尾到头遍历,同时跟链表从头到尾遍历进行逐对对比
        while (!stack.isEmpty()) {
            if (stack.peekFirst() == head.val) {
                stack.pollFirst();
                head = head.next;
            } else {
                return false;
            }
        }
        return true;
    }
}

一说到这个主题,想到的应该是双亲委派模型,不过讲的包括但不限于这个,主要内容是参考深入理解 Java 虚拟机书中的介绍,
一个类型的生命周期包含了七个阶段,加载,验证,准备,解析,初始化,使用,卸载。

  • 加载

  1. 通过一个类的全限定名来获取定义此类的二进制字节流
  2. 将这个字节流代表的静态存储结构转化为方法区的运行时数据结构
  3. 在内存中生成了一个代表这个类的 java.lang.Class 对象,作为方法区这个类的各种数据的访问入口
  • 验证

  1. 文件格式验证
  2. 元数据验证
  3. 字节码验证
  4. 符号引用验证
  • 准备

    准备阶段是正式为类中定义的变量(即静态变量,被static修饰的变量)分配内存并设置类变量初始值的阶段

  • 解析

    解析阶段是 Java 虚拟机将常量池内的符号引用替换为直接引用的过程

以上验证准备解析 三个阶段又合称为链接阶段,链接阶段要做的是将加载到JVM中的二进制字节流的类数据信息合并到JVM的运行时状态中。

  • 初始化

    类的初始化阶段是类加载过程的最后一个步骤,也是除了自定义类加载器之外将主动权交给了应用程序,其实就是执行类构造器()方法的过程,()并不是我们在 Java 代码中直接编写的方法,它是 Javac编译器的自动生成物,()方法是由编译器自动收集类中的所有类变量的复制动作和静态句块(static{}块)中的语句合并产生的,编译器收集的顺序是由语句在原文件中出现的顺序决定的,静态语句块中只能访问定义在静态语句块之前的变量,定义在它之后的变量,在前面的静态语句块可以复制,但是不能访问,同时还要保证父类的执行先于子类,然后保证多线程下的并发问题

最终,方法区会存储当前类类信息,包括类的静态变量、类初始化代码(定义静态变量时的赋值语句 和 静态初始化代码块)、实例变量定义、实例初始化代码(定义实例变量时的赋值语句实例代码块和构造方法)和实例方法,还有父类的类信息引用。

在前司和目前公司,用的配置中心都是使用的 Apollo,经过了业界验证,比较强大的配置管理系统,特别是在0.10 后开始支持对使用 value 注解的配置值进行自动更新,今天刚好有个同学问到我,就顺便写篇文章记录下,其实也是借助于 spring 强大的 bean 生命周期管理,可以实现BeanPostProcessor接口,使用postProcessBeforeInitialization方法,来对bean 内部的属性和方法进行判断,是否有 value 注解,如果有就是将它注册到一个 map 中,可以看到这个方法com.ctrip.framework.apollo.spring.annotation.SpringValueProcessor#processField

@Override
  protected void processField(Object bean, String beanName, Field field) {
    // register @Value on field
    Value value = field.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());

    if (keys.isEmpty()) {
      return;
    }

    for (String key : keys) {
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
      springValueRegistry.register(beanFactory, key, springValue);
      logger.debug("Monitoring {}", springValue);
    }
  }

然后我们看下这个springValueRegistry是啥玩意

public class SpringValueRegistry {
  private static final long CLEAN_INTERVAL_IN_SECONDS = 5;
  private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
  private final AtomicBoolean initialized = new AtomicBoolean(false);
  private final Object LOCK = new Object();

  public void register(BeanFactory beanFactory, String key, SpringValue springValue) {
    if (!registry.containsKey(beanFactory)) {
      synchronized (LOCK) {
        if (!registry.containsKey(beanFactory)) {
          registry.put(beanFactory, LinkedListMultimap.<String, SpringValue>create());
        }
      }
    }

    registry.get(beanFactory).put(key, springValue);

    // lazy initialize
    if (initialized.compareAndSet(false, true)) {
      initialize();
    }
  }

这类其实就是个 map 来存放 springvalue,然后有com.ctrip.framework.apollo.spring.property.AutoUpdateConfigChangeListener来监听更新操作,当有变更时

@Override
 public void onChange(ConfigChangeEvent changeEvent) {
   Set<String> keys = changeEvent.changedKeys();
   if (CollectionUtils.isEmpty(keys)) {
     return;
   }
   for (String key : keys) {
     // 1. check whether the changed key is relevant
     Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
     if (targetValues == null || targetValues.isEmpty()) {
       continue;
     }

     // 2. check whether the value is really changed or not (since spring property sources have hierarchies)
     // 这里其实有一点比较绕,是因为 Apollo 里的 namespace 划分,会出现 key 相同,但是 namespace 不同的情况,所以会有个优先级存在,所以需要去校验 environment 里面的是否已经更新,如果未更新则表示不需要更新
     if (!shouldTriggerAutoUpdate(changeEvent, key)) {
       continue;
     }

     // 3. update the value
     for (SpringValue val : targetValues) {
       updateSpringValue(val);
     }
   }
 }

其实原理很简单,就是得了解知道下

题目介绍

给定一个二叉树,找出其最大深度。

二叉树的深度为根节点到最远叶子节点的最长路径上的节点数。

说明: 叶子节点是指没有子节点的节点。

示例:
给定二叉树 [3,9,20,null,null,15,7],

  3
 / \
9  20
  /  \
 15   7

返回它的最大深度 3 。

代码

// 主体是个递归的应用
public int maxDepth(TreeNode root) {
    // 节点的退出条件之一
    if (root == null) {
        return 0;
    }
    int left = 0;
    int right = 0;
    // 存在左子树,就递归左子树
    if (root.left != null) {
        left = maxDepth(root.left);
    }
    // 存在右子树,就递归右子树
    if (root.right != null) {
        right = maxDepth(root.right);
    }
    // 前面返回后,左右取大者
    return Math.max(left + 1, right + 1);
}

分析

其实对于树这类题,一般是以递归形式比较方便,只是要注意退出条件

0%