2.深入TiDB:入口代码分析及调试 TiDB

本文基于 TiDB release-5.1进行分析,需要用到 Go 1.16以后的版本

启动与调试

其实 TiDB 的调试非常的简单,我这里用的是 TiDB release-5.1,那么需要将 Go 的版本更新到 1.16 之后。main 函数是在 tidb-server 包里面,直接运行就好了。为了保证环境的统一,我用的是 Linux 的环境。

如果想要对自己的代码进行调试,只需要:

  1. 安装 mysql 客户端;

    yum install mysql
  2. 启动 TiDB tidb-server 包里面 main 函数;

  3. 启动 mysql 客户端;

    tidb 默认端口是 4000 ,账号是 root ,库我们选test

    mysql -h 127.0.0.1 -P 4000 -u root -D test
  4. 在对应的逻辑上断点;

    例如我们要看 insert 的执行逻辑,首先需要创建了一个表:

    CREATE TABLE t (
    id      VARCHAR(31),
    name    VARCHAR(50),
    age     int,
    key     id_idx (id)
    );

    在插入逻辑的地方断点住:

    image-20210718164645543

    然后执行插入指令即可

    INSERT INTO t VALUES ("pingcap001", "pingcap", 3);

从 main 函数开始

学会了如何调试 TiDB 之后,下面看看 TiDB 的 main 函数执行逻辑,它是在 tidb-server 包下面:


func main() {
    ...
    // 注册store
    registerStores()
    // 注册prometheus监控项
    registerMetrics()
    // 设置全局 config
    config.InitializeConfig(*configPath, *configCheck, *configStrict, overrideConfig)
    if config.GetGlobalConfig().OOMUseTmpStorage {
        config.GetGlobalConfig().UpdateTempStoragePath()
        err := disk.InitializeTempDir()
        terror.MustNil(err)
        checkTempStorageQuota()
    }
    setGlobalVars()
    // 设置CPU亲和性
    setCPUAffinity()
    //配置系统log
    setupLog()
    // 定时检测堆内存有没有超标
    setHeapProfileTracker()
    //注册分布式系统追踪链 jaeger
    setupTracing() // Should before createServer and after setup config.
    printInfo()
    // 设置binlog信息
    setupBinlogClient()
    // 配置监控
    setupMetrics()
    storage, dom := createStoreAndDomain()
    // 创建TiDB server
    svr := createServer(storage, dom)
    // 设置优雅关机
    exited := make(chan struct{})
    signal.SetupSignalHandler(func(graceful bool) {
        svr.Close()
        cleanup(svr, storage, dom, graceful)
        close(exited)
    })
    topsql.SetupTopSQL()
    //启动服务
    terror.MustNil(svr.Run())
    <-exited
    // 日志刷盘
    syncLog()
}

从上面的 main 方法可以看出它主要是加载配置项,然后设置配置信息。从上面的信息配置中,有几点我觉得可以借鉴到我们平时的项目中,一个是定时检测堆内存检测,另一个是优雅停机。

检测堆内存检测

堆内存检测的实现逻辑是在 setHeapProfileTracker 方法中:

func setHeapProfileTracker() {
    c := config.GetGlobalConfig()
    // 默认1分钟
    d := parseDuration(c.Performance.MemProfileInterval)
    // 异步运行
    go profile.HeapProfileForGlobalMemTracker(d)
}

func HeapProfileForGlobalMemTracker(d time.Duration) {
    log.Info("Mem Profile Tracker started")
    // 设置 ticker 为1分钟
    t := time.NewTicker(d)
    defer t.Stop()
    for {
        <-t.C
        // 通过 pprof 获取堆内存使用情况
        err := heapProfileForGlobalMemTracker()
        if err != nil {
            log.Warn("profile memory into tracker failed", zap.Error(err))
        }
    }
}

从上面的代码中可以看到 setHeapProfileTracker 里面实际上会启动一个 Goroutine 异步去定时 ticker (不熟悉定时器原理的可以看这篇:https://www.luozhiyun.com/archives/458 )执行 heapProfileForGlobalMemTracker 函数通过 pprof 获取堆内存使用情况。

func heapProfileForGlobalMemTracker() error {
    // 调用 pprof 获取堆内存使用情况
    bytes, err := col.getFuncMemUsage(kvcache.ProfileName)
    if err != nil {
        return err
    }
    defer func() {
        if p := recover(); p != nil {
            log.Error("GlobalLRUMemUsageTracker meet panic", zap.Any("panic", p), zap.Stack("stack"))
        }
    }()
    // 将内存放置到 cache 里
    kvcache.GlobalLRUMemUsageTracker.ReplaceBytesUsed(bytes)
    return nil
}

heapProfileForGlobalMemTracker 通过调用 pprof 获取堆内存使用情况,然后将获取到的信息传递给 GlobalLRUMemUsageTracker,这里比较有意思的是,GlobalLRUMemUsageTracker 是 Tracker 的实现类,会追踪 Tracker 整条链路的内存使用情况,如果达到阈值,那么会触发 父 Tracker 的 hook,抛出 panic 异常。

tracker

优雅停机

优雅停机在项目中就更加常用了,TiDB 在启动时会调用 SetupSignalHandler 函数执行相应的信号监听:

func SetupSignalHandler(shutdownFunc func(bool)) { 
    closeSignalChan := make(chan os.Signal, 1)
    signal.Notify(closeSignalChan,
        syscall.SIGHUP,
        syscall.SIGINT,
        syscall.SIGTERM,
        syscall.SIGQUIT)

    go func() {
        sig := <-closeSignalChan
        logutil.BgLogger().Info("got signal to exit", zap.Stringer("signal", sig))
        shutdownFunc(sig == syscall.SIGQUIT)
    }()
}

当监听到 SIGHUP 、SIGINT、SIGTERM、SIGQUIT 信号的时候,会执行传入的 shutdownFunc 函数:

    ...
    signal.SetupSignalHandler(func(graceful bool) {
        svr.Close()
        cleanup(svr, storage, dom, graceful)
        close(exited)
    })
    ...

传入到 SetupSignalHandler 中的函数首先会执行 server 的关闭,graceful 会当监听到 SIGQUIT 信号时为 true,然后会调用 cleanup 执行清理操作。

func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain, graceful bool) {
    // 是否是优雅停机
    if graceful {
        //优雅停机
        svr.GracefulDown(context.Background(), nil)
    } else {
        // 尝试优雅停机
        svr.TryGracefulDown()
    }
    // 清理所有插件资源
    plugin.Shutdown(context.Background()) 
    closeDomainAndStorage(storage, dom)
    disk.CleanUp()
    topsql.Close()
}

cleanup 里面则会清理连接、插件、磁盘以及关闭tikv资源等。如果 graceful 是 true,那么会调用 GracefulDown 循环清理空闲连接,直到连接数为0;如果是 false,那么会调用 TryGracefulDown 清理连接,如果连接在15秒内还没清理完毕则会强制清理。

启动服务

启动服务这个过程其实是和 net/http 的 server 非常的类似。入口在 main 函数的最下面,通过 server 的 Run 方法启动:

func (s *Server) Run() error {
    metrics.ServerEventCounter.WithLabelValues(metrics.EventStart).Inc()
    s.reportConfig()

    // 配置路由信息
    if s.cfg.Status.ReportStatus {
        s.startStatusHTTP()
    }
    for {
        // 监听客户端请求
        conn, err := s.listener.Accept()
        if err != nil {
            ...
        }
        // 创建connection
        clientConn := s.newConn(conn)

        // 处理connection请求
        go s.onConn(clientConn)
    }
}

Run 方法这里留下了主要的逻辑:

  1. 配置路由信息;
  2. 监听 connection;
  3. 为 connection 创建单独的 Goroutine 进行处理。

server

获取到的连接然后会调用 connection 的 Run 方法中读取 connection 的数据,接着调用到 connection 的 dispatch 方法来做请求逻辑转发处理。

func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
    ...
    // 执行的命令
    cmd := data[0]
    // 命令相应的参数
    data = data[1:]
    ...
    // 将[]byte 转为 string
    dataStr := string(hack.String(data))
    // 根据 cmd 选择相应的执行逻辑 
    switch cmd {
    case mysql.ComSleep: 
    case mysql.ComQuit: 
    case mysql.ComInitDB:  
    // 绝大多数sql 都会走这个逻辑
    // 包括增删改查
    case mysql.ComQuery:
        if len(data) > 0 && data[len(data)-1] == 0 {
            data = data[:len(data)-1]
            dataStr = string(hack.String(data))
        }
        return cc.handleQuery(ctx, dataStr)
    case mysql.ComFieldList: 
    case mysql.ComRefresh: 
    case mysql.ComShutdown:  
    case mysql.ComStatistics: 
    case mysql.ComPing: 
    case mysql.ComChangeUser:
    ...
    // ComEnd
    default:
        return mysql.NewErrf(mysql.ErrUnknown, "command %d not supported now", nil, cmd)
    }
}

dispatch 里面会获传入的数组,第一个byte 为命令类型,后面的为执行命令,如我们插入一条 insert 语句:

INSERT INTO t VALUES ("pingcap001", "pingcap", 3);

在这条语句中 cmd 为 3 ,data 为 INSERT INTO t VALUES ("pingcap001", "pingcap", 3);

然后根据 cmd 在 switch 判断中找到对应的执行逻辑进行相应的处理。

需要注意的是这里 mysql.ComQuery这个分支其实是包含了增删改查的,大家自己可以断点看看。

总结

这一篇其实是非常简单的,主要说一下如何配置环境进行相应的debug,然后就是介绍一下 main 方法里面主要做了些什么事情,以及我们可以从中学到什么。

对于 TiDB 的启动环节我们还可以参照前几次写的文章:《一文说透 Go 语言 HTTP 标准库》一起看看同样是服务端,TiDB为啥要自己实现一个。

Reference

https://zhuanlan.zhihu.com/p/163607256

https://www.qikqiak.com/post/use-vscode-remote-dev-debug/

https://zh.wikipedia.org/wiki/Unix%E4%BF%A1%E5%8F%B7

扫码_搜索联合传播样式-白色版 1