Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

0%

IndexFile 结构 hash 结构能够通过 key 寻找到对应在 CommitLog 中的位置

IndexFile 的构建则是分发给这个进行处理

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }

            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }

            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }

配置的数量

private boolean messageIndexEnable = true;
private int maxHashSlotNum = 5000000;
private int maxIndexNum = 5000000 * 4;

最核心的其实是 IndexFile 的结构和如何写入

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
          // 获取 key 的 hash
            int keyHash = indexKeyHashMethod(key);
          // 计算属于哪个 slot
            int slotPos = keyHash % this.hashSlotNum;
          // 计算 slot 位置 因为结构是有个 indexHead,主要是分为三段 header,slot 和 index
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

              // 计算索引存放位置,头部 + slot 数量 * slot 大小 + 已有的 index 数量 + index 大小
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
							
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

              // 存放的是数量位移,不是绝对位置
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

具体可以看一下这个简略的示意图

周末把《蛮荒记》看完了,前面是发现微信读书有《搜神记》和《蛮荒记》,但是《搜神记》看了会发现很多都是跳段了,不知道为啥,貌似也没什么少儿不宜的情节,所以就上网找了原版来看,为什么看这个呢,主要还是高中的时候看过,觉得写得很不错,属于那时候的玄幻小说里的独一档,基于山海经创造了一个半架空的大荒宇宙,五族帝尊,人物名都是听说过的,而且又能契合部分历史,整个故事布局非常宏大,并且情节矛盾埋得很深,这里就不对具体情节作介绍了,只是聊聊对书中的一些人物和情节的看法感受。

乌丝兰玛是个贯穿两部,甚至在蛮荒的最后还要再搞事情,极其坚定的自以为是的大 boss,其实除了最后被我们的主人公打败,前面几乎就是无所不能,下了一盘无比巨大的棋,主人公都只是其中一个棋子和意外,但是正如很多反派,一直以来都是背着一个信念,并且这个所谓的信念是比较正义的,只是为了这个正义的信念和目标却做了各种丧尽天良的事情,说起来跟灭霸有点像,为了环保哈哈,相对来说感觉姬远玄也只是个最大牌的工具人,或者说是中间人,深爱的妹妹冰夷也意外被蚩尤怒拿一血。

但是中间那个赤霞仙子一定要给烈烟石的心上锁,导致最后认不出来蚩尤,也间接导致了蚩尤被杀,如果不考虑最后情节或者推动故事的需求,这个还是我很讨厌的,有点类似于《驴得水》里那个校长,看着貌似是个正常的,做的事情也是正派,但是其实是害人不浅,即使南阳仙子因此被抛进了火山,那也是有贱人在那挑食,并且赤松子是赤飚怒的儿子,烈烟石跟蚩尤又没这层关系,就很像倚天屠龙记里的灭绝师太和极品家丁里的那个玉德仙坊的院主,后者还好一些,前者几乎就是导致周芷若一生悲剧的始作俑者,自己偏执的善恶观,还要给徒弟灌输如此恶毒的理念和让她立下像紧箍咒似的誓言,在人一生中本来就有很多不能如愿的,又被最亲最尊敬的人下了这样的紧箍咒,人生的不幸也加倍了。

似乎习惯了总要有个总结的,想说的应该是我觉得这些剧也好,书也好,我觉得最坏的人可能是大部分人眼中的一些次要人物,或者至少大 boss 才是最坏的人,当然这个坏也不是严格的二分法,只是我觉得最让我觉得负面的人物,这些人可能看起来情景出现的不多,只是说了很少的话,做了很少的事,但是在我看来却做了最大的恶。

题目介绍

Merge two sorted linked lists and return it as a sorted list. The list should be made by splicing together the nodes of the first two lists.

将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。

示例 1

输入:l1 = [1,2,4], l2 = [1,3,4]
输出:[1,1,2,3,4,4]

示例 2

输入: l1 = [], l2 = []
输出: []

示例 3

输入: l1 = [], l2 = [0]
输出: [0]

简要分析

这题是 Easy 的,看着也挺简单,两个链表进行合并,就是比较下大小,可能将就点的话最好就在两个链表中原地合并

题解代码

public ListNode mergeTwoLists(ListNode l1, ListNode l2) {
        // 下面两个if判断了入参的边界,如果其一为null,直接返回另一个就可以了
        if (l1 == null) {
            return l2;
        }
        if (l2 == null) {
            return l1;
        }
        // new 一个合并后的头结点
        ListNode merged = new ListNode();
        // 这个是当前节点
        ListNode current = merged;
        // 一开始给这个while加了l1和l2不全为null的条件,后面想了下不需要
        // 因为内部前两个if就是跳出条件
        while (true) {
            if (l1 == null) {
                // 这里其实跟开头类似,只不过这里需要将l2剩余部分接到merged链表后面
                // 所以不能是直接current = l2,这样就是把后面的直接丢了
                current.val = l2.val;
                current.next = l2.next;
                break;
            }
            if (l2 == null) {
                current.val = l1.val;
                current.next = l1.next;
                break;
            }
            // 这里是两个链表都不为空的时候,就比较下大小
            if (l1.val < l2.val) {
                current.val = l1.val;
                l1 = l1.next;
            } else {
                current.val = l2.val;
                l2 = l2.next;
            }
            // 这里是new个新的,其实也可以放在循环头上
            current.next = new ListNode();
            current = current.next;
        }
        current = null;
        // 返回这个头结点
        return merged;
    }

结果

ConsumeQueue 其实是定位到一个 topic 下的消息在 CommitLog 下的偏移量,它也是固定大小的

// ConsumeQueue file size,default is 30W
private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;

public static final int CQ_STORE_UNIT_SIZE = 20;

所以文件大小是5.7M 左右

5udpag

ConsumeQueue 的构建是通过org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService运行后的 doReput 方法,而启动是的 reputFromOffset 则是通过org.apache.rocketmq.store.DefaultMessageStore#start中下面代码设置并启动

log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
            this.reputMessageService.start();

看一下 doReput 的逻辑

private void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

              // 根据偏移量获取消息
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                          // 消息校验和转换
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                  // 进行分发处理,包括 ConsumeQueue 和 IndexFile
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    // If user open the dledger pattern or the broker is master node,
                                    // it will not ignore the exception and fix the reputFromOffset variable
                                    if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                        DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);
                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

分发的逻辑看到这

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

真正存储的是在这

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

这里也可以看到 ConsumeQueue 的存储格式,

AA6Tve

偏移量,消息大小,跟 tag 的 hashCode

其实这个表示有点不太对,应该是 Druid 动态切换数据源的方法,只是应用在了 springboot 框架中,准备代码准备了半天,之前在一次数据库迁移中使用了,发现 Druid 还是很强大的,用来做动态数据源切换很方便。

首先这里的场景跟我原来用的有点点区别,在项目中使用的是通过配置中心控制数据源切换,统一切换,而这里的例子多加了个可以根据接口注解配置

第一部分是最核心的,如何基于 Spring JDBC 和 Druid 来实现数据源切换,是继承了org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource 这个类,他的determineCurrentLookupKey方法会被调用来获得用来决定选择那个数据源的对象,也就是 lookupKey,也可以通过这个类看到就是通过这个 lookupKey 来路由找到数据源。

public class DynamicDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        if (DatabaseContextHolder.getDatabaseType() != null) {
            return DatabaseContextHolder.getDatabaseType().getName();
        }
        return DatabaseType.MASTER1.getName();
    }
}

而如何使用这个 lookupKey 呢,就涉及到我们的 DataSource 配置了,原来就是我们可以直接通过spring 的 jdbc 配置数据源,像这样

现在我们要使用 Druid 作为数据源了,然后配置 DynamicDataSource的参数,通过 key 来选择对应的 DataSource,也就是下面配的 master1 和 master2

<bean id="master1" class="com.alibaba.druid.pool.DruidDataSource" init-method="init"
          destroy-method="close"
          p:driverClassName="com.mysql.cj.jdbc.Driver"
          p:url="${master1.demo.datasource.url}"
          p:username="${master1.demo.datasource.username}"
          p:password="${master1.demo.datasource.password}"
          p:initialSize="5"
          p:minIdle="1"
          p:maxActive="10"
          p:maxWait="60000"
          p:timeBetweenEvictionRunsMillis="60000"
          p:minEvictableIdleTimeMillis="300000"
          p:validationQuery="SELECT 'x'"
          p:testWhileIdle="true"
          p:testOnBorrow="false"
          p:testOnReturn="false"
          p:poolPreparedStatements="false"
          p:maxPoolPreparedStatementPerConnectionSize="20"
          p:connectionProperties="config.decrypt=true"
          p:filters="stat,config"/>

    <bean id="master2" class="com.alibaba.druid.pool.DruidDataSource" init-method="init"
          destroy-method="close"
          p:driverClassName="com.mysql.cj.jdbc.Driver"
          p:url="${master2.demo.datasource.url}"
          p:username="${master2.demo.datasource.username}"
          p:password="${master2.demo.datasource.password}"
          p:initialSize="5"
          p:minIdle="1"
          p:maxActive="10"
          p:maxWait="60000"
          p:timeBetweenEvictionRunsMillis="60000"
          p:minEvictableIdleTimeMillis="300000"
          p:validationQuery="SELECT 'x'"
          p:testWhileIdle="true"
          p:testOnBorrow="false"
          p:testOnReturn="false"
          p:poolPreparedStatements="false"
          p:maxPoolPreparedStatementPerConnectionSize="20"
          p:connectionProperties="config.decrypt=true"
          p:filters="stat,config"/>

    <bean id="dataSource" class="com.nicksxs.springdemo.config.DynamicDataSource">
        <property name="targetDataSources">
            <map key-type="java.lang.String">
                <!-- master -->
                <entry key="master1" value-ref="master1"/>
                <!-- slave -->
                <entry key="master2" value-ref="master2"/>
            </map>
        </property>
        <property name="defaultTargetDataSource" ref="master1"/>
    </bean>

现在就要回到头上,介绍下这个DatabaseContextHolder,这里使用了 ThreadLocal 存放这个 DatabaseType,为啥要用这个是因为前面说的我们想要让接口层面去配置不同的数据源,要把持相互隔离不受影响,就使用了 ThreadLocal,关于它也可以看我前面写的一篇文章聊聊传说中的 ThreadLocal,而 DatabaseType 就是个简单的枚举

public class DatabaseContextHolder {
    public static final ThreadLocal<DatabaseType> databaseTypeThreadLocal = new ThreadLocal<>();

    public static DatabaseType getDatabaseType() {
        return databaseTypeThreadLocal.get();
    }

    public static void putDatabaseType(DatabaseType databaseType) {
        databaseTypeThreadLocal.set(databaseType);
    }

    public static void clearDatabaseType() {
        databaseTypeThreadLocal.remove();
    }
}
public enum DatabaseType {
    MASTER1("master1", "1"),
    MASTER2("master2", "2");

    private final String name;
    private final String value;

    DatabaseType(String name, String value) {
        this.name = name;
        this.value = value;
    }

    public String getName() {
        return name;
    }

    public String getValue() {
        return value;
    }

    public static DatabaseType getDatabaseType(String name) {
        if (MASTER2.name.equals(name)) {
            return MASTER2;
        }
        return MASTER1;
    }
}

这边可以看到就是通过动态地通过putDatabaseType设置lookupKey来进行数据源切换,要通过接口注解配置来进行设置的话,我们就需要一个注解

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DataSource {
    String value();
}

这个注解可以配置在我的接口方法上,比如这样

public interface StudentService {

    @DataSource("master1")
    public Student queryOne();

    @DataSource("master2")
    public Student queryAnother();

}

通过切面来进行数据源的设置

@Aspect
@Component
@Order(-1)
public class DataSourceAspect {

    @Pointcut("execution(* com.nicksxs.springdemo.service..*.*(..))")
    public void pointCut() {

    }


    @Before("pointCut()")
    public void before(JoinPoint point)
    {
        Object target = point.getTarget();
        System.out.println(target.toString());
        String method = point.getSignature().getName();
        System.out.println(method);
        Class<?>[] classz = target.getClass().getInterfaces();
        Class<?>[] parameterTypes = ((MethodSignature) point.getSignature())
                .getMethod().getParameterTypes();
        try {
            Method m = classz[0].getMethod(method, parameterTypes);
            System.out.println("method"+ m.getName());
            if (m.isAnnotationPresent(DataSource.class)) {
                DataSource data = m.getAnnotation(DataSource.class);
                System.out.println("dataSource:"+data.value());
                DatabaseContextHolder.putDatabaseType(DatabaseType.getDatabaseType(data.value()));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @After("pointCut()")
    public void after() {
				DatabaseContextHolder.clearDatabaseType();
    }
}

通过接口判断是否带有注解跟是注解的值,DatabaseType 的配置不太好,不过先忽略了,然后在切点后进行清理

这是我 master1 的数据,

master2 的数据

然后跑一下简单的 demo,

@Override
public void run(String...args) {
	LOGGER.info("run here");
	System.out.println(studentService.queryOne());
	System.out.println(studentService.queryAnother());

}

看一下运行结果

其实这个方法应用场景不止可以用来迁移数据库,还能实现精细化的读写数据源分离之类的,算是做个简单记录和分享。