Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?


Today I looked into Redis distributed locks and the Redlock implementation; here are some quick notes.

Acquiring the lock

My old understanding of Redis locks was simply: acquire with SETNX, release with a Lua script, without being very clear on what SETNX actually is or what the Lua script does.
Let's think the problem through. First, why not GET the key to see whether it exists, and then SET a key/value? Because locking has to guarantee two things. One is that it cannot be interrupted halfway, that is, it must be atomic; a GET followed by a conditional SET is not an atomic operation. The second: could we just SET the value directly? Clearly not. A lock must be exclusive; one and only one thread (or other unit of execution) may hold it at a time. SETNX ("SET if Not eXists") is exactly the atomic command we need.

Next, what should the key and the value of SETNX be? The key is the easy part: it is the resource to be locked, for example a user_id. (There is a misconception I used to fall into here, but more on that later.) With user_id as the locked resource, other threads may not touch it while I hold the lock, which keeps the business logic correct: the data cannot be modified by several threads at once.
With the key settled, what about the value? I used to think the value didn't matter at all: as long as the lock was held, the key alone was enough. But consider multiple threads. Suppose my thread takes the lock and then stalls, say in a long GC pause, and meanwhile the lock (which is just a Redis key with an expiry) times out. Another thread then acquires the lock successfully. When my thread comes back to life it still believes it holds the lock, goes on to modify the data or release the lock, and things go wrong. So we need a way to tell which thread acquired the lock: set the value to an identifier unique to the thread, at least one that will not repeat within a reasonably long window.

That still leaves two problems. First, when a GC pause overruns, how does my thread know the lock it holds has expired? One approach is to keep track of a timeout locally right after acquiring the lock. There is a further wrinkle here that ties into the second problem, so let's take them together: setting the expiry itself. For ordinary Redis writes that are not locks, you can SET the value first and then set the expiry separately, but that runs into the same non-atomicity problem as before, so the expiry has to be set within the same command. Fortunately, Redis supports this:

SET resource_name my_random_value NX PX 30000

To borrow the usual explanation: resource_name is the key, the thing being locked; my_random_value identifies my thread; NX means set only if the key does not exist; and PX 30000 means the key expires automatically after 30000 ms, i.e. 30 seconds.
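To make this concrete, here is a minimal sketch of the acquire step using the Jedis client (Jedis as the client library, plus the method and variable names, are my assumptions; the post itself only shows the raw command):

import java.util.UUID;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisLockAcquire {
    // One atomic command: SET resource_name my_random_value NX PX <ttl>.
    // Returns the random token on success (we'll need it to unlock), or null if the lock is held.
    static String tryLock(Jedis jedis, String resourceName, long ttlMillis) {
        String token = UUID.randomUUID().toString(); // per-acquirer identifier
        String reply = jedis.set(resourceName, token, SetParams.setParams().nx().px(ttlMillis));
        return "OK".equals(reply) ? token : null;
    }
}

The token plays the role of my_random_value above: keep it around, because releasing the lock requires proving ownership with it.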

PS: to record a misconception I used to hold: shouldn't the key be what distinguishes the locking thread? That buys you exactly one thing: the thread itself can recognize whether it set the lock. The fatal problem is that other threads cannot. The real motivation behind the idea was the issue described above: after a GC pause I want to check whether the current lock is mine, and on release I want to guarantee I never mistakenly release a lock some other thread acquired. But it drags in many other problems, the biggest being: how would any other thread know whether it is allowed to take the lock at all?

Releasing the lock

If a thread finishes its business logic before the lock expires, it can release the lock early. But how? A bare del key is clearly wrong: the whole reason for using a per-thread random value together with the resource name was that I must never release a lock acquired by another thread. So what to do? GET it first to see whether it is mine? Then we are back to a non-atomic operation. Fortunately Redis has this covered too: a Lua script runs atomically and can do the check-and-delete in one step:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

Here KEYS[1] is the resource_name used when locking, and ARGV[1] is the thread's random value, my_random_value.
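Correspondingly, a minimal release sketch with Jedis, running the script above via EVAL (again, the class and method names are mine):

import java.util.Collections;

import redis.clients.jedis.Jedis;

public class RedisLockRelease {
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('del', KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end";

    // Deletes the lock only if we still own it; returns true when the key was actually deleted.
    static boolean unlock(Jedis jedis, String resourceName, String token) {
        Object result = jedis.eval(UNLOCK_SCRIPT,
                                   Collections.singletonList(resourceName),
                                   Collections.singletonList(token));
        return Long.valueOf(1L).equals(result);
    }
}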

Multiple nodes

Everything above covers a single Redis node serving as the distributed lock. What about multiple Redis nodes? If a node goes down while a lock is being acquired, released, or still held, things get trickier. One idea is to run a dedicated single-node Redis just for locking, but then what happens when that node dies, and there is the extra cost as well. So we need a multi-node distributed lock scheme.
This brings us to the Redlock mentioned at the start, designed by the author of Redis himself. Acquisition is performed in the following steps (a rough code sketch follows the list):

  • Get the current time in milliseconds.
  • Try to acquire the lock on each of the N Redis nodes in sequence. Each per-node attempt works exactly like the single-node acquisition above: it carries the random string my_random_value and the expiry (e.g. PX 30000, the lock validity time). So that the algorithm can keep making progress when a node is unavailable, each attempt also has its own timeout, far smaller than the lock validity time (on the order of tens of milliseconds). If acquiring on one node fails, the client should immediately move on to the next node. "Fails" here should cover every kind of failure: the node being unreachable, or the lock on that node already being held by another client. (Note: the Redlock write-up only mentions node unavailability at this point, but other failure modes should be included as well.)
  • Compute how long the whole acquisition took: the current time minus the time recorded in step 1. Only if the client obtained the lock on a majority of nodes (>= N/2+1), and the total time spent did not exceed the lock validity time, does it consider the lock acquired; otherwise the acquisition has failed.
  • If acquisition succeeded, the lock's validity time must be recomputed: the original validity time minus the elapsed time from step 3.
  • If acquisition failed (fewer than N/2+1 nodes acquired, or the whole process took longer than the original validity time), the client should immediately send a release to all Redis nodes (i.e. the Lua script introduced earlier).
    Release itself is simple: the client sends the release operation to every node, regardless of whether acquiring on that node succeeded at the time. Why all nodes? Because a node's "failure" may just mean the acquisition blocked and the success reply never came back in time; to cover that case we still release on every node.
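Here is the rough sketch promised above: the acquisition loop over N nodes, the majority check, and the all-nodes release on failure. Everything in it (one Jedis connection per node, the names, the swallowed exceptions) is my own illustration of the listed steps, not code from the Redlock page; a real client would also set short per-node connect/socket timeouts, which is omitted here.

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedlockSketch {
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('del', KEYS[1]) else return 0 end";

    // Returns the remaining validity time in ms on success, or -1 on failure.
    static long tryAcquire(List<Jedis> nodes, String resource, long validityMillis) {
        String token = UUID.randomUUID().toString();
        long start = System.currentTimeMillis();               // step 1: current time
        int acquired = 0;
        for (Jedis node : nodes) {                             // step 2: each node in turn
            try {
                String reply = node.set(resource, token,
                        SetParams.setParams().nx().px(validityMillis));
                if ("OK".equals(reply)) acquired++;
            } catch (Exception e) {
                // any failure (node down, timeout, ...): move straight on to the next node
            }
        }
        long elapsed = System.currentTimeMillis() - start;     // step 3: total time spent
        if (acquired >= nodes.size() / 2 + 1 && elapsed < validityMillis) {
            return validityMillis - elapsed;                   // step 4: shrink the validity
        }
        for (Jedis node : nodes) {                             // step 5: failed, release everywhere
            try {
                node.eval(UNLOCK_SCRIPT,
                          Collections.singletonList(resource),
                          Collections.singletonList(token));
            } catch (Exception ignored) { }
        }
        return -1;
    }
}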
That's it for these initial notes.

Reading the G1 GC source

I'm quite interested in how Java implements its GC. I used to rely on Zhou Zhiming's book, but it doesn't walk through the actual GC source; it explains the overall ideas and workflow, and in particular not the concrete implementation of G1.
The usual mental model of G1 is that it splits the formerly monolithic young and old generations into small chunks of memory managed per region. In short, where collecting the young or old generation used to involve that entire generation's heap space, G1 works on these finer-grained chunks.
That brings one obvious benefit and one obvious cost: collection becomes more flexible and pauses get shorter, but the bookkeeping for a collection becomes considerably more complex.
So far I've read a little of the code around G1's target pause time.

HeapWord* G1CollectedHeap::do_collection_pause(size_t word_size,
                                               uint gc_count_before,
                                               bool* succeeded,
                                               GCCause::Cause gc_cause) {
  assert_heap_not_locked_and_not_at_safepoint();
  VM_G1CollectForAllocation op(word_size,
                               gc_count_before,
                               gc_cause,
                               false, /* should_initiate_conc_mark */
                               g1_policy()->max_pause_time_ms());
  VMThread::execute(&op);

  HeapWord* result = op.result();
  bool ret_succeeded = op.prologue_succeeded() && op.pause_succeeded();
  assert(result == NULL || ret_succeeded,
         "the result should be NULL if the VM did not succeed");
  *succeeded = ret_succeeded;

  assert_heap_not_locked();
  return result;
}

This is the pause entry point for a collection. VMThread::execute(&op); dispatches the VM operation, and the real work happens in VM_G1CollectForAllocation::doit:

void VM_G1CollectForAllocation::doit() {
  G1CollectedHeap* g1h = G1CollectedHeap::heap();
  assert(!_should_initiate_conc_mark || g1h->should_do_concurrent_full_gc(_gc_cause),
      "only a GC locker, a System.gc(), stats update, whitebox, or a hum allocation induced GC should start a cycle");

  if (_word_size > 0) {
    // An allocation has been requested. So, try to do that first.
    _result = g1h->attempt_allocation_at_safepoint(_word_size,
                                                   false /* expect_null_cur_alloc_region */);
    if (_result != NULL) {
      // If we can successfully allocate before we actually do the
      // pause then we will consider this pause successful.
      _pause_succeeded = true;
      return;
    }
  }

  GCCauseSetter x(g1h, _gc_cause);
  if (_should_initiate_conc_mark) {
    // It's safer to read old_marking_cycles_completed() here, given
    // that noone else will be updating it concurrently. Since we'll
    // only need it if we're initiating a marking cycle, no point in
    // setting it earlier.
    _old_marking_cycles_completed_before = g1h->old_marking_cycles_completed();

    // At this point we are supposed to start a concurrent cycle. We
    // will do so if one is not already in progress.
    bool res = g1h->g1_policy()->force_initial_mark_if_outside_cycle(_gc_cause);

    // The above routine returns true if we were able to force the
    // next GC pause to be an initial mark; it returns false if a
    // marking cycle is already in progress.
    //
    // If a marking cycle is already in progress just return and skip the
    // pause below - if the reason for requesting this initial mark pause
    // was due to a System.gc() then the requesting thread should block in
    // doit_epilogue() until the marking cycle is complete.
    //
    // If this initial mark pause was requested as part of a humongous
    // allocation then we know that the marking cycle must just have
    // been started by another thread (possibly also allocating a humongous
    // object) as there was no active marking cycle when the requesting
    // thread checked before calling collect() in
    // attempt_allocation_humongous(). Retrying the GC, in this case,
    // will cause the requesting thread to spin inside collect() until the
    // just started marking cycle is complete - which may be a while. So
    // we do NOT retry the GC.
    if (!res) {
      assert(_word_size == 0, "Concurrent Full GC/Humongous Object IM shouldn't be allocating");
      if (_gc_cause != GCCause::_g1_humongous_allocation) {
        _should_retry_gc = true;
      }
      return;
    }
  }

  // Try a partial collection of some kind.
  _pause_succeeded = g1h->do_collection_pause_at_safepoint(_target_pause_time_ms);

  if (_pause_succeeded) {
    if (_word_size > 0) {
      // An allocation had been requested. Do it, eventually trying a stronger
      // kind of GC.
      _result = g1h->satisfy_failed_allocation(_word_size, &_pause_succeeded);
    } else {
      bool should_upgrade_to_full = !g1h->should_do_concurrent_full_gc(_gc_cause) &&
                                    !g1h->has_regions_left_for_allocation();
      if (should_upgrade_to_full) {
        // There has been a request to perform a GC to free some space. We have no
        // information on how much memory has been asked for. In case there are
        // absolutely no regions left to allocate into, do a maximally compacting full GC.
        log_info(gc, ergo)("Attempting maximally compacting collection");
        _pause_succeeded = g1h->do_full_collection(false, /* explicit gc */
                                                   true   /* clear_all_soft_refs */);
      }
    }
    guarantee(_pause_succeeded, "Elevated collections during the safepoint must always succeed.");
  } else {
    assert(_result == NULL, "invariant");
    // The only reason for the pause to not be successful is that, the GC locker is
    // active (or has become active since the prologue was executed). In this case
    // we should retry the pause after waiting for the GC locker to become inactive.
    _should_retry_gc = true;
  }
}

The core here is the G1CollectedHeap::do_collection_pause_at_safepoint method, which takes the target pause time as a parameter:

bool G1CollectedHeap::do_collection_pause_at_safepoint(double target_pause_time_ms) {
  assert_at_safepoint_on_vm_thread();
  guarantee(!is_gc_active(), "collection is not reentrant");

  if (GCLocker::check_active_before_gc()) {
    return false;
  }

  _gc_timer_stw->register_gc_start();

  GCIdMark gc_id_mark;
  _gc_tracer_stw->report_gc_start(gc_cause(), _gc_timer_stw->gc_start());

  SvcGCMarker sgcm(SvcGCMarker::MINOR);
  ResourceMark rm;

  g1_policy()->note_gc_start();

  wait_for_root_region_scanning();

  print_heap_before_gc();
  print_heap_regions();
  trace_heap_before_gc(_gc_tracer_stw);

  _verifier->verify_region_sets_optional();
  _verifier->verify_dirty_young_regions();

  // We should not be doing initial mark unless the conc mark thread is running
  if (!_cm_thread->should_terminate()) {
    // This call will decide whether this pause is an initial-mark
    // pause. If it is, in_initial_mark_gc() will return true
    // for the duration of this pause.
    g1_policy()->decide_on_conc_mark_initiation();
  }

  // We do not allow initial-mark to be piggy-backed on a mixed GC.
  assert(!collector_state()->in_initial_mark_gc() ||
          collector_state()->in_young_only_phase(), "sanity");

  // We also do not allow mixed GCs during marking.
  assert(!collector_state()->mark_or_rebuild_in_progress() || collector_state()->in_young_only_phase(), "sanity");

  // Record whether this pause is an initial mark. When the current
  // thread has completed its logging output and it's safe to signal
  // the CM thread, the flag's value in the policy has been reset.
  bool should_start_conc_mark = collector_state()->in_initial_mark_gc();

  // Inner scope for scope based logging, timers, and stats collection
  {
    EvacuationInfo evacuation_info;

    if (collector_state()->in_initial_mark_gc()) {
      // We are about to start a marking cycle, so we increment the
      // full collection counter.
      increment_old_marking_cycles_started();
      _cm->gc_tracer_cm()->set_gc_cause(gc_cause());
    }

    _gc_tracer_stw->report_yc_type(collector_state()->yc_type());

    GCTraceCPUTime tcpu;

    G1HeapVerifier::G1VerifyType verify_type;
    FormatBuffer<> gc_string("Pause Young ");
    if (collector_state()->in_initial_mark_gc()) {
      gc_string.append("(Concurrent Start)");
      verify_type = G1HeapVerifier::G1VerifyConcurrentStart;
    } else if (collector_state()->in_young_only_phase()) {
      if (collector_state()->in_young_gc_before_mixed()) {
        gc_string.append("(Prepare Mixed)");
      } else {
        gc_string.append("(Normal)");
      }
      verify_type = G1HeapVerifier::G1VerifyYoungNormal;
    } else {
      gc_string.append("(Mixed)");
      verify_type = G1HeapVerifier::G1VerifyMixed;
    }
    GCTraceTime(Info, gc) tm(gc_string, NULL, gc_cause(), true);

    uint active_workers = AdaptiveSizePolicy::calc_active_workers(workers()->total_workers(),
                                                                  workers()->active_workers(),
                                                                  Threads::number_of_non_daemon_threads());
    active_workers = workers()->update_active_workers(active_workers);
    log_info(gc,task)("Using %u workers of %u for evacuation", active_workers, workers()->total_workers());

    TraceCollectorStats tcs(g1mm()->incremental_collection_counters());
    TraceMemoryManagerStats tms(&_memory_manager, gc_cause(),
                                collector_state()->yc_type() == Mixed /* allMemoryPoolsAffected */);

    G1HeapTransition heap_transition(this);
    size_t heap_used_bytes_before_gc = used();

    // Don't dynamically change the number of GC threads this early.  A value of
    // 0 is used to indicate serial work.  When parallel work is done,
    // it will be set.

    { // Call to jvmpi::post_class_unload_events must occur outside of active GC
      IsGCActiveMark x;

      gc_prologue(false);

      if (VerifyRememberedSets) {
        log_info(gc, verify)("[Verifying RemSets before GC]");
        VerifyRegionRemSetClosure v_cl;
        heap_region_iterate(&v_cl);
      }

      _verifier->verify_before_gc(verify_type);

      _verifier->check_bitmaps("GC Start");

#if COMPILER2_OR_JVMCI
      DerivedPointerTable::clear();
#endif

      // Please see comment in g1CollectedHeap.hpp and
      // G1CollectedHeap::ref_processing_init() to see how
      // reference processing currently works in G1.

      // Enable discovery in the STW reference processor
      _ref_processor_stw->enable_discovery();

      {
        // We want to temporarily turn off discovery by the
        // CM ref processor, if necessary, and turn it back on
        // on again later if we do. Using a scoped
        // NoRefDiscovery object will do this.
        NoRefDiscovery no_cm_discovery(_ref_processor_cm);

        // Forget the current alloc region (we might even choose it to be part
        // of the collection set!).
        _allocator->release_mutator_alloc_region();

        // This timing is only used by the ergonomics to handle our pause target.
        // It is unclear why this should not include the full pause. We will
        // investigate this in CR 7178365.
        //
        // Preserving the old comment here if that helps the investigation:
        //
        // The elapsed time induced by the start time below deliberately elides
        // the possible verification above.
        double sample_start_time_sec = os::elapsedTime();

        g1_policy()->record_collection_pause_start(sample_start_time_sec);

        if (collector_state()->in_initial_mark_gc()) {
          concurrent_mark()->pre_initial_mark();
        }

        g1_policy()->finalize_collection_set(target_pause_time_ms, &_survivor);

        evacuation_info.set_collectionset_regions(collection_set()->region_length());

        // Make sure the remembered sets are up to date. This needs to be
        // done before register_humongous_regions_with_cset(), because the
        // remembered sets are used there to choose eager reclaim candidates.
        // If the remembered sets are not up to date we might miss some
        // entries that need to be handled.
        g1_rem_set()->cleanupHRRS();

        register_humongous_regions_with_cset();

        assert(_verifier->check_cset_fast_test(), "Inconsistency in the InCSetState table.");

        // We call this after finalize_cset() to
        // ensure that the CSet has been finalized.
        _cm->verify_no_cset_oops();

        if (_hr_printer.is_active()) {
          G1PrintCollectionSetClosure cl(&_hr_printer);
          _collection_set.iterate(&cl);
        }

        // Initialize the GC alloc regions.
        _allocator->init_gc_alloc_regions(evacuation_info);

        G1ParScanThreadStateSet per_thread_states(this, workers()->active_workers(), collection_set()->young_region_length());
        pre_evacuate_collection_set();

        // Actually do the work...
        evacuate_collection_set(&per_thread_states);

        post_evacuate_collection_set(evacuation_info, &per_thread_states);

        const size_t* surviving_young_words = per_thread_states.surviving_young_words();
        free_collection_set(&_collection_set, evacuation_info, surviving_young_words);

        eagerly_reclaim_humongous_regions();

        record_obj_copy_mem_stats();
        _survivor_evac_stats.adjust_desired_plab_sz();
        _old_evac_stats.adjust_desired_plab_sz();

        double start = os::elapsedTime();
        start_new_collection_set();
        g1_policy()->phase_times()->record_start_new_cset_time_ms((os::elapsedTime() - start) * 1000.0);

        if (evacuation_failed()) {
          set_used(recalculate_used());
          if (_archive_allocator != NULL) {
            _archive_allocator->clear_used();
          }
          for (uint i = 0; i < ParallelGCThreads; i++) {
            if (_evacuation_failed_info_array[i].has_failed()) {
              _gc_tracer_stw->report_evacuation_failed(_evacuation_failed_info_array[i]);
            }
          }
        } else {
          // The "used" of the the collection set have already been subtracted
          // when they were freed.  Add in the bytes evacuated.
          increase_used(g1_policy()->bytes_copied_during_gc());
        }

        if (collector_state()->in_initial_mark_gc()) {
          // We have to do this before we notify the CM threads that
          // they can start working to make sure that all the
          // appropriate initialization is done on the CM object.
          concurrent_mark()->post_initial_mark();
          // Note that we don't actually trigger the CM thread at
          // this point. We do that later when we're sure that
          // the current thread has completed its logging output.
        }

        allocate_dummy_regions();

        _allocator->init_mutator_alloc_region();

        {
          size_t expand_bytes = _heap_sizing_policy->expansion_amount();
          if (expand_bytes > 0) {
            size_t bytes_before = capacity();
            // No need for an ergo logging here,
            // expansion_amount() does this when it returns a value > 0.
            double expand_ms;
            if (!expand(expand_bytes, _workers, &expand_ms)) {
              // We failed to expand the heap. Cannot do anything about it.
            }
            g1_policy()->phase_times()->record_expand_heap_time(expand_ms);
          }
        }

        // We redo the verification but now wrt to the new CSet which
        // has just got initialized after the previous CSet was freed.
        _cm->verify_no_cset_oops();

        // This timing is only used by the ergonomics to handle our pause target.
        // It is unclear why this should not include the full pause. We will
        // investigate this in CR 7178365.
        double sample_end_time_sec = os::elapsedTime();
        double pause_time_ms = (sample_end_time_sec - sample_start_time_sec) * MILLIUNITS;
        size_t total_cards_scanned = g1_policy()->phase_times()->sum_thread_work_items(G1GCPhaseTimes::ScanRS, G1GCPhaseTimes::ScanRSScannedCards);
        g1_policy()->record_collection_pause_end(pause_time_ms, total_cards_scanned, heap_used_bytes_before_gc);

        evacuation_info.set_collectionset_used_before(collection_set()->bytes_used_before());
        evacuation_info.set_bytes_copied(g1_policy()->bytes_copied_during_gc());

        if (VerifyRememberedSets) {
          log_info(gc, verify)("[Verifying RemSets after GC]");
          VerifyRegionRemSetClosure v_cl;
          heap_region_iterate(&v_cl);
        }

        _verifier->verify_after_gc(verify_type);
        _verifier->check_bitmaps("GC End");

        assert(!_ref_processor_stw->discovery_enabled(), "Postcondition");
        _ref_processor_stw->verify_no_references_recorded();

        // CM reference discovery will be re-enabled if necessary.
      }

#ifdef TRACESPINNING
      ParallelTaskTerminator::print_termination_counts();
#endif

      gc_epilogue(false);
    }

    // Print the remainder of the GC log output.
    if (evacuation_failed()) {
      log_info(gc)("To-space exhausted");
    }

    g1_policy()->print_phases();
    heap_transition.print();

    // It is not yet to safe to tell the concurrent mark to
    // start as we have some optional output below. We don't want the
    // output from the concurrent mark thread interfering with this
    // logging output either.

    _hrm.verify_optional();
    _verifier->verify_region_sets_optional();

    TASKQUEUE_STATS_ONLY(print_taskqueue_stats());
    TASKQUEUE_STATS_ONLY(reset_taskqueue_stats());

    print_heap_after_gc();
    print_heap_regions();
    trace_heap_after_gc(_gc_tracer_stw);

    // We must call G1MonitoringSupport::update_sizes() in the same scoping level
    // as an active TraceMemoryManagerStats object (i.e. before the destructor for the
    // TraceMemoryManagerStats is called) so that the G1 memory pools are updated
    // before any GC notifications are raised.
    g1mm()->update_sizes();

    _gc_tracer_stw->report_evacuation_info(&evacuation_info);
    _gc_tracer_stw->report_tenuring_threshold(_g1_policy->tenuring_threshold());
    _gc_timer_stw->register_gc_end();
    _gc_tracer_stw->report_gc_end(_gc_timer_stw->gc_end(), _gc_timer_stw->time_partitions());
  }
  // It should now be safe to tell the concurrent mark thread to start
  // without its logging output interfering with the logging output
  // that came from the pause.

  if (should_start_conc_mark) {
    // CAUTION: after the doConcurrentMark() call below,
    // the concurrent marking thread(s) could be running
    // concurrently with us. Make sure that anything after
    // this point does not assume that we are the only GC thread
    // running. Note: of course, the actual marking work will
    // not start until the safepoint itself is released in
    // SuspendibleThreadSet::desynchronize().
    do_concurrent_mark();
  }

  return true;
}

Stepping further down, the next stop is G1Policy::finalize_collection_set, which handles the young and the old generation:

void G1Policy::finalize_collection_set(double target_pause_time_ms, G1SurvivorRegions* survivor) {
  double time_remaining_ms = _collection_set->finalize_young_part(target_pause_time_ms, survivor);
  _collection_set->finalize_old_part(time_remaining_ms);
}

Two methods are called here, and you can see the remaining time being passed down. Let's look at them in detail, starting with the young part:

double G1CollectionSet::finalize_young_part(double target_pause_time_ms, G1SurvivorRegions* survivors) {
  double young_start_time_sec = os::elapsedTime();

  finalize_incremental_building();

  guarantee(target_pause_time_ms > 0.0,
            "target_pause_time_ms = %1.6lf should be positive", target_pause_time_ms);

  size_t pending_cards = _policy->pending_cards();
  double base_time_ms = _policy->predict_base_elapsed_time_ms(pending_cards);
  double time_remaining_ms = MAX2(target_pause_time_ms - base_time_ms, 0.0);

  log_trace(gc, ergo, cset)("Start choosing CSet. pending cards: " SIZE_FORMAT " predicted base time: %1.2fms remaining time: %1.2fms target pause time: %1.2fms",
                            pending_cards, base_time_ms, time_remaining_ms, target_pause_time_ms);

  // The young list is laid with the survivor regions from the previous
  // pause are appended to the RHS of the young list, i.e.
  //   [Newly Young Regions ++ Survivors from last pause].

  uint survivor_region_length = survivors->length();
  uint eden_region_length = _g1h->eden_regions_count();
  init_region_lengths(eden_region_length, survivor_region_length);

  verify_young_cset_indices();

  // Clear the fields that point to the survivor list - they are all young now.
  survivors->convert_to_eden();

  _bytes_used_before = _inc_bytes_used_before;
  time_remaining_ms = MAX2(time_remaining_ms - _inc_predicted_elapsed_time_ms, 0.0);

  log_trace(gc, ergo, cset)("Add young regions to CSet. eden: %u regions, survivors: %u regions, predicted young region time: %1.2fms, target pause time: %1.2fms",
                            eden_region_length, survivor_region_length, _inc_predicted_elapsed_time_ms, target_pause_time_ms);

  // The number of recorded young regions is the incremental
  // collection set's current size
  set_recorded_rs_lengths(_inc_recorded_rs_lengths);

  double young_end_time_sec = os::elapsedTime();
  phase_times()->record_young_cset_choice_time_ms((young_end_time_sec - young_start_time_sec) * 1000.0);

  return time_remaining_ms;
}

Next is the old-generation part:

void G1CollectionSet::finalize_old_part(double time_remaining_ms) {
  double non_young_start_time_sec = os::elapsedTime();
  double predicted_old_time_ms = 0.0;

  if (collector_state()->in_mixed_phase()) {
    cset_chooser()->verify();
    const uint min_old_cset_length = _policy->calc_min_old_cset_length();
    const uint max_old_cset_length = _policy->calc_max_old_cset_length();

    uint expensive_region_num = 0;
    bool check_time_remaining = _policy->adaptive_young_list_length();

    HeapRegion* hr = cset_chooser()->peek();
    while (hr != NULL) {
      if (old_region_length() >= max_old_cset_length) {
        // Added maximum number of old regions to the CSet.
        log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached max). old %u regions, max %u regions",
                                  old_region_length(), max_old_cset_length);
        break;
      }

      // Stop adding regions if the remaining reclaimable space is
      // not above G1HeapWastePercent.
      size_t reclaimable_bytes = cset_chooser()->remaining_reclaimable_bytes();
      double reclaimable_percent = _policy->reclaimable_bytes_percent(reclaimable_bytes);
      double threshold = (double) G1HeapWastePercent;
      if (reclaimable_percent <= threshold) {
        // We've added enough old regions that the amount of uncollected
        // reclaimable space is at or below the waste threshold. Stop
        // adding old regions to the CSet.
        log_debug(gc, ergo, cset)("Finish adding old regions to CSet (reclaimable percentage not over threshold). "
                                  "old %u regions, max %u regions, reclaimable: " SIZE_FORMAT "B (%1.2f%%) threshold: " UINTX_FORMAT "%%",
                                  old_region_length(), max_old_cset_length, reclaimable_bytes, reclaimable_percent, G1HeapWastePercent);
        break;
      }

      double predicted_time_ms = predict_region_elapsed_time_ms(hr);
      if (check_time_remaining) {
        if (predicted_time_ms > time_remaining_ms) {
          // Too expensive for the current CSet.

          if (old_region_length() >= min_old_cset_length) {
            // We have added the minimum number of old regions to the CSet,
            // we are done with this CSet.
            log_debug(gc, ergo, cset)("Finish adding old regions to CSet (predicted time is too high). "
                                      "predicted time: %1.2fms, remaining time: %1.2fms old %u regions, min %u regions",
                                      predicted_time_ms, time_remaining_ms, old_region_length(), min_old_cset_length);
            break;
          }

          // We'll add it anyway given that we haven't reached the
          // minimum number of old regions.
          expensive_region_num += 1;
        }
      } else {
        if (old_region_length() >= min_old_cset_length) {
          // In the non-auto-tuning case, we'll finish adding regions
          // to the CSet if we reach the minimum.

          log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached min). old %u regions, min %u regions",
                                    old_region_length(), min_old_cset_length);
          break;
        }
      }

      // We will add this region to the CSet.
      time_remaining_ms = MAX2(time_remaining_ms - predicted_time_ms, 0.0);
      predicted_old_time_ms += predicted_time_ms;
      cset_chooser()->pop(); // already have region via peek()
      _g1h->old_set_remove(hr);
      add_old_region(hr);

      hr = cset_chooser()->peek();
    }
    if (hr == NULL) {
      log_debug(gc, ergo, cset)("Finish adding old regions to CSet (candidate old regions not available)");
    }

    if (expensive_region_num > 0) {
      // We print the information once here at the end, predicated on
      // whether we added any apparently expensive regions or not, to
      // avoid generating output per region.
      log_debug(gc, ergo, cset)("Added expensive regions to CSet (old CSet region num not reached min)."
                                "old: %u regions, expensive: %u regions, min: %u regions, remaining time: %1.2fms",
                                old_region_length(), expensive_region_num, min_old_cset_length, time_remaining_ms);
    }

    cset_chooser()->verify();
  }

  stop_incremental_building();

  log_debug(gc, ergo, cset)("Finish choosing CSet. old: %u regions, predicted old region time: %1.2fms, time remaining: %1.2f",
                            old_region_length(), predicted_old_time_ms, time_remaining_ms);

  double non_young_end_time_sec = os::elapsedTime();
  phase_times()->record_non_young_cset_choice_time_ms((non_young_end_time_sec - non_young_start_time_sec) * 1000.0);

  QuickSort::sort(_collection_set_regions, _collection_set_cur_length, compare_region_idx, true);
}

Near the top of the method above there is a check, if (collector_state()->in_mixed_phase()) {, for whether we are currently in the mixed collection phase. If not, the old generation isn't involved at all, which is why essentially all of the code hangs off that if.
That's all for now; this leans toward note-taking for myself, so go easy on any mistakes.

Notes on AQS

Recently I read a well-known in-depth article series on AQS. I used to read it in fits and starts, backing off every time it got hard and starting over from scratch the next time; yesterday I finally pushed through the first part.
First, AQS mainly consists of these fields:

// The head node; the easiest way to think of it is as the thread currently holding the lock
private transient volatile Node head;

// The tail of the queue of blocked nodes; every new node is appended here, forming a linked list
private transient volatile Node tail;

// The most important field: the state of the lock. 0 means not held; greater than 0 means some thread holds it.
// The value can exceed 1 because the lock is reentrant: each reentry adds 1.
private volatile int state;

// The thread currently holding the exclusive lock. The canonical use: because the lock is reentrant,
// reentrantLock.lock() can be called in a nested fashion, so this is how we check whether the
// current thread already owns the lock:
// if (currentThread == getExclusiveOwnerThread()) { state++; }
private transient Thread exclusiveOwnerThread; // inherited from AbstractOwnableSynchronizer

So underneath, AQS maintains a doubly linked wait queue (the original post has a diagram of the structure here).

The code structure of each node is also fairly simple:

static final class Node {
    // Marker: this node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker: this node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // ======== The int constants below are the possible waitStatus values ===========
    /** waitStatus value to indicate thread has cancelled */
    // This thread has given up contending for the lock
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // As officially described: the thread of this node's successor needs to be woken up
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // Conditions are not covered in this article, so skip this one (the next article covers it)
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // Likewise not covered here; skip
    static final int PROPAGATE = -3;
    // =====================================================


    // Takes one of the values above (1, -1, -2, -3) or 0 (more on that later).
    // For now it's enough to know that a value greater than 0 means the thread has cancelled waiting.
    //    ps: couldn't grab the lock after ages, so it gave up; ReentrantLock can be given a timeout...
    volatile int waitStatus;
    // Reference to the predecessor node
    volatile Node prev;
    // Reference to the successor node
    volatile Node next;
    // The thread itself
    volatile Thread thread;

}

The field to focus on is waitStatus, because it is set on a node by its successor: a value of -1 (SIGNAL) means there is a node waiting behind this one that will need to be woken up.
The queue is a variant of the CLH queue; for background on CLH, see the article "spin locks, queued spin locks, MCS locks, CLH locks".
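To see waitStatus == SIGNAL in action, here is roughly the JDK 8 release-side helper that wakes the successor, quoted from memory as a sketch with comments added (it lives inside AQS, so Node, tail, and compareAndSetWaitStatus come from the surrounding class). Note how a cancelled successor (waitStatus > 0) is skipped by walking backwards from the tail:

// Wakes up node's successor, if any (sketch of AQS#unparkSuccessor, JDK 8 era).
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // clear SIGNAL; the wakeup is happening now

    // Normally we just unpark the next node, but if it is null or cancelled (> 0),
    // walk backwards from the tail to find the closest non-cancelled node.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // actually wake the waiting thread
}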

Gray testing with OpenResty

At work we need to run gray-release tests on some new product features. Adding the switch in the backend business code would be cumbersome, so the plan was to do the trick at the nginx layer, which led to OpenResty. I hit quite a few pitfalls along the way; here is a first installment.

First, logging:
error_log logs/error.log debug;
nginx must have debug-level logging enabled before these logs show up.

To disable the Lua code cache during development, use lua_code_cache off; also note that it only takes effect when the code is loaded via content_by_lua_file.

http {
  lua_code_cache off;
}

location ~* /(\d+-.*)/api/orgunits/load_all(.*) {
   default_type 'application/json;charset=utf-8';
   content_by_lua_file /data/projects/xxx/current/lua/controller/load_data.lua;
}

To add something to the nginx response headers from Lua, use this:

ngx.header['response'] = 'header'

Usage summary

Follow-up:

  1. At first, in my local environment, I used content_by_lua_file and only paid attention to the headers. Once deployed to the test environment, it turned out requests were no longer being proxied to the backend service at all.
    Some searching revealed that content_by_lua_file means the Lua script generates the entire response, body included; the word "content" refers to the response content.
    Switching to access_by_lua_file fixed it: I only wanted to read the request and tweak the response headers, not take over the whole request.

  2. The next pitfall: nginx has a client_body_buffer_size parameter, which defaults to 8K on 32-bit systems and 16K on 64-bit systems. When the request body exceeds this, nginx spools it to a temporary file, and ngx.req.get_post_args() in OpenResty then fails with "failed to get post args: request body in temp file not supported". Raising client_body_buffer_size fixes it.

  3. Then there is error handling in Lua. The usual approach is protected calls via pcall and xpcall. My errors came mainly from cjson's decode, and there are two fixes. The first is to wrap cjson.decode in pcall:

    local decode = require("cjson").decode
    
    function json_decode( str )
        local ok, t = pcall(decode, str)
        if not ok then
          return nil
        end
    
        return t
    end

    This uses pcall, a "protected call": when an error occurs inside, it returns two values, false plus the error message.
    The other option is the cjson.safe module:

    local json = require("cjson.safe")
    local str = [[ {"key:"value"} ]]
    
    local t = json.decode(str)
    if t then
        ngx.say(" --> ", type(t))
    end

    cjson.safe returns nil when parsing fails.

  4. Another one: when connecting to Redis by hostname, you get 'failed to connect: no resolver defined to resolve "redis.xxxxxx.com"'. This needs nginx's resolver directive:
    resolver 8.8.8.8 valid=3600s;

  5. One more addition: the business code uses Redis's db feature, so the Lua side also has to select the db when talking to Redis. The Lua Redis library supports this; use instance:select(config:get('db')) to switch dbs.

  6. Performance tips:
    Use as few phase hooks (content_by_lua_file and the other *_by_lua directives) as possible.

  7. Came across a nice OpenResty site
    (link in the original post)

First look at Ambari

Ambari is a management tool for big-data platforms, covering Hadoop, YARN, Hive, HBase, Spark and other core infrastructure and tooling, and it greatly simplifies standing up a data platform. I had previously only used platforms that a colleague had already set up; this time I had the chance to build a test data platform from scratch with Ambari, and stepped in plenty of pits along the way. A quick record follows.

The process, briefly

  • The first pit
    In the beginning I followed the official instructions and built with Maven, which failed and stalled over and over because of the GFW; that is this guide. Not being familiar with Maven, I followed the steps rather blindly and wasted a lot of time. Only later did I learn that you can simply add the repo and install via yum, which promptly ran into the second pit.
  • The second pit
    The online repo was also painfully slow, again for network reasons. With proxychains I barely managed to install ambari-server itself: put the ambari.repo file under /etc/yum.repos.d/, then install with yum update && yum install ambari-server. If you can, route it through a proxy with proxychains.
  • The third step
    With ambari-server installed, first run ambari-server setup for the initial configuration, which includes the JDK path and the database settings; once that's done, start the service with ambari-server start. A small side note: since ambari-server coordinates so many services, the management, control, and monitoring modules are indispensable. They can be installed from the ambari-server web UI, or ahead of time on the command line; this part is called the HDF Management Pack. Install it by running ambari-server install-mpack --mpack=http://public-repo-1.hortonworks.com/HDF/centos7/2.x/updates/2.1.4.0/tars/hdf_ambari_mp/hdf-ambari-mpack-2.1.4.0-5.tar.gz --purge --verbose
    (the tarball can also be downloaded first and installed from a local path). After that, restart ambari-server.