Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

对 Java 的 gc 实现比较感兴趣,原先一般都是看周志明的书,但其实并没有讲具体的 gc 源码,而是把整个思路和流程讲解了一下
特别是 G1 的具体实现
一般对 G1 的理解其实就是把原先整块的新生代老年代分成了以 region 为单位的小块内存,简而言之,就是原先对新生代老年代的收集会涉及到整个代的堆内存空间,而G1 把它变成了更细致的小块内存
这带来了一个很明显的好处和一个很明显的坏处,好处是内存收集可以更灵活,耗时会变短,但整个收集的处理复杂度就变高了
目前看了一点点关于 G1 收集的预期时间相关的代码

HeapWord* G1CollectedHeap::do_collection_pause(size_t word_size,
                                               uint gc_count_before,
                                               bool* succeeded,
                                               GCCause::Cause gc_cause) {
  assert_heap_not_locked_and_not_at_safepoint();
  VM_G1CollectForAllocation op(word_size,
                               gc_count_before,
                               gc_cause,
                               false, /* should_initiate_conc_mark */
                               g1_policy()->max_pause_time_ms());
  VMThread::execute(&op);

  HeapWord* result = op.result();
  bool ret_succeeded = op.prologue_succeeded() && op.pause_succeeded();
  assert(result == NULL || ret_succeeded,
         "the result should be NULL if the VM did not succeed");
  *succeeded = ret_succeeded;

  assert_heap_not_locked();
  return result;
}

这里就是收集时需要停顿的,其中VMThread::execute(&op);是具体执行的,真正执行的是VM_G1CollectForAllocation::doit方法

void VM_G1CollectForAllocation::doit() {
  G1CollectedHeap* g1h = G1CollectedHeap::heap();
  assert(!_should_initiate_conc_mark || g1h->should_do_concurrent_full_gc(_gc_cause),
      "only a GC locker, a System.gc(), stats update, whitebox, or a hum allocation induced GC should start a cycle");

  if (_word_size > 0) {
    // An allocation has been requested. So, try to do that first.
    _result = g1h->attempt_allocation_at_safepoint(_word_size,
                                                   false /* expect_null_cur_alloc_region */);
    if (_result != NULL) {
      // If we can successfully allocate before we actually do the
      // pause then we will consider this pause successful.
      _pause_succeeded = true;
      return;
    }
  }

  GCCauseSetter x(g1h, _gc_cause);
  if (_should_initiate_conc_mark) {
    // It's safer to read old_marking_cycles_completed() here, given
    // that noone else will be updating it concurrently. Since we'll
    // only need it if we're initiating a marking cycle, no point in
    // setting it earlier.
    _old_marking_cycles_completed_before = g1h->old_marking_cycles_completed();

    // At this point we are supposed to start a concurrent cycle. We
    // will do so if one is not already in progress.
    bool res = g1h->g1_policy()->force_initial_mark_if_outside_cycle(_gc_cause);

    // The above routine returns true if we were able to force the
    // next GC pause to be an initial mark; it returns false if a
    // marking cycle is already in progress.
    //
    // If a marking cycle is already in progress just return and skip the
    // pause below - if the reason for requesting this initial mark pause
    // was due to a System.gc() then the requesting thread should block in
    // doit_epilogue() until the marking cycle is complete.
    //
    // If this initial mark pause was requested as part of a humongous
    // allocation then we know that the marking cycle must just have
    // been started by another thread (possibly also allocating a humongous
    // object) as there was no active marking cycle when the requesting
    // thread checked before calling collect() in
    // attempt_allocation_humongous(). Retrying the GC, in this case,
    // will cause the requesting thread to spin inside collect() until the
    // just started marking cycle is complete - which may be a while. So
    // we do NOT retry the GC.
    if (!res) {
      assert(_word_size == 0, "Concurrent Full GC/Humongous Object IM shouldn't be allocating");
      if (_gc_cause != GCCause::_g1_humongous_allocation) {
        _should_retry_gc = true;
      }
      return;
    }
  }

  // Try a partial collection of some kind.
  _pause_succeeded = g1h->do_collection_pause_at_safepoint(_target_pause_time_ms);

  if (_pause_succeeded) {
    if (_word_size > 0) {
      // An allocation had been requested. Do it, eventually trying a stronger
      // kind of GC.
      _result = g1h->satisfy_failed_allocation(_word_size, &_pause_succeeded);
    } else {
      bool should_upgrade_to_full = !g1h->should_do_concurrent_full_gc(_gc_cause) &&
                                    !g1h->has_regions_left_for_allocation();
      if (should_upgrade_to_full) {
        // There has been a request to perform a GC to free some space. We have no
        // information on how much memory has been asked for. In case there are
        // absolutely no regions left to allocate into, do a maximally compacting full GC.
        log_info(gc, ergo)("Attempting maximally compacting collection");
        _pause_succeeded = g1h->do_full_collection(false, /* explicit gc */
                                                   true   /* clear_all_soft_refs */);
      }
    }
    guarantee(_pause_succeeded, "Elevated collections during the safepoint must always succeed.");
  } else {
    assert(_result == NULL, "invariant");
    // The only reason for the pause to not be successful is that, the GC locker is
    // active (or has become active since the prologue was executed). In this case
    // we should retry the pause after waiting for the GC locker to become inactive.
    _should_retry_gc = true;
  }
}

这里可以看到核心的是G1CollectedHeap::do_collection_pause_at_safepoint这个方法,它带上了目标暂停时间的值

G1CollectedHeap::do_collection_pause_at_safepoint(double target_pause_time_ms) {
  assert_at_safepoint_on_vm_thread();
  guarantee(!is_gc_active(), "collection is not reentrant");

  if (GCLocker::check_active_before_gc()) {
    return false;
  }

  _gc_timer_stw->register_gc_start();

  GCIdMark gc_id_mark;
  _gc_tracer_stw->report_gc_start(gc_cause(), _gc_timer_stw->gc_start());

  SvcGCMarker sgcm(SvcGCMarker::MINOR);
  ResourceMark rm;

  g1_policy()->note_gc_start();

  wait_for_root_region_scanning();

  print_heap_before_gc();
  print_heap_regions();
  trace_heap_before_gc(_gc_tracer_stw);

  _verifier->verify_region_sets_optional();
  _verifier->verify_dirty_young_regions();

  // We should not be doing initial mark unless the conc mark thread is running
  if (!_cm_thread->should_terminate()) {
    // This call will decide whether this pause is an initial-mark
    // pause. If it is, in_initial_mark_gc() will return true
    // for the duration of this pause.
    g1_policy()->decide_on_conc_mark_initiation();
  }

  // We do not allow initial-mark to be piggy-backed on a mixed GC.
  assert(!collector_state()->in_initial_mark_gc() ||
          collector_state()->in_young_only_phase(), "sanity");

  // We also do not allow mixed GCs during marking.
  assert(!collector_state()->mark_or_rebuild_in_progress() || collector_state()->in_young_only_phase(), "sanity");

  // Record whether this pause is an initial mark. When the current
  // thread has completed its logging output and it's safe to signal
  // the CM thread, the flag's value in the policy has been reset.
  bool should_start_conc_mark = collector_state()->in_initial_mark_gc();

  // Inner scope for scope based logging, timers, and stats collection
  {
    EvacuationInfo evacuation_info;

    if (collector_state()->in_initial_mark_gc()) {
      // We are about to start a marking cycle, so we increment the
      // full collection counter.
      increment_old_marking_cycles_started();
      _cm->gc_tracer_cm()->set_gc_cause(gc_cause());
    }

    _gc_tracer_stw->report_yc_type(collector_state()->yc_type());

    GCTraceCPUTime tcpu;

    G1HeapVerifier::G1VerifyType verify_type;
    FormatBuffer<> gc_string("Pause Young ");
    if (collector_state()->in_initial_mark_gc()) {
      gc_string.append("(Concurrent Start)");
      verify_type = G1HeapVerifier::G1VerifyConcurrentStart;
    } else if (collector_state()->in_young_only_phase()) {
      if (collector_state()->in_young_gc_before_mixed()) {
        gc_string.append("(Prepare Mixed)");
      } else {
        gc_string.append("(Normal)");
      }
      verify_type = G1HeapVerifier::G1VerifyYoungNormal;
    } else {
      gc_string.append("(Mixed)");
      verify_type = G1HeapVerifier::G1VerifyMixed;
    }
    GCTraceTime(Info, gc) tm(gc_string, NULL, gc_cause(), true);

    uint active_workers = AdaptiveSizePolicy::calc_active_workers(workers()->total_workers(),
                                                                  workers()->active_workers(),
                                                                  Threads::number_of_non_daemon_threads());
    active_workers = workers()->update_active_workers(active_workers);
    log_info(gc,task)("Using %u workers of %u for evacuation", active_workers, workers()->total_workers());

    TraceCollectorStats tcs(g1mm()->incremental_collection_counters());
    TraceMemoryManagerStats tms(&_memory_manager, gc_cause(),
                                collector_state()->yc_type() == Mixed /* allMemoryPoolsAffected */);

    G1HeapTransition heap_transition(this);
    size_t heap_used_bytes_before_gc = used();

    // Don't dynamically change the number of GC threads this early.  A value of
    // 0 is used to indicate serial work.  When parallel work is done,
    // it will be set.

    { // Call to jvmpi::post_class_unload_events must occur outside of active GC
      IsGCActiveMark x;

      gc_prologue(false);

      if (VerifyRememberedSets) {
        log_info(gc, verify)("[Verifying RemSets before GC]");
        VerifyRegionRemSetClosure v_cl;
        heap_region_iterate(&v_cl);
      }

      _verifier->verify_before_gc(verify_type);

      _verifier->check_bitmaps("GC Start");

#if COMPILER2_OR_JVMCI
      DerivedPointerTable::clear();
#endif

      // Please see comment in g1CollectedHeap.hpp and
      // G1CollectedHeap::ref_processing_init() to see how
      // reference processing currently works in G1.

      // Enable discovery in the STW reference processor
      _ref_processor_stw->enable_discovery();

      {
        // We want to temporarily turn off discovery by the
        // CM ref processor, if necessary, and turn it back on
        // on again later if we do. Using a scoped
        // NoRefDiscovery object will do this.
        NoRefDiscovery no_cm_discovery(_ref_processor_cm);

        // Forget the current alloc region (we might even choose it to be part
        // of the collection set!).
        _allocator->release_mutator_alloc_region();

        // This timing is only used by the ergonomics to handle our pause target.
        // It is unclear why this should not include the full pause. We will
        // investigate this in CR 7178365.
        //
        // Preserving the old comment here if that helps the investigation:
        //
        // The elapsed time induced by the start time below deliberately elides
        // the possible verification above.
        double sample_start_time_sec = os::elapsedTime();

        g1_policy()->record_collection_pause_start(sample_start_time_sec);

        if (collector_state()->in_initial_mark_gc()) {
          concurrent_mark()->pre_initial_mark();
        }

        g1_policy()->finalize_collection_set(target_pause_time_ms, &_survivor);

        evacuation_info.set_collectionset_regions(collection_set()->region_length());

        // Make sure the remembered sets are up to date. This needs to be
        // done before register_humongous_regions_with_cset(), because the
        // remembered sets are used there to choose eager reclaim candidates.
        // If the remembered sets are not up to date we might miss some
        // entries that need to be handled.
        g1_rem_set()->cleanupHRRS();

        register_humongous_regions_with_cset();

        assert(_verifier->check_cset_fast_test(), "Inconsistency in the InCSetState table.");

        // We call this after finalize_cset() to
        // ensure that the CSet has been finalized.
        _cm->verify_no_cset_oops();

        if (_hr_printer.is_active()) {
          G1PrintCollectionSetClosure cl(&_hr_printer);
          _collection_set.iterate(&cl);
        }

        // Initialize the GC alloc regions.
        _allocator->init_gc_alloc_regions(evacuation_info);

        G1ParScanThreadStateSet per_thread_states(this, workers()->active_workers(), collection_set()->young_region_length());
        pre_evacuate_collection_set();

        // Actually do the work...
        evacuate_collection_set(&per_thread_states);

        post_evacuate_collection_set(evacuation_info, &per_thread_states);

        const size_t* surviving_young_words = per_thread_states.surviving_young_words();
        free_collection_set(&_collection_set, evacuation_info, surviving_young_words);

        eagerly_reclaim_humongous_regions();

        record_obj_copy_mem_stats();
        _survivor_evac_stats.adjust_desired_plab_sz();
        _old_evac_stats.adjust_desired_plab_sz();

        double start = os::elapsedTime();
        start_new_collection_set();
        g1_policy()->phase_times()->record_start_new_cset_time_ms((os::elapsedTime() - start) * 1000.0);

        if (evacuation_failed()) {
          set_used(recalculate_used());
          if (_archive_allocator != NULL) {
            _archive_allocator->clear_used();
          }
          for (uint i = 0; i < ParallelGCThreads; i++) {
            if (_evacuation_failed_info_array[i].has_failed()) {
              _gc_tracer_stw->report_evacuation_failed(_evacuation_failed_info_array[i]);
            }
          }
        } else {
          // The "used" of the the collection set have already been subtracted
          // when they were freed.  Add in the bytes evacuated.
          increase_used(g1_policy()->bytes_copied_during_gc());
        }

        if (collector_state()->in_initial_mark_gc()) {
          // We have to do this before we notify the CM threads that
          // they can start working to make sure that all the
          // appropriate initialization is done on the CM object.
          concurrent_mark()->post_initial_mark();
          // Note that we don't actually trigger the CM thread at
          // this point. We do that later when we're sure that
          // the current thread has completed its logging output.
        }

        allocate_dummy_regions();

        _allocator->init_mutator_alloc_region();

        {
          size_t expand_bytes = _heap_sizing_policy->expansion_amount();
          if (expand_bytes > 0) {
            size_t bytes_before = capacity();
            // No need for an ergo logging here,
            // expansion_amount() does this when it returns a value > 0.
            double expand_ms;
            if (!expand(expand_bytes, _workers, &expand_ms)) {
              // We failed to expand the heap. Cannot do anything about it.
            }
            g1_policy()->phase_times()->record_expand_heap_time(expand_ms);
          }
        }

        // We redo the verification but now wrt to the new CSet which
        // has just got initialized after the previous CSet was freed.
        _cm->verify_no_cset_oops();

        // This timing is only used by the ergonomics to handle our pause target.
        // It is unclear why this should not include the full pause. We will
        // investigate this in CR 7178365.
        double sample_end_time_sec = os::elapsedTime();
        double pause_time_ms = (sample_end_time_sec - sample_start_time_sec) * MILLIUNITS;
        size_t total_cards_scanned = g1_policy()->phase_times()->sum_thread_work_items(G1GCPhaseTimes::ScanRS, G1GCPhaseTimes::ScanRSScannedCards);
        g1_policy()->record_collection_pause_end(pause_time_ms, total_cards_scanned, heap_used_bytes_before_gc);

        evacuation_info.set_collectionset_used_before(collection_set()->bytes_used_before());
        evacuation_info.set_bytes_copied(g1_policy()->bytes_copied_during_gc());

        if (VerifyRememberedSets) {
          log_info(gc, verify)("[Verifying RemSets after GC]");
          VerifyRegionRemSetClosure v_cl;
          heap_region_iterate(&v_cl);
        }

        _verifier->verify_after_gc(verify_type);
        _verifier->check_bitmaps("GC End");

        assert(!_ref_processor_stw->discovery_enabled(), "Postcondition");
        _ref_processor_stw->verify_no_references_recorded();

        // CM reference discovery will be re-enabled if necessary.
      }

#ifdef TRACESPINNING
      ParallelTaskTerminator::print_termination_counts();
#endif

      gc_epilogue(false);
    }

    // Print the remainder of the GC log output.
    if (evacuation_failed()) {
      log_info(gc)("To-space exhausted");
    }

    g1_policy()->print_phases();
    heap_transition.print();

    // It is not yet to safe to tell the concurrent mark to
    // start as we have some optional output below. We don't want the
    // output from the concurrent mark thread interfering with this
    // logging output either.

    _hrm.verify_optional();
    _verifier->verify_region_sets_optional();

    TASKQUEUE_STATS_ONLY(print_taskqueue_stats());
    TASKQUEUE_STATS_ONLY(reset_taskqueue_stats());

    print_heap_after_gc();
    print_heap_regions();
    trace_heap_after_gc(_gc_tracer_stw);

    // We must call G1MonitoringSupport::update_sizes() in the same scoping level
    // as an active TraceMemoryManagerStats object (i.e. before the destructor for the
    // TraceMemoryManagerStats is called) so that the G1 memory pools are updated
    // before any GC notifications are raised.
    g1mm()->update_sizes();

    _gc_tracer_stw->report_evacuation_info(&evacuation_info);
    _gc_tracer_stw->report_tenuring_threshold(_g1_policy->tenuring_threshold());
    _gc_timer_stw->register_gc_end();
    _gc_tracer_stw->report_gc_end(_gc_timer_stw->gc_end(), _gc_timer_stw->time_partitions());
  }
  // It should now be safe to tell the concurrent mark thread to start
  // without its logging output interfering with the logging output
  // that came from the pause.

  if (should_start_conc_mark) {
    // CAUTION: after the doConcurrentMark() call below,
    // the concurrent marking thread(s) could be running
    // concurrently with us. Make sure that anything after
    // this point does not assume that we are the only GC thread
    // running. Note: of course, the actual marking work will
    // not start until the safepoint itself is released in
    // SuspendibleThreadSet::desynchronize().
    do_concurrent_mark();
  }

  return true;
}

往下走就是这一步G1Policy::finalize_collection_set,去处理新生代和老年代

void G1Policy::finalize_collection_set(double target_pause_time_ms, G1SurvivorRegions* survivor) {
  double time_remaining_ms = _collection_set->finalize_young_part(target_pause_time_ms, survivor);
  _collection_set->finalize_old_part(time_remaining_ms);
}

这里分别调用了两个方法,可以看到剩余时间是往下传的,来看一下具体的方法

double G1CollectionSet::finalize_young_part(double target_pause_time_ms, G1SurvivorRegions* survivors) {
  double young_start_time_sec = os::elapsedTime();

  finalize_incremental_building();

  guarantee(target_pause_time_ms > 0.0,
            "target_pause_time_ms = %1.6lf should be positive", target_pause_time_ms);

  size_t pending_cards = _policy->pending_cards();
  double base_time_ms = _policy->predict_base_elapsed_time_ms(pending_cards);
  double time_remaining_ms = MAX2(target_pause_time_ms - base_time_ms, 0.0);

  log_trace(gc, ergo, cset)("Start choosing CSet. pending cards: " SIZE_FORMAT " predicted base time: %1.2fms remaining time: %1.2fms target pause time: %1.2fms",
                            pending_cards, base_time_ms, time_remaining_ms, target_pause_time_ms);

  // The young list is laid with the survivor regions from the previous
  // pause are appended to the RHS of the young list, i.e.
  //   [Newly Young Regions ++ Survivors from last pause].

  uint survivor_region_length = survivors->length();
  uint eden_region_length = _g1h->eden_regions_count();
  init_region_lengths(eden_region_length, survivor_region_length);

  verify_young_cset_indices();

  // Clear the fields that point to the survivor list - they are all young now.
  survivors->convert_to_eden();

  _bytes_used_before = _inc_bytes_used_before;
  time_remaining_ms = MAX2(time_remaining_ms - _inc_predicted_elapsed_time_ms, 0.0);

  log_trace(gc, ergo, cset)("Add young regions to CSet. eden: %u regions, survivors: %u regions, predicted young region time: %1.2fms, target pause time: %1.2fms",
                            eden_region_length, survivor_region_length, _inc_predicted_elapsed_time_ms, target_pause_time_ms);

  // The number of recorded young regions is the incremental
  // collection set's current size
  set_recorded_rs_lengths(_inc_recorded_rs_lengths);

  double young_end_time_sec = os::elapsedTime();
  phase_times()->record_young_cset_choice_time_ms((young_end_time_sec - young_start_time_sec) * 1000.0);

  return time_remaining_ms;
}

下面是老年代的部分

void G1CollectionSet::finalize_old_part(double time_remaining_ms) {
  double non_young_start_time_sec = os::elapsedTime();
  double predicted_old_time_ms = 0.0;

  if (collector_state()->in_mixed_phase()) {
    cset_chooser()->verify();
    const uint min_old_cset_length = _policy->calc_min_old_cset_length();
    const uint max_old_cset_length = _policy->calc_max_old_cset_length();

    uint expensive_region_num = 0;
    bool check_time_remaining = _policy->adaptive_young_list_length();

    HeapRegion* hr = cset_chooser()->peek();
    while (hr != NULL) {
      if (old_region_length() >= max_old_cset_length) {
        // Added maximum number of old regions to the CSet.
        log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached max). old %u regions, max %u regions",
                                  old_region_length(), max_old_cset_length);
        break;
      }

      // Stop adding regions if the remaining reclaimable space is
      // not above G1HeapWastePercent.
      size_t reclaimable_bytes = cset_chooser()->remaining_reclaimable_bytes();
      double reclaimable_percent = _policy->reclaimable_bytes_percent(reclaimable_bytes);
      double threshold = (double) G1HeapWastePercent;
      if (reclaimable_percent <= threshold) {
        // We've added enough old regions that the amount of uncollected
        // reclaimable space is at or below the waste threshold. Stop
        // adding old regions to the CSet.
        log_debug(gc, ergo, cset)("Finish adding old regions to CSet (reclaimable percentage not over threshold). "
                                  "old %u regions, max %u regions, reclaimable: " SIZE_FORMAT "B (%1.2f%%) threshold: " UINTX_FORMAT "%%",
                                  old_region_length(), max_old_cset_length, reclaimable_bytes, reclaimable_percent, G1HeapWastePercent);
        break;
      }

      double predicted_time_ms = predict_region_elapsed_time_ms(hr);
      if (check_time_remaining) {
        if (predicted_time_ms > time_remaining_ms) {
          // Too expensive for the current CSet.

          if (old_region_length() >= min_old_cset_length) {
            // We have added the minimum number of old regions to the CSet,
            // we are done with this CSet.
            log_debug(gc, ergo, cset)("Finish adding old regions to CSet (predicted time is too high). "
                                      "predicted time: %1.2fms, remaining time: %1.2fms old %u regions, min %u regions",
                                      predicted_time_ms, time_remaining_ms, old_region_length(), min_old_cset_length);
            break;
          }

          // We'll add it anyway given that we haven't reached the
          // minimum number of old regions.
          expensive_region_num += 1;
        }
      } else {
        if (old_region_length() >= min_old_cset_length) {
          // In the non-auto-tuning case, we'll finish adding regions
          // to the CSet if we reach the minimum.

          log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached min). old %u regions, min %u regions",
                                    old_region_length(), min_old_cset_length);
          break;
        }
      }

      // We will add this region to the CSet.
      time_remaining_ms = MAX2(time_remaining_ms - predicted_time_ms, 0.0);
      predicted_old_time_ms += predicted_time_ms;
      cset_chooser()->pop(); // already have region via peek()
      _g1h->old_set_remove(hr);
      add_old_region(hr);

      hr = cset_chooser()->peek();
    }
    if (hr == NULL) {
      log_debug(gc, ergo, cset)("Finish adding old regions to CSet (candidate old regions not available)");
    }

    if (expensive_region_num > 0) {
      // We print the information once here at the end, predicated on
      // whether we added any apparently expensive regions or not, to
      // avoid generating output per region.
      log_debug(gc, ergo, cset)("Added expensive regions to CSet (old CSet region num not reached min)."
                                "old: %u regions, expensive: %u regions, min: %u regions, remaining time: %1.2fms",
                                old_region_length(), expensive_region_num, min_old_cset_length, time_remaining_ms);
    }

    cset_chooser()->verify();
  }

  stop_incremental_building();

  log_debug(gc, ergo, cset)("Finish choosing CSet. old: %u regions, predicted old region time: %1.2fms, time remaining: %1.2f",
                            old_region_length(), predicted_old_time_ms, time_remaining_ms);

  double non_young_end_time_sec = os::elapsedTime();
  phase_times()->record_non_young_cset_choice_time_ms((non_young_end_time_sec - non_young_start_time_sec) * 1000.0);

  QuickSort::sort(_collection_set_regions, _collection_set_cur_length, compare_region_idx, true);
}

上面第三行是个判断,当前是否是 mixed 回收阶段,如果不是的话其实是没有老年代什么事的,所以可以看到代码基本是从这个 if 判断
if (collector_state()->in_mixed_phase()) {开始往下走的
先写到这,偏向于做笔记用,有错轻拍

最近看了大神的 AQS 的文章,之前总是断断续续地看一点,每次都知难而退,下次看又从头开始,昨天总算硬着头皮看完了第一部分
首先 AQS 只要有这些属性

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

大概了解了 aqs 底层的双向等待队列,
结构是这样的

每个 node 里面主要是的代码结构也比较简单

static final class Node {
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // =====================================================


    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    //    ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;

}

其实可以主要关注这个 waitStatus 因为这个是后面的节点给前面的节点设置的,等于-1 的时候代表后面有节点等待,需要去唤醒,
这里使用了一个变种的 CLH 队列实现,CLH 队列相关内容可以查看这篇 自旋锁、排队自旋锁、MCS锁、CLH锁

目前公司要对一些新的产品功能做灰度测试,因为在后端业务代码层面添加判断比较麻烦,所以想在nginx上做点手脚,就想到了openresty
前后也踩了不少坑,这边先写一点

首先是日志
error_log logs/error.log debug;
需要nginx开启日志的debug才能看到日志

使用 lua_code_cache off即可, 另外注意只有使用 content_by_lua_file 才会生效

http {
  lua_code_cache off;
}

location ~* /(\d+-.*)/api/orgunits/load_all(.*) {
   default_type 'application/json;charset=utf-8';
   content_by_lua_file /data/projects/xxx/current/lua/controller/load_data.lua;
}

使用lua给nginx请求response头添加内容可以用这个

ngx.header['response'] = 'header'

使用总结

后续:

  1. 一开始在本地环境的时候使用content_by_lua_file只关注了头,后来发到测试环境发现请求内容都没代理转发到后端服务上
    网上查了下发现content_by_lua_file是将请求的所有内容包括response都用这里面的lua脚本生成了,content这个词就表示是请求内容
    后来改成了access_by_lua_file就正常了,只是要去获取请求内容和修改响应头,并不是要完整的接管请求

  2. 后来又碰到了一个坑是nginx有个client_body_buffer_size的配置参数,nginx在32位和64位系统里有8K和16K两个默认值,当请求内容大于这两个值的时候,会把请求内容放到临时文件里,这个时候openresty里的ngx.req.get_post_args()就会报“failed to get post args: requesty body in temp file not supported”这个错误,将client_body_buffer_size这个参数配置调大一点就好了

  3. 还有就是lua的异常捕获,网上看一般是用pcall和xpcall来进行保护调用,因为问题主要出在cjson的decode,这里有两个解决方案,一个就是将cjson.decode使用pcall封装,

    local decode = require("cjson").decode
    
    function json_decode( str )
        local ok, t = pcall(decode, str)
        if not ok then
          return nil
        end
    
        return t
    end

    这个是使用了pcall,称为保护调用,会在内部错误后返回两个参数,第一个是false,第二个是错误信息
    还有一种是使用cjson.safe包

    local json = require("cjson.safe")
    local str = [[ {"key:"value"} ]]
    
    local t = json.decode(str)
    if t then
        ngx.say(" --> ", type(t))
    end

    cjson.safe包会在解析失败的时候返回nil

  4. 还有一个是redis链接时如果host使用的是域名的话会提示“failed to connect: no resolver defined to resolve “redis.xxxxxx.com””,这里需要使用nginx的resolver指令,
    resolver 8.8.8.8 valid=3600s;

  5. 还有一点补充下
    就是业务在使用redis的时候使用了db的特性,所以在lua访问redis的时候也需要执行db,这里lua的redis库也支持了这个特性,可以使用instance:select(config:get(‘db’))来切换db

  6. 性能优化tips
    建议是尽量少使用阶段钩子,例如content_by_lua_file,*_by_lua

  7. 发现一个不错的openresty站点
    地址

初识ambari

ambari是一个大数据平台的管理工具,包含了hadoop, yarn, hive, hbase, spark等大数据的基础架构和工具,简化了数据平台的搭建,之前只是在同事搭建好平台后的一些使用,这次有机会从头开始用ambari来搭建一个测试的数据平台,过程中也踩到不少坑,简单记录下。

简单过程

  • 第一个坑
    在刚开始是按照官网的指南,用maven构建,因为GFW的原因,导致反复失败等待,也就是这个guide,因为对maven不熟悉导致有些按图索骥,浪费了很多时间,之后才知道可以直接加repo用yum安装,然而用yum安装马上就出现了第二个坑。
  • 第二个坑
    因为在线的repo还是因为网络原因很慢很慢,用proxychains勉强把ambari-server本身安装好了,ambari.repo将这个放进/etc/yum.repos.d/路径下,然后yum update && yum install ambari-server安装即可,如果有条件就用proxychains走下代理。
  • 第三步
    安装好ambari-server后先执行ambari-server setup做一些初始化设置,其中包含了JDK路径的设置,数据库设置,设置好就OK了,然后执行ambari-server start启动服务,这里有个小插曲,因为ambari-server涉及到这么多服务,所以管理控制监控之类的模块是必不可少的,这部分可以在ambari-server的web ui界面安装,也可以命令行提前安装,这部分被称为HDF Management Pack,运行ambari-server install-mpack \ --mpack=http://public-repo-1.hortonworks.com/HDF/centos7/2.x/updates/2.1.4.0/tars/hdf_ambari_mp/hdf-ambari-mpack-2.1.4.0-5.tar.gz \ --purge \ --verbose
    安装,当然这个压缩包可以下载之后指到本地路径安装,然后就可以重启ambari-server

rabbitmq 介绍

接触了一下rabbitmq,原来在选型的时候是在rabbitmq跟kafka之间做选择,网上搜了一下之后发现kafka的优势在于吞吐量,而rabbitmq相对注重可靠性,因为应用在im上,需要保证消息不能丢失所以就暂时选定rabbitmq,
Message Queue的需求由来已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB)。 TIB被电信和通讯公司采用,路透社收购了Teknekron公司。之后,IBM开发了MQSeries,微软开发了Microsoft Message Queue(MSMQ)。这些商业MQ供应商的问题是厂商锁定,价格高昂。2001年,Java Message queuing试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。
RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发concurrent和distribution系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件/库/工具,如mnesia/SASL,极大方便了Erlang应用的开发。OTP就类似于Python语言中众多的module,用户借助这些module可以很方便的开发应用。
于是2004年,摩根大通和iMatrix开始着手Advanced Message Queuing Protocol (AMQP)开放标准的开发。2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布。所有主要的编程语言均有与代理接口通讯的客户端库。

简单的使用经验

通俗的理解

这里介绍下其中的一些概念,connection表示和队列服务器的连接,一般情况下是tcp连接, channel表示通道,可以在一个连接上建立多个通道,这里主要是节省了tcp连接握手的成本,exchange可以理解成一个路由器,将消息推送给对应的队列queue,其实是像一个订阅的模式。

集群经验

rabbitmqctl stop这个是关闭rabbitmq,在搭建集群时候先关闭服务,然后使用rabbitmq-server -detached静默启动,这时候使用rabbitmqctl cluster_status查看集群状态,因为还没将节点加入集群,所以只能看到类似

Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.

然后就可以把当前节点加入集群,

rabbit2$ rabbitmqctl stop_app #这个stop_app与stop的区别是前者停的是rabbitmq应用,保留erlang节点,
                              #后者是停止了rabbitmq和erlang节点
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1 #这里可以用--ram指定将当前节点作为内存节点加入集群
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

其他可以参考官方文档

一些坑

消息丢失

这里碰到过一个坑,对于使用exchange来做消息路由的,会有一个情况,就是在routing_key没被订阅的时候,会将该条找不到路由对应的queue的消息丢掉What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.对应链接,而当使用空的exchange时,会保留消息,当出现消费者的时候就可以将收到之前生产者所推送的消息对应链接,这里就是用了空的exchange。

集群搭建

集群搭建的时候有个erlang vm生成的random cookie,这个是用来做集群之间认证的,相同的cookie才能连接,但是如果通过vim打开复制后在其他几点新建文件写入会多一个换行,导致集群建立是报错,所以这里最好使用scp等传输命令直接传输cookie文件,同时要注意下cookie的文件权限。
另外在集群搭建的时候如果更改过hostname,那么要把rabbitmq的数据库删除,否则启动后会马上挂掉

0%